Skip to content

Commit

Permalink
Add streaming documentation + encoding is given at init.
Browse files Browse the repository at this point in the history
  • Loading branch information
atrabattoni committed Sep 17, 2024
1 parent a08b46c commit c7b3102
Show file tree
Hide file tree
Showing 5 changed files with 116 additions and 20 deletions.
2 changes: 2 additions & 0 deletions docs/api/processing.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,4 +12,6 @@
DataArrayLoader
RealTimeLoader
DataArrayWriter
ZMQPublisher
ZMQSubscriber
```
1 change: 1 addition & 0 deletions docs/user-guide/index.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,4 +10,5 @@ interpolated-coordinates
convert-displacement
atoms
processing
streaming
```
86 changes: 86 additions & 0 deletions docs/user-guide/streaming.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
---
file_format: mystnb
kernelspec:
name: python3
---

# Streaming data

Xdas allows to stream data over any network using [ZeroMQ](https://zeromq.org). Xdas use the Publisher and Subscriber patterns meaning that on one node the data is published and that any number of subscribers can receive the data stream.

Streaming data with Xdas is done by simply dumping each chunk to NetCDF binaries and to send those as packets. This ensure that each packet is self described and that feature such as compression are available (which can be very helpful to minimize the used bandwidth).

Xdas implements the {py:class}`~xdas.processing.ZMQPublisher` and {py:class}`~xdas.processing.ZMQSubscriber`.Those object can respectively be used as a Writer and a Loader as described in the [](processing) section. Both are initialized by giving an network address. The publisher use the `submit` method to send packets while the subscriber is an infinite iterator that yields packets.

In this section, we will mimic the use of several machine by using multithreading, where each thread is supposed to be a different machine. In real-life application, the publisher and subscriber are generally called in different machine or software.

## Simple use case

```{code-cell}
import threading
import time
import xdas as xd
from xdas.processing import ZMQPublisher, ZMQSubscriber
```

First we generate some data and split it into packets

```{code-cell}
da = xd.synthetics.dummy()
packets = xd.split(da, 5)
```

We then publish the packets on machine 1.

```{code-cell}
address = f"tcp://localhost:{xd.io.get_free_port()}"
publisher = ZMQPublisher(address)
def publish():
for packet in packets:
publisher.submit(packet)
# give a chance to the subscriber to connect in time and to get the last packet
time.sleep(0.1)
machine1 = threading.Thread(target=publish)
machine1.start()
```

Let's receive the packets on machine 2.

```{code-cell}
subscriber = ZMQSubscriber(address)
packets = []
def subscribe():
for packet in subscriber:
packets.append(packet)
machine2 = threading.Thread(target=subscribe)
machine2.start()
```

Now we wait for machine 1 to finish sending its packet and see if everything went well.

```{code-cell}
machine1.join()
print(f"We received {len(packets)} packets!")
assert xd.concatenate(packets).equals(da)
```

## Using encoding

To reduce the volume of the transmitted data, compression is often useful. Xdas enable the use of the ZFP algorithm when storing data but also when streaming it. Encoding is declared the same way.

```{code-cell}
:tags: [remove-output]
import hdf5plugin
encoding = {"chunks": (10, 10), **hdf5plugin.Zfp(accuracy=1e-6)}
publisher = ZMQPublisher(address, encoding) # Add encoding here, the rest is the same
```


4 changes: 2 additions & 2 deletions tests/test_processing.py
Original file line number Diff line number Diff line change
Expand Up @@ -170,12 +170,12 @@ def test_write_and_result_with_existing_file(self):

class TestZMQ:
def _publish_and_subscribe(self, packets, address, encoding=None):
publisher = ZMQPublisher(address)
publisher = ZMQPublisher(address, encoding)

def publish():
for packet in packets:
time.sleep(0.001)
publisher.submit(packet, encoding=encoding)
publisher.submit(packet)

threading.Thread(target=publish).start()

Expand Down
43 changes: 25 additions & 18 deletions xdas/processing/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,8 @@ class DataArrayWriter:
dirpath : str or path
The directory to store the output of a processing pipeline. The directory needs
to exist and be empty.
encoding : dict
The encoding to use when dumping the DataArrays to bytes.
Examples
--------
Expand Down Expand Up @@ -309,11 +311,13 @@ class ZMQPublisher:
----------
address : str
The address to bind the publisher to.
encoding : dict
The encoding to use when dumping the DataArrays to bytes.
Examples
--------
>>> import xdas as xd
>>> from xdas.processing import ZMQPublisher
>>> from xdas.processing import ZMQPublisher, ZMQSubscriber
First we generate some data and split it into packets
Expand All @@ -333,20 +337,22 @@ class ZMQPublisher:
>>> import hdf5plugin
>>> address = f"tcp://localhost:{xd.io.get_free_port()}"
>>> encoding = {"chunks": (10, 10), **hdf5plugin.Zfp(accuracy=1e-6)}
>>> publisher = ZMQPublisher(address, encoding)
>>> for da in packets:
... publisher.submit(da, encoding=encoding)
... publisher.submit(da)
"""

def __init__(self, address):
import zmq

self.context = zmq.Context()
self.socket = self.context.socket(zmq.PUB)
self.socket.bind(address)
def __init__(self, address, encoding=None):
self.address = address
self.encoding = encoding
self._context = zmq.Context()
self._socket = self._context.socket(zmq.PUB)
self._socket.bind(self.address)

def submit(self, da, encoding=None):
def submit(self, da):
"""
Send a DataArray over ZeroMQ.
Expand All @@ -356,10 +362,10 @@ def submit(self, da, encoding=None):
The DataArray to be sent.
"""
self.socket.send(tobytes(da, encoding))
self._socket.send(tobytes(da, self.encoding))

def write(self, da, encoding=None):
self.submit(da, encoding)
def write(self, da):
self.submit(da)

def result():
return None
Expand All @@ -376,7 +382,7 @@ class ZMQSubscriber:
Methods
-------
submit(da, encoding=None)
submit(da)
Send a DataArray over ZeroMQ.
Examples
Expand Down Expand Up @@ -415,16 +421,17 @@ class ZMQSubscriber:
"""

def __init__(self, address):
self.context = zmq.Context()
self.socket = self.context.socket(zmq.SUB)
self.socket.connect(address)
self.socket.setsockopt_string(zmq.SUBSCRIBE, "")
self.address = address
self._context = zmq.Context()
self._socket = self._context.socket(zmq.SUB)
self._socket.connect(address)
self._socket.setsockopt_string(zmq.SUBSCRIBE, "")

def __iter__(self):
return self

def __next__(self):
message = self.socket.recv()
message = self._socket.recv()
return frombuffer(message)


Expand Down

0 comments on commit c7b3102

Please sign in to comment.