diff --git a/docs/conf.py b/docs/conf.py index 13910fa..2e8eae0 100644 --- a/docs/conf.py +++ b/docs/conf.py @@ -11,7 +11,7 @@ author = "Alister Trabattoni" # The full version, including alpha/beta/rc tags -release = "0.1.2" +release = "0.2" # -- General configuration --------------------------------------------------- diff --git a/docs/release-notes.md b/docs/release-notes.md index 088eb79..cdfc277 100644 --- a/docs/release-notes.md +++ b/docs/release-notes.md @@ -1,10 +1,16 @@ # Release notes +## 0.2 + +- Add Dask virtualization backend for non-HDF5 formats (@atrabattoni). +- Add support for miniSEED format (@atrabattoni, @chauvetige). +- Add support for Silixa (TDMS) format (@atrabattoni, @Stutzmann). + ## 0.1.2 -- Add ZeroMQ streaming capabilities (@atrabattoni) +- Add ZeroMQ streaming capabilities (@atrabattoni). - Add support of Terra15 format (@chauvetige). -- Fix Febus engine (@ClaudioStrumia) +- Fix Febus engine (@ClaudioStrumia). ## 0.1.1 @@ -14,4 +20,4 @@ ## 0.1 -Initial stable version \ No newline at end of file +Initial stable version. \ No newline at end of file diff --git a/docs/user-guide/data-formats.md b/docs/user-guide/data-formats.md index 4a99bf7..441456e 100644 --- a/docs/user-guide/data-formats.md +++ b/docs/user-guide/data-formats.md @@ -20,15 +20,25 @@ os.chdir("../_data") ## Implemented file formats -The formats that are currently implemented are: ASN, FEBUS, OPTASENSE, SINTELA and TERRA15. To read them you have to specifiy which one you want in the `engine` argument in {py:func}`xdas.open_dataarray` for a single file or {py:func}`xdas.open_mfdataarray` for multiple files: - -| DAS constructor | `engine` argument | |:-----------------:|:-----------------:| | ASN | `"asn"` | | FEBUS | `"febus"` | | OPTASENSE | `"optasense"` | | SINTELA | `"sintela"` | | TERRA15 | `"terra15"` | +Below is the list of formats that are currently implemented. All HDF5-based formats support native virtualization; other formats support Dask virtualization. Please refer to the [](virtual-datasets) section. To read them you have to specify which one you want in the `engine` argument of {py:func}`xdas.open_dataarray` for a single file or {py:func}`xdas.open_mfdataarray` for multiple files. + +Xdas supports the following DAS formats: + +| Constructor | Instrument | `engine` argument | Virtualization | |:-----------------:|:-----------------:|:-----------------:|:-----------------:| | ASN | OptoDAS | `"asn"` | HDF5 | | FEBUS | A1 | `"febus"` | HDF5 | | OptaSense | OLA, ODH*, ... | `"optasense"` | HDF5 | | Silixa | iDAS | `"silixa"` | Dask | | SINTELA | ONYX | `"sintela"` | HDF5 | | Terra15 | Treble | `"terra15"` | HDF5 | + +It also implements its own format and supports miniSEED: + +| Format | `engine` argument | Virtualization | |:-----------------:|:-----------------:|:-----------------:| | Xdas | `None` | HDF5 | | miniSEED | `"miniseed"` | Dask | ```{warning} Due to poor documentation of the various version of the Febus format, it is recommended to manually provide the required trimming and the position of the timestamps within each block. For example to trim 100 samples on both side of each block and to set the timestamp location at the center of the block for a block of 2000 samples: diff --git a/docs/user-guide/faq.md b/docs/user-guide/faq.md new file mode 100644 index 0000000..b8173c4 --- /dev/null +++ b/docs/user-guide/faq.md @@ -0,0 +1,10 @@ +# Frequently Asked Questions + +## Why not use Xarray and Dask?
+ +Originally, Xdas was meant to be a simple add-on to Xarray, taking advantage of its [Dask integration](https://docs.xarray.dev/en/stable/user-guide/dask.html). But two main limitations forced us to create a parallel project: + +- Coordinates have to be loaded into memory as NumPy arrays. This is prohibitive for very long time series, where storing the time coordinate as a dense array with a value for each time sample leads to metadata that in some extreme cases cannot fit in memory. +- Dask arrays become sluggish when dealing with a very large number of files. Dask is a pure Python package, and processing graphs of millions of tasks can take several seconds or more. Also, Dask does not provide a way to serialize a graph for later reuse. + +Because of this, and the fact that the Xarray object was not designed to be subclassed, we decided to go our own way. Hopefully, if progress on Xarray allows it, the two projects could eventually be merged. Xdas tries to follow the Xarray API as much as possible. diff --git a/docs/user-guide/index.md b/docs/user-guide/index.md index c8aef43..4ab4214 100644 --- a/docs/user-guide/index.md +++ b/docs/user-guide/index.md @@ -7,8 +7,10 @@ data-structure/index data-formats virtual-datasets interpolated-coordinates +miniseed convert-displacement atoms processing streaming +faq ``` \ No newline at end of file diff --git a/docs/user-guide/miniseed.md b/docs/user-guide/miniseed.md new file mode 100644 index 0000000..d44a010 --- /dev/null +++ b/docs/user-guide/miniseed.md @@ -0,0 +1,131 @@ +--- +file_format: mystnb +kernelspec: + name: python3 +--- + +```{code-cell} +:tags: [remove-cell] + +import os +os.chdir("../_data") + +import warnings +warnings.filterwarnings("ignore") + +import obspy +import numpy as np + +np.random.seed(0) + +network = "NX" +stations = ["SX001", "SX002", "SX003", "SX004", "SX005", "SX006", "SX007"] +location = "00" +channels = ["HHZ", "HHN", "HHE"] + +nchunk = 5 +chunk_duration = 60 +starttimes = [ + obspy.UTCDateTime("2024-01-01T00:00:00") + idx * chunk_duration + for idx in range(nchunk) +] +delta = 0.01 +failure = 0.1 + +for station in stations: + for starttime in starttimes: + if np.random.rand() < failure: + continue + for channel in channels: + data = np.random.randn(round(chunk_duration / delta)) + header = { + "delta": delta, + "starttime": starttime, + "network": network, + "station": station, + "location": location, + "channel": channel, + } + tr = obspy.Trace(data, header) + endtime = starttime + chunk_duration + dirpath = f"{network}/{station}" + if not os.path.exists(dirpath): + os.makedirs(dirpath) + fname = f"{network}.{station}.{location}.{channel}__{starttime}_{endtime}.mseed" + path = os.path.join(dirpath, fname) + tr.write(path) + +``` + +# Working with Large-N Seismic Arrays + +The virtualization capabilities of Xdas make it a good candidate for working with the large datasets produced by large-N seismic arrays. + +In this section, we will present several examples of how to handle long and numerous time series. + +```{note} +This part encourages experimenting with seismic data. Depending on the most common use cases users find, this could lead to changes in development direction. +``` + +## Exploring a dataset + +We will start by exploring a synthetic dataset composed of 7 stations, each with 3 channels (HHZ, HHN, HHE). Each trace for each station and channel is stored in multiple one-minute-long files. Some files are missing, resulting in data gaps. The sampling rate is 100 Hz.
The data is organized in a directory structure that groups files by station. + +To open the dataset, we will use the `open_mfdatatree` function. This function takes a pattern that describes the directory structure and file names. The pattern is a string containing placeholders for the network, station, location, channel, start time, and end time. Placeholders for named fields are enclosed in curly braces, while square brackets are used for varying parts of the file name that will be concatenated into different acquisitions (meant for changes in acquisition parameters). + +Next, we will plot the availability of the dataset. + +```{code-cell} +:tags: [remove-output] + +import xdas as xd + +pattern = "NX/{station}/NX.{station}.00.{channel}__[acquisition].mseed" +dc = xd.open_mfdatatree(pattern, engine="miniseed") +xd.plot_availability(dc, dim="time") +``` +```{code-cell} +:tags: [remove-input] +from IPython.display import HTML +fig = xd.plot_availability(dc, dim="time") +HTML(fig.to_html()) +``` + +We can see that indeed some data is missing. Yet, as is often the case, the different channels are synchronized. We can therefore reorganize the data by concatenating the channels of each station. + +```{code-cell} +:tags: [remove-output] + +dc = xd.DataCollection( + { + station: xd.concatenate(objs, dim="channel") + for station in dc + for objs in zip(*[dc[station][channel] for channel in dc[station]]) + }, + name="station", +) +xd.plot_availability(dc, dim="time") +``` +```{code-cell} +:tags: [remove-input] +from IPython.display import HTML +fig = xd.plot_availability(dc, dim="time") +HTML(fig.to_html()) +``` + +In our case, all stations are synchronized to GPS time. By selecting a time range where no data is missing, we can concatenate the stations to obtain an N-dimensional array representation of the dataset. + +```{code-cell} +dc = dc.sel(time=slice("2024-01-01T00:01:00", "2024-01-01T00:02:59.99")) +da = xd.concatenate((dc[station] for station in dc), dim="station") +da +``` + +This is useful for performing array analysis. In this example, we simply stack the energy. + +```{code-cell} +trace = np.square(da).mean("channel").mean("station") +trace.plot(ylim=(0, 3)) +``` + +All the processing capabilities of Xdas can be applied to the dataset. We encourage readers to explore the various possibilities. diff --git a/docs/user-guide/virtual-datasets.md b/docs/user-guide/virtual-datasets.md index 48c4dec..752b14a 100644 --- a/docs/user-guide/virtual-datasets.md +++ b/docs/user-guide/virtual-datasets.md @@ -14,33 +14,43 @@ os.chdir("../_data") # Virtual Datasets -To deal with large multi-file dataset, *xdas* uses the flexibility offered by the -[virtual dataset](https://docs.h5py.org/en/stable/vds.html) capabilities of -netCDF4/HDF5. A virtual dataset is a file that contains pointers towards an arbitrary number of files that -can then be accessed seamlessly as a single, contiguous dataset. Since this is -handled by HDF5 under the hood (which is C compiled) it comes with almost no overhead. +To deal with large multi-file datasets, *Xdas* uses the concept of virtual datasets. A virtual dataset is a file that contains pointers towards an arbitrary number of files that can then be accessed seamlessly as a single, contiguous dataset. + +*Xdas* uses two types of virtualization: + +- For HDF5-based formats, it leverages the performance offered by the native [virtual datasets](https://docs.h5py.org/en/stable/vds.html) capabilities of netCDF4/HDF5, which come with almost no overhead (C compiled).
+- For other types of files, it leverages the flexibility of [Dask arrays](https://docs.dask.org/en/stable/array.html). + +## HDF5 Virtualization ```{note} -Because netCDF4 are valid HDF5 files, the virtual dataset feature of HDF5 can be used -with netCDF4 files. +Because netCDF4 files are valid HDF5 files, the virtual dataset feature of HDF5 can be used with netCDF4 files. ``` -In *xdas*, a {py:class}`VirtualSource` is a pointer towards a file, while a -{py:class}`VirtualLayout` is table linking multiple {py:class}`VirtualSource`s. Below is an -example of a virtual dataset linking three files: +In *xdas*, a {py:class}`VirtualSource` is a pointer towards a file, while a {py:class}`VirtualLayout` is a table linking multiple {py:class}`VirtualSource`s. Below is an example of a virtual dataset linking three files: ![](/_static/virtual-datasets.svg) -In most cases, users do not need to deal with this object directly. To handle individual files, multiple files, and virtual datasets, *xdas* offers the following routines: - -- {py:func}`xdas.open_dataarray` is used to open a single (virtual) data file, and create a {py:class}`xdas.DataArray` object out of it. -- {py:func}`xdas.open_mfdataarray` is used to open multiple (virtual) data files at once, creating a single {py:class}`xdas.DataArray` object that can be written to disk as a single virtual data file. +In most cases, users do not need to deal with this object directly. ```{note} -When opening a virtual dataset, this later will appear as a {py:class}`VirtualSource`. -This is because HDF5 treats virtual dataset as regular files. +When opening a virtual dataset, the latter will appear as a {py:class}`VirtualSource`. This is because HDF5 treats virtual datasets as regular files. ``` +## Use cases + +To handle individual files, multiple files, and virtual datasets, *xdas* offers the following routines: + +| Function | Output | Description | +|--------------------------------------|----------------------------------|-----------------------------------------------------------------------------| +| {py:func}`xdas.open_dataarray` | {py:class}`~xdas.DataArray` | Open a (virtual) file. | +| {py:func}`xdas.open_mfdataarray` | {py:class}`~xdas.DataArray` | Open multiple (virtual) files and concatenate them. | +| {py:func}`xdas.open_mfdatacollection`| {py:class}`~xdas.DataCollection` | Open multiple (virtual) files, grouping and concatenating compatible files. | +| {py:func}`xdas.open_mfdatatree` | {py:class}`~xdas.DataCollection` | Open a directory tree of files, organizing data in a data collection. | +| {py:func}`xdas.open_datacollection` | {py:class}`~xdas.DataCollection` | Open a (virtual) collection. | + +Please refer to the [](data-structure/datacollection.md) section for the functions that return a data collection. + ## Linking multi-file datasets Multiple physical data files can be opened simultaneously with the {py:func}`xdas.open_mfdataarray`: @@ -66,16 +76,17 @@ xd.open_dataarray("vds.nc") ``` ```{hint} -A virtual dataset can point to another virtual dataset. This can be beneficial for huge -real time dataset where new data can be linked regularly by batches. Those batches can -then be linked in a master virtual dataset. This avoids relinking all the files. +A virtual dataset can point to another virtual dataset. This can be beneficial for huge real-time datasets where new data can be linked regularly in batches. Those batches can then be linked in a master virtual dataset. This avoids relinking all the files.
``` ```{warning} -When loading large part of a virtual dataset, you might end up with nan values. This -normally happens when linked files are missing. But due to a -[known limitation](https://forum.hdfgroup.org/t/virtual-datasets-and-open-file-limit/6757) -of the HDF5 C library it can be due to the opening of too many files. Try increasing -the number of possible file to open with the `ulimit` command. Or load smaller chunk of -data. -``` \ No newline at end of file +When loading a large part of a virtual dataset, you might end up with NaN values. This normally happens when linked files are missing, but due to a [known limitation](https://forum.hdfgroup.org/t/virtual-datasets-and-open-file-limit/6757) of the HDF5 C library it can also be caused by opening too many files. Try increasing the number of files that can be opened at once with the `ulimit` command, or load smaller chunks of data. +``` + +## Dask Virtualization + +Other types of formats are loaded as Dask arrays. A Dask array is an N-dimensional stack of chunks, each associated with a task that must be completed to get the values of that chunk. The result is a computation graph that Xdas can serialize and store within its native NetCDF format. To be serializable, the graph must only contain Xdas or Dask functions. + +From a user's point of view, this type of virtualization is very similar to the HDF5 one. + +The main difference is that when opening a dataset with Dask virtualization, the entire graph of pointers to the files is loaded and can be modified and saved again. In the HDF5 case, a virtual dataset is opened as if it were a regular file, meaning that the underlying mapping of pointers is hidden and cannot be modified. Dask graphs can become slow when they grow very large (more than one million tasks).
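As a minimal sketch of this workflow (the miniSEED file path below is hypothetical, and the calls mirror those exercised in the test suite of this release), a non-HDF5 file can be opened lazily, its graph serialized to the native format, and reopened later:

```python
import dask.array
import xdas as xd

# Non-HDF5 engines (here miniSEED, with a hypothetical path) return a DataArray
# backed by a Dask array whose graph points to the original file: no samples
# are loaded into memory yet.
da = xd.open_dataarray("NX/SX001/NX.SX001.00.HHZ.mseed", engine="miniseed")
assert isinstance(da.data, dask.array.Array)

# Writing to the native NetCDF format serializes the graph of pointers
# together with the coordinates and metadata.
da.to_netcdf("virtual.nc")

# Reopening restores the Dask-backed array; accessing .values is what finally
# triggers reading from the original file.
da = xd.open_dataarray("virtual.nc")
data = da.values
```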
diff --git a/pyproject.toml b/pyproject.toml index e919b46..d1cf4f9 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -4,15 +4,17 @@ build-backend = "setuptools.build_meta" [project] name = "xdas" -version = "0.1.2" +version = "0.2" requires-python = ">= 3.10" authors = [ { name = "Alister Trabattoni", email = "alister.trabattoni@gmail.com" }, ] dependencies = [ + "dask", "h5netcdf", "h5py", "hdf5plugin", + "msgpack", "numba", "numpy", "obspy", diff --git a/tests/dask/test_core.py b/tests/dask/test_core.py new file mode 100644 index 0000000..34c8b4a --- /dev/null +++ b/tests/dask/test_core.py @@ -0,0 +1,86 @@ +import os +from glob import glob +from tempfile import TemporaryDirectory + +import dask +import numpy as np + +from xdas.dask import dumps, loads +from xdas.dask.core import from_dict, fuse, iskey, to_dict + + +class TestIsKey: + def test_valid(self): + keys = [("a", 0), ("a", 0, 1), "name-s0d9us-df63ij"] + for key in keys: + assert iskey(key) + + def test_invalid(self): + keys = ["", (sum, 0, 1), ("a",), ("a", "b")] + for key in keys: + assert not iskey(key) + + +class TestFuse: + def test_simple(self): + graph = { + "a": "b", + "b": (np.sum, [1, 2, 3]), + } + assert fuse(graph) == {"a": (np.sum, [1, 2, 3])} + + def test_recursive(self): + graph = { + "a": "b", + "b": "c", + "c": (np.sum, [1, 2, 3]), + } + assert fuse(graph) == {"a": (np.sum, [1, 2, 3])} + + def test_tuple(self): + graph = { + ("a", 0): ("b", 1), + ("b", 1): (np.sum, [1, 2, 3]), + } + assert fuse(graph) == {("a", 0): (np.sum, [1, 2, 3])} + + def test_ignore(self): + graph = { + "a": (sum, 1, 2), + "b": (sum, 3), + "c": (sum, "a", "b"), + } + assert fuse(graph) == graph + + +class TestIO: + def generate(self, tmpdir): + expected = np.random.rand(3, 10) + chunks = np.split(expected, 5, axis=1) + for idx, chunk in enumerate(chunks): + np.save(os.path.join(tmpdir, f"chunk_{idx}.npy"), chunk) + paths = sorted(glob(os.path.join(tmpdir, "*.npy"))) + chunks = [dask.delayed(np.load)(path) for path in paths] + chunks = [ + dask.array.from_delayed(chunk, shape=(3, 2), dtype=expected.dtype) + for chunk in chunks + ] + data = dask.array.concatenate(chunks, axis=1) + assert np.array_equal(data.compute(), expected) + return expected, data + + def test_dict(self): + with TemporaryDirectory() as tmpdir: + expected, data = self.generate(tmpdir) + result = from_dict(to_dict(data)) + assert np.array_equal(result.compute(), expected) + sliced = result[:, 0] + assert np.array_equal(sliced.compute(), expected[:, 0]) + + def test_serial(self): + with TemporaryDirectory() as tmpdir: + expected, data = self.generate(tmpdir) + result = loads(dumps(data)) + assert np.array_equal(result.compute(), expected) + sliced = result[:, 0] + assert np.array_equal(sliced.compute(), expected[:, 0]) diff --git a/tests/dask/test_serial.py b/tests/dask/test_serial.py new file mode 100644 index 0000000..e818633 --- /dev/null +++ b/tests/dask/test_serial.py @@ -0,0 +1,71 @@ +import pytest +from dask.utils import itemgetter, methodcaller + +import xdas as xd +from xdas.dask.serial import dumps, loads + + +def test_tuple(): + objs = [ + (1, 2, 3), + [1, 2, 3], + (1, (2, 3)), + [1, [2, 3]], + (1, [2, 3]), + [1, (2, 3)], + ] + for obj in objs: + assert loads(dumps(obj)) == obj + + +def test_slice(): + objs = [ + slice(1, 2, 3), + slice(None), + ] + for obj in objs: + assert loads(dumps(obj)) == obj + + +def test_callable(): + objs = [ + dumps, + loads, + xd.DataArray, + xd.open_dataarray, + ] + for obj in objs: + assert loads(dumps(obj)) is obj + + +def 
test_keys(): + obj = {("a", 0, 0): ("b", 0, 0)} + assert loads(dumps(obj)) == obj + + +def test_mixed_structure(): + obj = { + "a": (1, 2, 3), + ("b", 0, 0): [1, 2, 3], + "c": (None, slice(1, 2, 3), slice(None)), + ("d", 1, 1): (dumps, "data"), + "e": (xd.DataArray, "path"), + } + assert loads(dumps(obj)) == obj + + +def test_methdocaller(): + obj = methodcaller("method") + assert loads(dumps(obj)) == obj + + +def test_itemgetter(): + obj = itemgetter(1) + assert loads(dumps(obj)) == obj + + +def test_unknown_type(): + with pytest.raises( + TypeError, match="Cannot encode object of type " + ): + dumps(object()) diff --git a/tests/io/test_asn.py b/tests/io/test_asn.py index 05f1ea5..3c6c2ef 100644 --- a/tests/io/test_asn.py +++ b/tests/io/test_asn.py @@ -1,5 +1,4 @@ import json -import socket import threading import time @@ -19,8 +18,8 @@ def get_free_local_address(): "time": { "tie_indices": [0, 99], "tie_values": [ - np.datetime64("2020-01-01T00:00:00.000"), - np.datetime64("2020-01-01T00:00:09.900"), + np.datetime64("2020-01-01T00:00:00.000000000"), + np.datetime64("2020-01-01T00:00:09.900000000"), ], }, "distance": {"tie_indices": [0, 9], "tie_values": [0.0, 90.0]}, diff --git a/tests/io/test_miniseed.py b/tests/io/test_miniseed.py new file mode 100644 index 0000000..9da16d2 --- /dev/null +++ b/tests/io/test_miniseed.py @@ -0,0 +1,81 @@ +from glob import glob +from tempfile import TemporaryDirectory + +import numpy as np +import obspy + +import xdas as xd + + +def make_network(dirpath): + for idx in range(1, 11): + st = make_station(idx) + st.write(f"{dirpath}/{st[0].id[:-4]}.mseed") + + +def make_station(idx): + st = obspy.Stream() + for component in ["Z", "N", "E"]: + st.append(make_trace(idx, component)) + return st + + +def make_trace(idx, component): + data = np.random.rand(100) + header = { + "delta": 0.01, + "starttime": obspy.UTCDateTime(0), + "network": "DX", + "station": f"CH{idx:03d}", + "location": "00", + "channel": f"HH{component}", + } + tr = obspy.Trace(data, header) + return tr + + +def test_miniseed(): + with TemporaryDirectory() as dirpath: + make_network(dirpath) + paths = sorted(glob(f"{dirpath}/*.mseed")) + + # read one file + da = xd.open_dataarray(paths[0], engine="miniseed") + assert da.shape == (3, 100) + assert da.dims == ("channel", "time") + assert da.coords["time"].isinterp() + assert da.coords["time"][0].values == np.datetime64("1970-01-01T00:00:00") + assert da.coords["time"][-1].values == np.datetime64("1970-01-01T00:00:00.990") + assert da.coords["network"].values == "DX" + assert da.coords["station"].values == "CH001" + assert da.coords["location"].values == "00" + assert da.coords["channel"].values.tolist() == ["HHZ", "HHN", "HHE"] + + # manually concatenate several files + objs = [xd.open_dataarray(path, engine="miniseed") for path in paths] + da = xd.concatenate(objs, "station") + assert da.shape == (10, 3, 100) + assert da.dims == ("station", "channel", "time") + assert da.coords["station"].values.tolist() == [ + f"CH{i:03d}" for i in range(1, 11) + ] + assert da.coords["time"].isinterp() + assert da.coords["time"][0].values == np.datetime64("1970-01-01T00:00:00") + assert da.coords["time"][-1].values == np.datetime64("1970-01-01T00:00:00.990") + assert da.coords["network"].values == "DX" + assert da.coords["location"].values == "00" + assert da.coords["channel"].values.tolist() == ["HHZ", "HHN", "HHE"] + + # automatically open multiple files + da = xd.open_mfdataarray(f"{dirpath}/*.mseed", dim="station", engine="miniseed") + assert da.shape == 
(10, 3, 100) + assert da.dims == ("station", "channel", "time") + assert da.coords["station"].values.tolist() == [ + f"CH{i:03d}" for i in range(1, 11) + ] + assert da.coords["time"].isinterp() + assert da.coords["time"][0].values == np.datetime64("1970-01-01T00:00:00") + assert da.coords["time"][-1].values == np.datetime64("1970-01-01T00:00:00.990") + assert da.coords["network"].values == "DX" + assert da.coords["location"].values == "00" + assert da.coords["channel"].values.tolist() == ["HHZ", "HHN", "HHE"] diff --git a/tests/test_coordinates.py b/tests/test_coordinates.py index b0dd2a6..1480917 100644 --- a/tests/test_coordinates.py +++ b/tests/test_coordinates.py @@ -82,6 +82,15 @@ def test_isinstance(self): assert not ScalarCoordinate(1).isdense() assert not ScalarCoordinate(1).isinterp() + def test_to_from_dict(self): + for data in self.valid: + coord = ScalarCoordinate(data) + assert ScalarCoordinate.from_dict(coord.to_dict()).equals(coord) + + def test_empty(self): + with pytest.raises(TypeError, match="cannot be empty"): + ScalarCoordinate() + class TestDenseCoordinate: valid = [ @@ -188,6 +197,32 @@ def test_to_index(self): DenseCoordinate([1, 2, 3]).to_index(slice(2, None)), slice(1, 3) ) + def test_to_from_dict(self): + for data in self.valid: + coord = DenseCoordinate(data) + assert DenseCoordinate.from_dict(coord.to_dict()).equals(coord) + + def test_empty(self): + coord = DenseCoordinate() + assert coord.empty + + def test_append(self): + coord0 = DenseCoordinate() + coord1 = DenseCoordinate([1, 2, 3]) + coord2 = DenseCoordinate([4, 5, 6]) + + result = coord1.append(coord2) + expected = DenseCoordinate([1, 2, 3, 4, 5, 6]) + assert result.equals(expected) + + result = coord2.append(coord1) + expected = DenseCoordinate([4, 5, 6, 1, 2, 3]) + assert result.equals(expected) + + assert coord0.append(coord0).empty + assert coord0.append(coord1).equals(coord1) + assert coord1.append(coord0).equals(coord1) + class TestInterpCoordinate: valid = [ @@ -472,6 +507,30 @@ def test_singleton(self): coord = InterpCoordinate({"tie_indices": [0], "tie_values": [1.0]}) assert coord[0].values == 1.0 + def test_to_from_dict(self): + for data in self.valid: + coord = InterpCoordinate(data) + assert InterpCoordinate.from_dict(coord.to_dict()).equals(coord) + + def test_append(self): + coord0 = InterpCoordinate() + coord1 = InterpCoordinate({"tie_indices": [0, 2], "tie_values": [0, 20]}) + coord2 = InterpCoordinate({"tie_indices": [0, 2], "tie_values": [30, 50]}) + + result = coord1.append(coord2) + expected = InterpCoordinate({"tie_indices": [0, 5], "tie_values": [0, 50]}) + assert result.equals(expected) + + result = coord2.append(coord1) + expected = InterpCoordinate( + {"tie_indices": [0, 2, 3, 5], "tie_values": [30, 50, 0, 20]} + ) + assert result.equals(expected) + + assert coord0.append(coord0).empty + assert coord0.append(coord1).equals(coord1) + assert coord1.append(coord0).equals(coord1) + class TestCoordinate: def test_new(self): @@ -488,6 +547,10 @@ def test_to_dataarray(self): expected = xdas.DataArray([1, 2, 3], {"dim": [1, 2, 3]}, name="dim") assert result.equals(expected) + def test_empty(self): + with pytest.raises(TypeError, match="cannot infer coordinate type"): + xdas.Coordinate() + class TestCoordinates: def test_init(self): @@ -543,3 +606,15 @@ def test_setitem(self): assert coords.dims == ("dim_0", "dim_1", "dim_2") with pytest.raises(TypeError, match="must be of type str"): coords[0] = ... 
+ + def test_to_from_dict(self): + starttime = np.datetime64("2020-01-01T00:00:00.000") + endtime = np.datetime64("2020-01-01T00:00:10.000") + coords = { + "time": {"tie_indices": [0, 999], "tie_values": [starttime, endtime]}, + "distance": np.linspace(0, 1000, 3), + "channel": ("distance", ["DAS01", "DAS02", "DAS03"]), + "interrogator": (None, "SRN"), + } + coords = xdas.Coordinates(coords) + assert xdas.Coordinates.from_dict(coords.to_dict()).equals(coords) diff --git a/tests/test_core.py b/tests/test_core.py index f29b29e..aeb5c2d 100644 --- a/tests/test_core.py +++ b/tests/test_core.py @@ -156,6 +156,44 @@ def test_concatenate(self): }, ) assert xdas.concatenate((da1, da2), dim="time").equals(expected) + # concat dense coordinates + da1 = xdas.DataArray( + data=np.zeros((5, 4, 3)), + coords={ + "phase": ["A", "B", "C"], + "time": [0, 1, 2, 3, 4], + "distance": [0.0, 1.0, 2.0, 3.0], + }, + dims=("time", "distance", "phase"), + ) + da2 = xdas.DataArray( + data=np.ones((7, 4, 3)), + coords={ + "phase": ["A", "B", "C"], + "time": [5, 6, 7, 8, 9, 10, 11], + "distance": [0.0, 1.0, 2.0, 3.0], + }, + dims=("time", "distance", "phase"), + ) + expected = xdas.DataArray( + data=np.concatenate((np.zeros((5, 4, 3)), np.ones((7, 4, 3))), axis=0), + coords={ + "phase": ["A", "B", "C"], + "time": [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11], + "distance": [0.0, 1.0, 2.0, 3.0], + }, + dims=("time", "distance", "phase"), + ) + assert xdas.concatenate((da1, da2), dim="time").equals(expected) + # stack + da = wavelet_wavefronts() + objs = [obj for obj in da] + result = xdas.concatenate(objs, dim="time") + result["time"] = xdas.InterpCoordinate.from_array(result["time"].values) + assert result.equals(da) + objs = [obj.drop_coords("time") for obj in da] + result = xdas.concatenate(objs, dim="time") + assert result.equals(da.drop_coords("time")) def test_open_dataarray(self): with pytest.raises(FileNotFoundError): diff --git a/tests/test_dataarray.py b/tests/test_dataarray.py index 2282199..c3a5c2a 100644 --- a/tests/test_dataarray.py +++ b/tests/test_dataarray.py @@ -1,6 +1,8 @@ import os +from glob import glob from tempfile import TemporaryDirectory +import dask import hdf5plugin import numpy as np import pytest @@ -190,6 +192,10 @@ def test_from_xarray(self): def test_stream(self): da = wavelet_wavefronts() + da["time"] = { + "tie_indices": da["time"].tie_indices, + "tie_values": da["time"].tie_values.astype("datetime64[us]"), + } st = da.to_stream(dim={"distance": "time"}) assert st[0].id == "NET.DAS00001.00.BN1" assert len(st) == da.sizes["distance"] @@ -327,6 +333,12 @@ def test_expand_dims(self): assert result.dims == ("y", "x") assert result.shape == (1, 3) + da = DataArray([1.0, 2.0, 3.0], {"x": [0, 1, 2], "y": 0}, dims=("x",)) + result = da.expand_dims("y") + assert result.dims == ("y", "x") + assert result.shape == (1, 3) + assert result["y"].equals(xdas.Coordinate([0], dim="y")) + def test_io(self): # both coords interpolated da = wavelet_wavefronts() @@ -372,6 +384,44 @@ def test_io_with_zfp_compression(self): _da = DataArray.from_netcdf(tmpfile_compressed) assert np.abs(da - _da).max().values < 0.001 + def test_io_dask(self): + with TemporaryDirectory() as tmpdir: + values = np.random.rand(3, 10) + chunks = np.split(values, 5, axis=1) + for idx, chunk in enumerate(chunks): + np.save(os.path.join(tmpdir, f"chunk_{idx}.npy"), chunk) + paths = glob(os.path.join(tmpdir, "*.npy")) + chunks = [dask.delayed(np.load)(path) for path in paths] + chunks = [ + dask.array.from_delayed(chunk, shape=(3, 2), 
dtype=values.dtype) + for chunk in chunks + ] + data = dask.array.concatenate(chunks, axis=1) + expected = DataArray( + data, + coords={"time": np.arange(3), "distance": np.arange(10)}, + attrs={"version": "1.0"}, + name="data", + ) + fname = os.path.join(tmpdir, "tmp.nc") + expected.to_netcdf(fname) + result = xdas.open_dataarray(fname) + assert isinstance(result.data, dask.array.Array) + assert np.array_equal(expected.values, result.values) + assert expected.dtype == result.dtype + assert expected.coords.equals(result.coords) + assert expected.dims == result.dims + assert expected.name == result.name + assert expected.attrs == result.attrs + + def test_io_non_dimensional(self): + expected = DataArray(coords={"dim": 0}, dims=()) + with TemporaryDirectory() as dirpath: + path = os.path.join(dirpath, "tmp.nc") + expected.to_netcdf(path) + result = DataArray.from_netcdf(path) + assert expected.equals(result) + def test_ufunc(self): da = wavelet_wavefronts() result = np.add(da, 1) diff --git a/tests/test_routines.py b/tests/test_routines.py new file mode 100644 index 0000000..a7ff99c --- /dev/null +++ b/tests/test_routines.py @@ -0,0 +1,195 @@ +import numpy as np +import pytest + +from xdas.core.coordinates import Coordinates +from xdas.core.dataarray import DataArray +from xdas.core.routines import Bag, CompatibilityError, combine_by_coords + + +class TestBag: + def test_bag_initialization(self): + bag = Bag(dim="time") + assert bag.dim == "time" + assert bag.objs == [] + + def test_bag_append_initializes(self): + da = DataArray( + np.random.rand(10, 5), {"time": np.arange(10), "space": np.arange(5)} + ) + bag = Bag(dim="time") + bag.append(da) + assert len(bag.objs) == 1 + assert bag.objs[0] is da + assert bag.subcoords.equals(Coordinates({"space": np.arange(5)})) + assert bag.subshape == (5,) + assert bag.dims == ("time", "space") + assert bag.delta + + def test_bag_append_compatible(self): + da1 = DataArray(np.random.rand(10, 5), dims=("time", "space")) + da2 = DataArray(np.random.rand(10, 5), dims=("time", "space")) + bag = Bag(dim="time") + bag.append(da1) + bag.append(da2) + assert len(bag.objs) == 2 + assert bag.objs[1] is da2 + da1 = DataArray( + np.random.rand(10, 5), {"time": np.arange(10), "space": np.arange(5)} + ) + da2 = DataArray( + np.random.rand(10, 5), {"time": np.arange(10, 20), "space": np.arange(5)} + ) + bag = Bag(dim="time") + bag.append(da1) + bag.append(da2) + assert len(bag.objs) == 2 + assert bag.objs[1] is da2 + + def test_bag_append_incompatible_dims(self): + da1 = DataArray(np.random.rand(10, 5), dims=("time", "space")) + da2 = DataArray(np.random.rand(10, 5), dims=("space", "time")) + bag = Bag(dim="time") + bag.append(da1) + with pytest.raises(CompatibilityError): + bag.append(da2) + + def test_bag_append_incompatible_shape(self): + da1 = DataArray(np.random.rand(10, 5), dims=("time", "space")) + da2 = DataArray(np.random.rand(10, 6), dims=("time", "space")) + bag = Bag(dim="time") + bag.append(da1) + with pytest.raises(CompatibilityError): + bag.append(da2) + + def test_bag_append_incompatible_dtype(self): + da1 = DataArray(np.random.rand(10, 5), dims=("time", "space")) + da2 = DataArray(np.random.randint(0, 10, size=(10, 5)), dims=("time", "space")) + bag = Bag(dim="time") + bag.append(da1) + with pytest.raises(CompatibilityError): + bag.append(da2) + + def test_bag_append_incompatible_coords(self): + da1 = DataArray( + np.random.rand(10, 5), + dims=("time", "space"), + coords={"space": np.arange(5)}, + ) + da2 = DataArray( + np.random.rand(10, 5), + 
dims=("time", "space"), + coords={"space": np.arange(5) + 1}, + ) + bag = Bag(dim="time") + bag.append(da1) + with pytest.raises(CompatibilityError): + bag.append(da2) + + def test_bag_append_incompatible_sampling_interval(self): + da1 = DataArray( + np.random.rand(10, 5), + dims=("time", "space"), + coords={"time": np.arange(10)}, + ) + da2 = DataArray( + np.random.rand(10, 5), + dims=("time", "space"), + coords={"time": np.arange(10) * 2}, + ) + bag = Bag(dim="time") + bag.append(da1) + with pytest.raises(CompatibilityError): + bag.append(da2) + + +class TestCombineByCoords: + def test_basic(self): + # without coords + da1 = DataArray(np.random.rand(10, 5), dims=("time", "space")) + da2 = DataArray(np.random.rand(10, 5), dims=("time", "space")) + combined = combine_by_coords([da1, da2], dim="time", squeeze=True) + assert combined.shape == (20, 5) + + # with coords + da1 = DataArray( + np.random.rand(10, 5), + coords={"time": np.arange(10), "space": np.arange(5)}, + ) + da2 = DataArray( + np.random.rand(10, 5), + coords={"time": np.arange(10, 20), "space": np.arange(5)}, + ) + combined = combine_by_coords([da1, da2], dim="time", squeeze=True) + assert combined.shape == (20, 5) + + def test_incompatible_shape(self): + da1 = DataArray(np.random.rand(10, 5), dims=("time", "space")) + da2 = DataArray(np.random.rand(10, 6), dims=("time", "space")) + dc = combine_by_coords([da1, da2], dim="time") + assert len(dc) == 2 + assert dc[0].equals(da1) + assert dc[1].equals(da2) + + def test_incompatible_dims(self): + da1 = DataArray(np.random.rand(10, 5), dims=("time", "space")) + da2 = DataArray(np.random.rand(10, 5), dims=("space", "time")) + dc = combine_by_coords([da1, da2], dim="time") + assert len(dc) == 2 + assert dc[0].equals(da1) + assert dc[1].equals(da2) + + def test_incompatible_dtype(self): + da1 = DataArray(np.random.rand(10, 5), dims=("time", "space")) + da2 = DataArray(np.random.randint(0, 10, size=(10, 5)), dims=("time", "space")) + dc = combine_by_coords([da1, da2], dim="time") + assert len(dc) == 2 + assert dc[0].equals(da1) + assert dc[1].equals(da2) + + def test_incompatible_coords(self): + da1 = DataArray( + np.random.rand(10, 5), + dims=("time", "space"), + coords={"space": np.arange(5)}, + ) + da2 = DataArray( + np.random.rand(10, 5), + dims=("time", "space"), + coords={"space": np.arange(5) + 1}, + ) + dc = combine_by_coords([da1, da2], dim="time") + assert len(dc) == 2 + assert dc[0].equals(da1) + assert dc[1].equals(da2) + + def test_incompatible_sampling_interval(self): + da1 = DataArray( + np.random.rand(10, 5), + dims=("time", "space"), + coords={"time": np.arange(10)}, + ) + da2 = DataArray( + np.random.rand(10, 5), + dims=("time", "space"), + coords={"time": np.arange(10) * 2}, + ) + dc = combine_by_coords([da1, da2], dim="time") + assert len(dc) == 2 + assert dc[0].equals(da1) + assert dc[1].equals(da2) + + def test_expand_scalar_coordinate(self): + da1 = DataArray( + np.random.rand(10), + dims=("time",), + coords={"time": np.arange(10), "space": 0}, + ) + da2 = DataArray( + np.random.rand(10), + dims=("time",), + coords={"time": np.arange(10), "space": 1}, + ) + dc = combine_by_coords([da1, da2], dim="space", squeeze=True) + assert dc.shape == (2, 10) + assert dc.dims == ("space", "time") + assert dc.coords["space"].values.tolist() == [0, 1] diff --git a/xdas/core/coordinates.py b/xdas/core/coordinates.py index 19d943d..58c5837 100644 --- a/xdas/core/coordinates.py +++ b/xdas/core/coordinates.py @@ -64,7 +64,7 @@ def __init__(self, coords=None, dims=None): if dims is 
None: dims = coords.dims coords = dict(coords) - self._dims = () if dims is None else dims + self._dims = () if dims is None else tuple(dims) self._parent = None if coords is not None: for name in coords: @@ -179,21 +179,37 @@ def to_dict(self): >>> import xdas - >>> coords = { - ... "time": {"tie_indices": [0, 999], "tie_values": [0.0, 10.0]}, - ... "distance": [0, 1, 2], - ... "channel": ("distance", ["DAS01", "DAS02", "DAS03"]), - ... "interrogator": (None, "SRN"), - ... } - >>> xdas.Coordinates(coords).to_dict() - {'time': {'dim': 'time', - 'data': {'tie_indices': [0, 999], 'tie_values': [0.0, 10.0]}}, - 'distance': {'dim': 'distance', 'data': [0, 1, 2]}, - 'channel': {'dim': 'distance', 'data': ['DAS01', 'DAS02', 'DAS03']}, - 'interrogator': {'dim': None, 'data': 'SRN'}} + >>> coords = xdas.Coordinates( + ... { + ... "time": {"tie_indices": [0, 999], "tie_values": [0.0, 10.0]}, + ... "distance": [0, 1, 2], + ... "channel": ("distance", ["DAS01", "DAS02", "DAS03"]), + ... "interrogator": (None, "SRN"), + ... } + ... ) + >>> coords.to_dict() + {'dims': ('time', 'distance'), + 'coords': {'time': {'dim': 'time', + 'data': {'tie_indices': [0, 999], 'tie_values': [0.0, 10.0]}, + 'dtype': 'float64'}, + 'distance': {'dim': 'distance', 'data': [0, 1, 2], 'dtype': 'int64'}, + 'channel': {'dim': 'distance', + 'data': ['DAS01', 'DAS02', 'DAS03'], + 'dtype': '}") + if dtype is not None: + raise ValueError("`dtype` is not supported for DefaultCoordinate") self.data = data self.dim = dim @@ -418,19 +456,28 @@ def get_indexer(self, value, method=None): def slice_indexer(self, start=None, stop=None, step=None, endpoint=True): return slice(start, stop, step) + def append(self, other): + if not isinstance(other, self.__class__): + raise TypeError(f"cannot append {type(other)} to {self.__class__}") + if not self.dim == other.dim: + raise ValueError("cannot append coordinate with different dimension") + return self.__class__({"size": len(self) + len(other)}, self.dim) + def to_dict(self): - return {"dim": self.dim, "data": self.data} + return {"dim": self.dim, "data": self.data.tolist(), "dtype": str(self.dtype)} class DenseCoordinate(Coordinate): def __new__(cls, *args, **kwargs): return object.__new__(cls) - def __init__(self, data, dim=None): + def __init__(self, data=None, dim=None, dtype=None): + if data is None: + data = [] data, dim = parse(data, dim) if not self.isvalid(data): raise TypeError("`data` must be array-like") - self.data = np.asarray(data) + self.data = np.asarray(data, dtype=dtype) self.dim = dim @staticmethod @@ -444,7 +491,11 @@ def index(self): def equals(self, other): if isinstance(other, self.__class__): - return np.array_equal(self.data, other.data) + return ( + np.array_equal(self.data, other.data) + and self.dim == other.dim + and self.dtype == other.dtype + ) else: return False @@ -467,12 +518,25 @@ def slice_indexer(self, start=None, stop=None, step=None, endpoint=True): slc = slice(slc.start, slc.stop - 1, slc.step) return slc + def append(self, other): + if not isinstance(other, self.__class__): + raise TypeError(f"cannot append {type(other)} to {self.__class__}") + if not self.dim == other.dim: + raise ValueError("cannot append coordinate with different dimension") + if self.empty: + return other + if other.empty: + return self + if not self.dtype == other.dtype: + raise ValueError("cannot append coordinate with different dtype") + return self.__class__(np.concatenate([self.data, other.data]), self.dim) + def to_dict(self): if np.issubdtype(self.dtype, np.datetime64): - 
data = list(self.data.astype(str)) + data = self.data.astype(str).tolist() else: - data = list(self.data) - return {"dim": self.dim, "data": data} + data = self.data.tolist() + return {"dim": self.dim, "data": data, "dtype": str(self.dtype)} class InterpCoordinate(Coordinate): @@ -496,7 +560,9 @@ class InterpCoordinate(Coordinate): def __new__(cls, *args, **kwargs): return object.__new__(cls) - def __init__(self, data, dim=None): + def __init__(self, data=None, dim=None, dtype=None): + if data is None: + data = {"tie_indices": [], "tie_values": []} data, dim = parse(data, dim) if not self.__class__.isvalid(data): raise TypeError("`data` must be dict-like") @@ -505,7 +571,7 @@ def __init__(self, data, dim=None): "both `tie_indices` and `tie_values` key should be provided" ) tie_indices = np.asarray(data["tie_indices"]) - tie_values = np.asarray(data["tie_values"]) + tie_values = np.asarray(data["tie_values"], dtype=dtype) if not tie_indices.ndim == 1: raise ValueError("`tie_indices` must be 1D") if not tie_values.ndim == 1: @@ -626,8 +692,11 @@ def values(self): return self.get_value(self.indices) def equals(self, other): - return np.array_equal(self.tie_indices, other.tie_indices) and np.array_equal( - self.tie_values, other.tie_values + return ( + np.array_equal(self.tie_indices, other.tie_indices) + and np.array_equal(self.tie_values, other.tie_values) + and self.dim == other.dim + and self.dtype == other.dtype ) def get_value(self, index): @@ -728,6 +797,29 @@ def slice_indexer(self, start=None, stop=None, step=None, endpoint=True): stop_index -= 1 return slice(start_index, stop_index) + def append(self, other): + if not isinstance(other, self.__class__): + raise TypeError(f"cannot append {type(other)} to {self.__class__}") + if not self.dim == other.dim: + raise ValueError("cannot append coordinate with different dimension") + if self.empty: + return other + if other.empty: + return self + if not self.dtype == other.dtype: + raise ValueError("cannot append coordinate with different dtype") + coord = self.__class__( + { + "tie_indices": np.append( + self.tie_indices, other.tie_indices + len(self) + ), + "tie_values": np.append(self.tie_values, other.tie_values), + }, + self.dim, + ) + coord = coord.simplify() + return coord + def decimate(self, q): tie_indices = (self.tie_indices // q) * q for k in range(1, len(tie_indices) - 1): @@ -846,8 +938,11 @@ def to_dict(self): tie_values = self.data["tie_values"] if np.issubdtype(tie_values.dtype, np.datetime64): tie_values = tie_values.astype(str) - data = {"tie_indices": list(tie_indices), "tie_values": list(tie_values)} - return {"dim": self.dim, "data": data} + data = { + "tie_indices": tie_indices.tolist(), + "tie_values": tie_values.tolist(), + } + return {"dim": self.dim, "data": data, "dtype": str(self.dtype)} def parse(data, dim=None): @@ -881,6 +976,10 @@ def get_sampling_interval(da, dim, cast=True): float The sample spacing. 
""" + if da.sizes[dim] < 2: + raise ValueError( + "cannot compute sample spacing on a dimension with less than 2 points" + ) coord = da[dim] if isinstance(coord, InterpCoordinate): num = np.diff(coord.tie_values) diff --git a/xdas/core/dataarray.py b/xdas/core/dataarray.py index 6f6e9ec..495c7b4 100644 --- a/xdas/core/dataarray.py +++ b/xdas/core/dataarray.py @@ -1,4 +1,5 @@ import copy +import json import re import warnings from functools import partial @@ -8,9 +9,11 @@ import hdf5plugin import numpy as np import xarray as xr +from dask.array import Array as DaskArray from numpy.lib.mixins import NDArrayOperatorsMixin -from ..virtual import VirtualArray, VirtualSource +from ..dask.core import dumps, from_dict, loads, to_dict +from ..virtual import VirtualArray, VirtualSource, _to_human from .coordinates import Coordinate, Coordinates, get_sampling_interval HANDLED_NUMPY_FUNCTIONS = {} @@ -101,6 +104,8 @@ def __repr__(self): data_repr = np.array2string( self.data, precision=precision, threshold=0, edgeitems=edgeitems ) + elif isinstance(self.data, DaskArray): + data_repr = f"DaskArray: {_to_human(self.data.nbytes)} ({self.data.dtype})" else: data_repr = repr(self.data) string = ".+)") + regex = regex.replace(placeholder, f"(?P<{placeholder[1:-1]}>.+)", 1) + regex = regex.replace(placeholder, f"(?P={placeholder[1:-1]})") else: regex = regex.replace(placeholder, r".*") regex = re.compile(regex) @@ -320,7 +321,8 @@ def open_mfdataarray( "The maximum number of file that can be opened at once is for now limited " "to 100 000." ) - with ProcessPoolExecutor() as executor: + max_workers = 1 if engine == "miniseed" else None # TODO: dirty fix + with ProcessPoolExecutor(max_workers=max_workers) as executor: futures = [ executor.submit(open_dataarray, path, engine=engine, **kwargs) for path in paths @@ -334,7 +336,7 @@ def open_mfdataarray( else: iterator = as_completed(futures) objs = [future.result() for future in iterator] - return combine_by_coords(objs, dim, tolerance, squeeze, True, verbose) + return combine_by_coords(objs, dim, tolerance, squeeze, None, verbose) def open_dataarray(fname, group=None, engine=None, **kwargs): @@ -531,31 +533,117 @@ def combine_by_coords( DataSequence or DataArray The combined data arrays. 
""" - objs = sorted(objs, key=lambda da: da[dim][0].values) - out = [] - bag = [] + # parse dim + if dim == "first": + dim = objs[0].dims[0] + if dim == "last": + dim = objs[0].dims[-1] + + # sort objs by dim + if dim in objs[0].coords: + objs = sorted( + objs, + key=lambda da: da[dim].values if da[dim].isscalar() else da[dim][0].values, + ) + + # combine objs + bags = [] + bag = Bag(dim) for da in objs: - if not bag: - bag = [da] - elif ( - da.coords.drop_dims(dim).equals(bag[-1].coords.drop_dims(dim)) - and (get_sampling_interval(da, dim) == get_sampling_interval(bag[-1], dim)) - and (da.dtype == bag[-1].dtype) - ): + try: bag.append(da) - else: - out.append(bag) - bag = [da] - out.append(bag) + except CompatibilityError: + bags.append(bag) + bag = Bag(dim) + bag.append(da) + bags.append(bag) + + # concatenate each bag collection = DataCollection( - [concatenate(bag, dim, tolerance, virtual, verbose) for bag in out] + [concatenate(bag, dim, tolerance, virtual, verbose) for bag in bags] ) + + # squeeze if possible if squeeze and len(collection) == 1: return collection[0] else: return collection +class CompatibilityError(Exception): + """Custom exception to signal required splitting.""" + + def __init__(self, message): + super().__init__(message) + + +class Bag: + def __init__(self, dim): + self.objs = [] + self.dim = dim + + def __iter__(self): + return iter(self.objs) + + def initialize(self, da): + self.objs = [da] + self.dims = da.dims + self.subshape = tuple( + size for dim, size in da.sizes.items() if not dim == self.dim + ) + self.subcoords = ( + da.coords.drop_dims(self.dim) + if self.dim in self.dims + else da.coords.drop_coords(self.dim) + ) + try: + self.delta = get_sampling_interval(da, self.dim) + except (ValueError, KeyError): + self.delta = None + self.dtype = da.dtype + + def append(self, da): + if not self.objs: + self.initialize(da) + else: + self.check_dims(da) + self.check_shape(da) + self.check_coords(da) + self.check_sampling_interval(da) + self.check_dtype(da) + self.objs.append(da) + + def check_dims(self, da): + if not self.dims == da.dims: + raise CompatibilityError("dimensions are not compatible") + + def check_shape(self, da): + subshape = tuple(size for dim, size in da.sizes.items() if not dim == self.dim) + if not self.subshape == subshape: + raise CompatibilityError("shapes are not compatible") + + def check_dtype(self, da): + if not self.dtype == da.dtype: + raise CompatibilityError("data types are not compatible") + + def check_coords(self, da): + subcoords = ( + da.coords.drop_dims(self.dim) + if self.dim in self.dims + else da.coords.drop_coords(self.dim) + ) + if not self.subcoords.equals(subcoords): + raise CompatibilityError("coordinates are not compatible") + + def check_sampling_interval(self, da): + if self.delta is None: + pass + else: + delta = get_sampling_interval(da, self.dim) + if not np.isclose(delta, self.delta): + raise CompatibilityError("sampling intervals are not compatible") + + def concatenate(objs, dim="first", tolerance=None, virtual=None, verbose=None): """ Concatenate data arrays along a given dimension. 
@@ -586,23 +674,27 @@ def concatenate(objs, dim="first", tolerance=None, virtual=None, verbose=None): if virtual is None: virtual = all(isinstance(da.data, (VirtualSource, VirtualStack)) for da in objs) - if not all(isinstance(da[dim], InterpCoordinate) for da in objs): - raise NotImplementedError("can only concatenate along interpolated coordinate") + if dim in objs[0].dims + ("first", "last"): + axis = objs[0].get_axis_num(dim) + dim = objs[0].dims[axis] # ensure not "first" or "last" + dims = objs[0].dims + else: + axis = 0 + dims = (dim, *objs[0].dims) + objs = [da.expand_dims(dim) for da in objs] - obj = objs[0] - axis = obj.get_axis_num(dim) - dim = obj.dims[axis] - coords = obj.coords.copy() - dims = obj.dims - name = obj.name - attrs = obj.attrs + coords = objs[0].coords.copy() + name = objs[0].name + attrs = objs[0].attrs + + dim_has_coords = dim in coords + + if dim_has_coords: + objs = sorted(objs, key=lambda da: da[dim][0].values) + coord = coords[dim].__class__(data=None, dim=dim, dtype=coords[dim].dtype) - objs = sorted(objs, key=lambda da: da[dim][0].values) iterator = tqdm(objs, desc="Linking dataarray") if verbose else objs data = [] - tie_indices = [] - tie_values = [] - idx = 0 for da in iterator: if isinstance(da.data, VirtualStack): for source in da.data.sources: @@ -610,18 +702,22 @@ def concatenate(objs, dim="first", tolerance=None, virtual=None, verbose=None): else: data.append(da.data) - tie_indices.extend(idx + da[dim].tie_indices) - tie_values.extend(da[dim].tie_values) - idx += da.shape[axis] + if dim in coords: + coord = coord.append(da[dim]) if virtual: data = VirtualStack(data, axis) else: data = np.concatenate(data, axis) - - coords[dim] = InterpCoordinate( - {"tie_indices": tie_indices, "tie_values": tie_values}, dim - ).simplify(tolerance) + if dim_has_coords: + if tolerance is not None: + if hasattr(coord, "simplify"): + coord = coord.simplify(tolerance) + else: + raise TypeError( + "tolerance can only be used with interpolated coordinates" + ) + coords[dim] = coord return DataArray(data, coords, dims, name, attrs) @@ -867,6 +963,11 @@ def plot_availability(obj, dim="first", **kwargs): **kwargs Additional keyword arguments to be passed to the `px.timeline` function. + Returns + ------- + fig : plotly.graph_objects.Figure + The timeline + Notes ----- This function uses the `px.timeline` function from the `plotly.express` library. @@ -890,7 +991,7 @@ def plot_availability(obj, dim="first", **kwargs): for elem in fig.data: elem["marker"]["line_color"] = color_discrete_map[elem["legendgroup"]] fig.update_yaxes(title_text="") - fig.show() + return fig def _get_timeline_dataframe(obj, dim="first", name=None): diff --git a/xdas/dask/__init__.py b/xdas/dask/__init__.py new file mode 100644 index 0000000..60e4af0 --- /dev/null +++ b/xdas/dask/__init__.py @@ -0,0 +1 @@ +from .core import dumps, loads diff --git a/xdas/dask/core.py b/xdas/dask/core.py new file mode 100644 index 0000000..2397223 --- /dev/null +++ b/xdas/dask/core.py @@ -0,0 +1,58 @@ +from dask.array import Array + +from . import serial + + +def dumps(arr): + """Serialize a dask array.""" + return serial.dumps(to_dict(arr)) + + +def loads(data): + """Deserialize a dask array.""" + return from_dict(serial.loads(data)) + + +def to_dict(arr): + """Convert a dask array to a dictionary.""" + graph = arr.__dask_graph__().cull(arr.__dask_keys__()) + graph = fuse(graph) + return { + "dask": graph, # TODO: fuse then encode fails... 
+ "name": arr.name, + "chunks": arr.chunks, + "dtype": str(arr.dtype), + } + + +def from_dict(dct): + """Convert a dictionary to a dask array.""" + return Array(**dct) + + +def fuse(graph): + """Simpligy a graph by grouping intermediate empty computations.""" + dsk = {} + ignore = set() + for key, computation in graph.items(): + if key in ignore: + continue + while iskey(computation) and computation in graph: + ignore.add(computation) + computation = graph[computation] + dsk[key] = computation + return dsk + + +def iskey(obj): + if isinstance(obj, str) and len(obj) > 0: + return True + elif ( + isinstance(obj, tuple) + and len(obj) > 1 + and isinstance(obj[0], str) + and all(isinstance(index, int) for index in obj[1:]) + ): + return True + else: + return False diff --git a/xdas/dask/serial.py b/xdas/dask/serial.py new file mode 100644 index 0000000..bcf6862 --- /dev/null +++ b/xdas/dask/serial.py @@ -0,0 +1,58 @@ +import importlib + +import msgpack +from dask.utils import itemgetter, methodcaller + +codes = { + "tuple": 1, + "slice": 2, + "callable": 3, + "methodcaller": 4, + "itemgetter": 5, +} + + +def encode(obj): + if isinstance(obj, tuple): + code = codes["tuple"] + obj = list(obj) + elif isinstance(obj, slice): + code = codes["slice"] + obj = {"start": obj.start, "stop": obj.stop, "step": obj.step} + elif isinstance(obj, methodcaller): + code = codes["methodcaller"] + obj = obj.method + elif isinstance(obj, itemgetter): + code = codes["itemgetter"] + obj = obj.index + elif callable(obj): + code = codes["callable"] + obj = {"module": obj.__module__, "name": obj.__name__} + else: + raise TypeError(f"Cannot encode object of type {type(obj)}") + data = dumps(obj) + return msgpack.ExtType(code, data) + + +def decode(code, data): + obj = loads(data) + if code == codes["tuple"]: + return tuple(obj) + elif code == codes["slice"]: + return slice(obj["start"], obj["stop"], obj["step"]) + elif code == codes["callable"]: + return getattr(importlib.import_module(obj["module"]), obj["name"]) + elif code == codes["methodcaller"]: + return methodcaller(obj) + elif code == codes["itemgetter"]: + return itemgetter(obj) + else: + raise ValueError(f"Unknown code {code}") + + +def dumps(obj): + return msgpack.dumps(obj, default=encode, strict_types=True) + + +def loads(obj): + return msgpack.loads(obj, strict_map_key=False, ext_hook=decode) diff --git a/xdas/io/__init__.py b/xdas/io/__init__.py index 008cd21..b571101 100644 --- a/xdas/io/__init__.py +++ b/xdas/io/__init__.py @@ -1,2 +1,2 @@ -from . import asn, febus, optasense, sintela, terra15 +from . 
import asn, febus, miniseed, optasense, silixa, sintela, terra15
 from .core import get_free_port
diff --git a/xdas/io/miniseed.py b/xdas/io/miniseed.py
new file mode 100644
index 0000000..7448857
--- /dev/null
+++ b/xdas/io/miniseed.py
@@ -0,0 +1,66 @@
+import dask
+import numpy as np
+import obspy
+
+from ..core.coordinates import Coordinates
+from ..core.dataarray import DataArray
+
+
+def read(fname):
+    shape, dtype, coords = read_header(fname)
+    data = dask.array.from_delayed(dask.delayed(read_data)(fname), shape, dtype)
+    return DataArray(data, coords)
+
+
+def read_header(path):
+    st = obspy.read(path, headonly=True)
+
+    dtype = uniquifiy(tr.data.dtype for tr in st)
+    if not isinstance(dtype, np.dtype):
+        raise ValueError("All traces must have the same dtype")
+
+    time = get_time_coord(st[0])
+    if not all(get_time_coord(tr) == time for tr in st):
+        raise ValueError("All traces must be synchronized")
+
+    network = uniquifiy(tr.stats.network for tr in st)
+    stations = uniquifiy(tr.stats.station for tr in st)
+    locations = uniquifiy(tr.stats.location for tr in st)
+    channels = uniquifiy(tr.stats.channel for tr in st)
+
+    coords = Coordinates(
+        {
+            "network": network,
+            "station": stations,
+            "location": locations,
+            "channel": channels,
+            "time": time,
+        }
+    )
+
+    shape = tuple(len(coord) for coord in coords.values() if not coord.isscalar())
+    return shape, dtype, coords
+
+
+def read_data(path):
+    st = obspy.read(path)
+    return np.array(st)
+
+
+def get_time_coord(tr):
+    return {
+        "tie_indices": [0, tr.stats.npts - 1],
+        "tie_values": [
+            np.datetime64(tr.stats.starttime),
+            np.datetime64(tr.stats.endtime),
+        ],
+    }
+
+
+def uniquifiy(seq):
+    seen = set()
+    seq = list(x for x in seq if x not in seen and not seen.add(x))
+    if len(seq) == 1:
+        return seq[0]
+    else:
+        return seq
diff --git a/xdas/io/silixa.py b/xdas/io/silixa.py
new file mode 100644
index 0000000..f795583
--- /dev/null
+++ b/xdas/io/silixa.py
@@ -0,0 +1,36 @@
+import dask
+import numpy as np
+
+from ..core.dataarray import DataArray
+from .tdms import TdmsReader
+
+
+def read(fname):
+    shape, dtype, coords = read_header(fname)
+    data = dask.array.from_delayed(dask.delayed(read_data)(fname), shape, dtype)
+    return DataArray(data, coords)
+
+
+def read_header(fname):
+    with TdmsReader(fname) as tdms:
+        props = tdms.get_properties()
+        shape = tdms.channel_length, tdms.fileinfo["n_channels"]
+        dtype = tdms._data_type
+    t0 = np.datetime64(props["GPSTimeStamp"])
+    dt = np.timedelta64(round(1e9 / props["SamplingFrequency[Hz]"]), "ns")
+    time = {
+        "tie_indices": [0, shape[0] - 1],
+        "tie_values": [t0, t0 + dt * (shape[0] - 1)],
+    }
+    distance = {
+        "tie_indices": [0, shape[1] - 1],
+        "tie_values": [props["Start Distance (m)"], props["Stop Distance (m)"]],
+    }
+    coords = {"time": time, "distance": distance}
+    return shape, dtype, coords
+
+
+def read_data(fname):
+    with TdmsReader(fname) as tdms:
+        data = tdms.get_data()
+    return data
diff --git a/xdas/io/tdms.py b/xdas/io/tdms.py
new file mode 100644
index 0000000..498e88c
--- /dev/null
+++ b/xdas/io/tdms.py
@@ -0,0 +1,555 @@
+# -*- coding: utf-8 -*-
+"""
+Copyright (c) 2018 Silixa Ltd
+Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation
+files (the "Software"), to use the Software for the sole purpose of private, non-commercial use and/or in-house company
+research and development meaning the right to use, copy, modify, merge, share the Software, and to permit persons to whom
+the Software is furnished to like-wise do so, subject to the following conditions:
+The above copyright notice and this permission notice shall be included in all copies or substantial portions of the Software.
+For any intended commercial use then contact the copyright holder, Silixa Ltd, for permission, which shall not be unreasonably
+withheld.
+THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES
+OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE
+LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR
+IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
+
+$Rev:: 26283 $
+$Date:: 2018-03-27 13:14:05 +0100 (Tue, 27 Mar 2018) $
+
+Ref: [1] TDMS_Adv_Read.m
+     [2] http://www.ni.com/white-paper/5696/en#toc2
+
+"""
+
+import datetime
+import mmap
+import os
+import struct
+
+import numpy as np
+import pandas as pd
+
+
+def load_property_map(xls_file):
+    prop_map = pd.read_excel(xls_file, sheetname="Sheet1")
+    return (
+        prop_map[["CurrentTag", "CorrectTag"]]
+        .applymap(lambda x: x.replace(" ", ""))
+        .set_index("CurrentTag")
+        .to_dict()["CorrectTag"]
+    )
+
+
+# prop_map = load_property_map('MetaDataTable_iDAS_TDMS_CFG_Tags.xlsx')
+
+
+def write_property_dict(prop_dict, out_file):
+    from pprint import pformat
+
+    f = open(out_file, "w")
+    f.write("tdms_property_map=")
+    f.write(pformat(prop_dict))
+    f.close()
+
+
+def type_not_supported(vargin):
+    """Function raises a NotImplementedException."""
+    raise NotImplementedError("Reading of this tdsDataType is not implemented")
+
+
+def parse_time_stamp(fractions, seconds):
+    """
+    Convert time TDMS time representation to datetime
+    fractions -- fractional seconds (2^-64)
+    seconds -- The number of seconds since 1/1/1904
+    @rtype : datetime.datetime
+    """
+    if fractions is not None and seconds is not None and fractions + seconds > 0:
+        return datetime.timedelta(
+            0, fractions * 2**-64 + seconds
+        ) + datetime.datetime(1904, 1, 1)
+    else:
+        return None
+
+
+# Enum mapping TDM data types to description string, numpy type where exists
+# See Ref[2] for enum values
+TDS_DATA_TYPE = dict(
+    {
+        0x00: "void",  # tdsTypeVoid
+        0x01: "int8",  # tdsTypeI8
+        0x02: "int16",  # tdsTypeI16
+        0x03: "int32",  # tdsTypeI32
+        0x04: "int64",  # tdsTypeI64
+        0x05: "uint8",  # tdsTypeU8
+        0x06: "uint16",  # tdsTypeU16
+        0x07: "uint32",  # tdsTypeU32
+        0x08: "uint64",  # tdsTypeU64
+        0x09: "float32",  # tdsTypeSingleFloat
+        0x0A: "float64",  # tdsTypeDoubleFloat
+        0x0B: "float128",  # tdsTypeExtendedFloat
+        0x19: "singleFloatWithUnit",  # tdsTypeSingleFloatWithUnit
+        0x1A: "doubleFloatWithUnit",  # tdsTypeDoubleFloatWithUnit
+        0x1B: "extendedFloatWithUnit",  # tdsTypeExtendedFloatWithUnit
+        0x20: "str",  # tdsTypeString
+        0x21: "bool",  # tdsTypeBoolean
+        0x44: "datetime",  # tdsTypeTimeStamp
+        0xFFFFFFFF: "raw",  # tdsTypeDAQmxRawData
+    }
+)
+
+# Function mapping for reading TDMS data types
+TDS_READ_VAL = dict(
+    {
+        "void": lambda f: None,  # tdsTypeVoid
+        "int8": lambda f: struct.unpack(" self.file_size:
+            self.fileinfo["next_segment_offset"] = self.file_size
+            # raise(ValueError, "Next Segment Offset too large in TDMS header")
+
+    def __enter__(self):
+        return self
+
+    def __exit__(self, exc_type, exc_value, traceback):
+        self._tdms_file.close()
+
+    def _get_channel_length(self):
+        if not self._channel_length:
+            self._initialise_data()
+
+        return self._channel_length
+
+    channel_length = property(_get_channel_length)
+
+    def get_properties(self, mapped=False):
+        """
+        Return a dictionary of properties. Read from file only if necessary.
+        """
+        # Check if already hold properties in memory
+        if self._properties is None:
+            self._properties = self._read_properties()
+        if mapped:
+            raise NotImplementedError("Mapped properties not implemented")
+            # props = self._properties.copy()
+            # tmp = [
+            #     prop_map.get(col.replace(" ", ""), col.replace(" ", ""))
+            #     for col in self._properties.index
+            # ]
+            # tmp1 = []
+
+            # def addToList(ls, val, cnt=0):
+            #     if val not in ls:
+            #         ls.append(val)
+            #     else:
+            #         newVal = val + "_" + str(cnt + 1)
+            #         if newVal not in ls:
+            #             ls.append(newVal)
+            #         else:
+            #             addToList(ls, val, cnt + 1)
+
+            # for col in tmp:
+            #     addToList(tmp1, col)
+
+            # props.index = tmp1
+            # return props.loc[:, "Value"].to_dict()
+        else:
+            return self._properties.loc[:, "Value"].to_dict()
+
+    def _read_property(self):
+        """
+        Read a single property from the TDMS file.
+        Return the name, type and value of the property as a list.
+        """
+        # Read length of object path:
+        var = struct.unpack("= self.fileinfo["n_channels"]:
+            last_ch = self.fileinfo["n_channels"]
+        else:
+            # return data inclusive of last_ch, numpy indexing is exclusive of end index
+            last_ch += 1
+        if last_s is None or last_s > self._channel_length:
+            last_s = self._channel_length
+        else:
+            # return data inclusive of last_s, numpy indexing is exclusive of end index
+            last_s += 1
+        nch = int(max(last_ch - first_ch, 0))
+        ns = int(max(last_s - first_s, 0))
+
+        # Allocate output container
+        data = np.empty((ns, nch), dtype=np.dtype(self._data_type))
+        if data.size == 0:
+            return data
+
+        ## 1. Index first block & reshape?
+        first_blk = first_s // self._chunk_size
+        last_blk = last_s // self._chunk_size
+        last_full_blk = min(last_blk + 1, self._raw_data.shape[1])
+        nchunk = min(max(last_full_blk - first_blk, 0), self._raw_data.shape[1])
+        first_s_1a = max(first_s - first_blk * self._chunk_size, 0)
+        last_s_1a = min(
+            last_s - first_blk * self._chunk_size, nchunk * self._chunk_size
+        )
+        ind_s = 0
+        ind_e = ind_s + max(last_s_1a - first_s_1a, 0)
+
+        # data_1a = self._raw_data[:, first_blk:last_full_blk,
+        #     first_ch:last_ch].reshape((self._chunk_size*nchunk, nch), order='F')[first_s_1a:last_s_1a, :]
+        d = self._raw_data[:, first_blk:last_full_blk, first_ch:last_ch]
+        d.shape = (self._chunk_size * nchunk, nch)
+        d.reshape((self._chunk_size * nchunk, nch), order="F")
+        data[ind_s:ind_e, :] = d[first_s_1a:last_s_1a, :]
+
+        ## 2. Index first additional samples
+        first_s_1b = max(first_s - self._raw_data.shape[1] * self._chunk_size, 0)
+        last_s_1b = min(
+            last_s - self._raw_data.shape[1] * self._chunk_size,
+            self._raw_last_chunk.shape[0],
+        )
+        ind_s = ind_e
+        ind_e = ind_s + max(last_s_1b - first_s_1b, 0)
+        # data_1b = self._raw_last_chunk[first_s_1b:last_s_1b,first_ch:last_ch]
+        if ind_e > ind_s:
+            data[ind_s:ind_e, :] = self._raw_last_chunk[
+                first_s_1b:last_s_1b, first_ch:last_ch
+            ]
+
+        ## 3. Index second block
+        first_s_2 = max(first_s - self._seg1_length, 0)
+        last_s_2 = last_s - self._seg1_length
+        if (first_s_2 > 0 or last_s_2 > 0) and self._raw_data2 is not None:
+            first_blk_2 = max(first_s_2 // self._chunk_size, 0)
+            last_blk_2 = max(last_s_2 // self._chunk_size, 0)
+            last_full_blk_2 = min(last_blk_2 + 1, self._raw_data2.shape[1])
+            nchunk_2 = min(
+                max(last_full_blk_2 - first_blk_2, 0), self._raw_data2.shape[1]
+            )
+            first_s_2a = max(first_s_2 - first_blk_2 * self._chunk_size, 0)
+            last_s_2a = min(
+                last_s_2 - first_blk_2 * self._chunk_size, nchunk_2 * self._chunk_size
+            )
+            ind_s = ind_e
+            ind_e = ind_s + max(last_s_2a - first_s_2a, 0)
+            # data_2a = self._raw_data2[:, first_blk_2:last_full_blk_2,
+            #     first_ch:last_ch].reshape((self._chunk_size*nchunk_2, nch), order='F')[first_s_2a:last_s_2a, :]
+            if ind_e > ind_s:
+                data[ind_s:ind_e, :] = self._raw_data2[
+                    :, first_blk_2:last_full_blk_2, first_ch:last_ch
+                ].reshape((self._chunk_size * nchunk_2, nch), order="F")[
+                    first_s_2a:last_s_2a, :
+                ]
+        ## 4. Index second additional samples
+        if (first_s_2 > 0 or last_s_2 > 0) and self._raw2_last_chunk is not None:
+            first_s_2b = max(first_s_2 - self._raw_data2.shape[1] * self._chunk_size, 0)
+            last_s_2b = min(
+                last_s_2 - self._raw_data2.shape[1] * self._chunk_size,
+                self._raw2_last_chunk.shape[0],
+            )
+            ind_s = ind_e
+            ind_e = ind_s + max(last_s_2b - first_s_2b, 0)
+            # data_2b = self._raw2_last_chunk[first_s_2b:last_s_2b,first_ch:last_ch]
+            if ind_e > ind_s:
+                data[ind_s:ind_e, :] = self._raw2_last_chunk[
+                    first_s_2b:last_s_2b, first_ch:last_ch
+                ]
+        ## 5. Concatenate blocks
+        # data = np.concatenate((data_1a, data_1b, data_2a, data_2b))
+        if data.size == 0:
+            data = data.reshape(0, 0)
+        return data
+
+    def _initialise_data(self):
+        """Initialise the memory map for the data array."""
+        if self._chunk_size is None:
+            self._read_chunk_size()
+
+        dmap = mmap.mmap(self._tdms_file.fileno(), 0, access=mmap.ACCESS_READ)
+        rdo = int(self.fileinfo["raw_data_offset"])
+        nch = int(self.fileinfo["n_channels"])
+
+        # TODO: Support streaming file type?
+        # TODO: Is this a valid calculation for ChannelLength?
+        nso = self.fileinfo["next_segment_offset"]
+        self._seg1_length = int((nso - rdo) / nch / np.dtype(self._data_type).itemsize)
+        self._channel_length = self._seg1_length
+
+        if self.fileinfo["decimated"]:
+            n_complete_blk = int(self._seg1_length / self._chunk_size)
+            ax_ord = "C"
+        else:
+            n_complete_blk = 0
+            ax_ord = "F"
+        self._raw_data = np.ndarray(
+            (n_complete_blk, nch, self._chunk_size),
+            dtype=self._data_type,
+            buffer=dmap,
+            offset=rdo,
+        )
+        # Rotate the axes to [chunk_size, nblk, nch]
+        self._raw_data = np.rollaxis(self._raw_data, 2)
+        additional_samples = int(self._seg1_length - n_complete_blk * self._chunk_size)
+        additional_samples_offset = (
+            rdo
+            + n_complete_blk
+            * nch
+            * self._chunk_size
+            * np.dtype(self._data_type).itemsize
+        )
+        self._raw_last_chunk = np.ndarray(
+            (nch, additional_samples),
+            dtype=self._data_type,
+            buffer=dmap,
+            offset=additional_samples_offset,
+            order=ax_ord,
+        )
+        # Rotate the axes to [samples, nch]
+        self._raw_last_chunk = np.rollaxis(self._raw_last_chunk, 1)
+
+        if self.file_size == nso:
+            self._seg2_length = 0
+        else:
+            self._tdms_file.seek(nso + 12, 0)
+            (seg2_nso, seg2_rdo) = struct.unpack("
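
For context while reviewing, here is a minimal usage sketch of the two Dask-virtualized readers added above. It is not part of the patch: it assumes the `engine="silixa"` and `engine="miniseed"` entry points documented in the data-formats guide, the Xarray-like `sel`/`load` API described elsewhere in the docs, and uses hypothetical file paths.

```python
import xdas

# Hypothetical single Silixa iDAS (TDMS) acquisition: read_header() builds the
# time/distance coordinates, while the samples stay on disk as a delayed Dask
# array until they are actually needed.
tdms = xdas.open_dataarray("path/to/acquisition.tdms", engine="silixa")
print(tdms)

# Hypothetical directory of miniSEED chunks opened as one virtual array.
mseed = xdas.open_mfdataarray("path/to/chunks/*.mseed", engine="miniseed")
data = mseed.load()  # only at this point does read_data() pull the traces with ObsPy
```

Both readers follow the same pattern: `read_header` cheaply gathers shape, dtype and interpolated coordinates, and `dask.array.from_delayed` wraps `read_data` so that the bulk read is deferred until the array is computed.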