Skip to content

Adding field-indexing to caterva2 #201

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
May 27, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
49 changes: 34 additions & 15 deletions caterva2/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -313,13 +313,13 @@ def get_download_url(self):
"""
return api_utils.get_download_url(self.path, self.urlbase)

def __getitem__(self, key):
def __getitem__(self, item):
"""
Retrieves a slice of the dataset.

Parameters
----------
key : int, slice, tuple of ints and slices, or None
item : int, slice, tuple of ints and slices, or None
Specifies the slice to fetch.

Returns
Expand All @@ -340,7 +340,17 @@ def __getitem__(self, key):
>>> ds[0:10]
array([0, 1, 2, 3, 4, 5, 6, 7, 8, 9])
"""
return self.slice(key, as_blosc2=False)
if isinstance(item, str): # used a filter or field to index so want blosc2 array as result
fields = np.dtype(eval(self.dtype)).fields
if fields is None:
raise ValueError("The array is not structured (its dtype does not have fields)")
if item in fields:
# A shortcut to access fields
return self.client.get_slice(self.path, as_blosc2=True, field=item) # arg key is None
else: # used a filter (possibly lazyexpr)
return self.client.get_slice(self.path, item, as_blosc2=True)
else:
return self.slice(item, as_blosc2=False)

def slice(
self, key: int | slice | Sequence[slice], as_blosc2: bool = True
Expand Down Expand Up @@ -868,24 +878,25 @@ def fetch(self, path, slice_=None):
[(1.0000500e-02, 1.0100005), (1.0050503e-02, 1.0100505)]],
dtype=[('a', '<f4'), ('b', '<f8')])
"""
urlbase, path = _format_paths(self.urlbase, path)
slice_ = api_utils.slice_to_string(slice_) # convert to string
return api_utils.fetch_data(path, urlbase, {"slice_": slice_}, auth_cookie=self.cookie)
# Does the same as get_slice but forces return of np array
return self.get_slice(path, key=slice_, as_blosc2=False)

def get_slice(self, path, key=None, as_blosc2=True):
def get_slice(self, path, key=None, as_blosc2=True, field=None):
"""Get a slice of a File/Dataset.

Parameters
----------
key : int, slice, or sequence of slices
key : int, slice, sequence of slices or str
The slice to retrieve. If a single slice is provided, it will be
applied to the first dimension. If a sequence of slices is
provided, each slice will be applied to the corresponding
dimension.
dimension. If str, is interpreted as filter.
as_blosc2 : bool
If True (default), the result will be returned as a Blosc2 object
(either a `SChunk` or `NDArray`). If False, it will be returned
as a NumPy array (equivalent to `self[key]`).
field: str
Shortcut to access a field in a structured array. If provided, `key` is ignored.

Returns
-------
Expand All @@ -902,12 +913,20 @@ def get_slice(self, path, key=None, as_blosc2=True):
dtype=[('a', '<f4'), ('b', '<f8')])
"""
urlbase, path = _format_paths(self.urlbase, path)
# Convert slices to strings
slice_ = api_utils.slice_to_string(key)
# Fetch and return the data as a Blosc2 object / NumPy array
return api_utils.fetch_data(
path, urlbase, {"slice_": slice_}, auth_cookie=self.cookie, as_blosc2=as_blosc2
)
if field: # blosc2 doesn't support indexing of multiple fields
return api_utils.fetch_data(
path, urlbase, {"field": field}, auth_cookie=self.cookie, as_blosc2=as_blosc2
)
if isinstance(key, str): # A filter has been passed
return api_utils.fetch_data(
path, urlbase, {"filter": key}, auth_cookie=self.cookie, as_blosc2=as_blosc2
)
else: # Convert slices to strings
slice_ = api_utils.slice_to_string(key)
# Fetch and return the data as a Blosc2 object / NumPy array
return api_utils.fetch_data(
path, urlbase, {"slice_": slice_}, auth_cookie=self.cookie, as_blosc2=as_blosc2
)

def get_chunk(self, path, nchunk):
"""
Expand Down
25 changes: 21 additions & 4 deletions caterva2/services/sub.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import tarfile
import typing
import zipfile
from argparse import ArgumentError

# Requirements
import blosc2
Expand Down Expand Up @@ -746,6 +747,8 @@ async def fetch_data(
path: pathlib.Path,
slice_: str | None = None,
user: db.User = Depends(optional_user),
filter: str | None = None,
field: str | None = None,
):
"""
Fetch a dataset.
Expand All @@ -756,6 +759,10 @@ async def fetch_data(
The path to the dataset.
slice_ : str
The slice to fetch.
filter : str
The filter to apply to the dataset.
field : str
The desired field of dataset. If provided, filter is ignored.

Returns
-------
Expand All @@ -771,12 +778,20 @@ async def fetch_data(
abspath, dataprep = abspath_and_dataprep(path, slice_, user=user)
# This is still needed and will only update the necessary chunks
await dataprep()
container = open_b2(abspath, path)

if filter:
if field:
raise ArgumentError("Cannot handle both field and filter parameters at the same time")
filter = filter.strip()
container, _ = get_filtered_array(abspath, path, filter, sortby=None)
else:
container = open_b2(abspath, path)

if isinstance(container, blosc2.Proxy):
container = container._cache
container = container[field] if field else container

if isinstance(container, blosc2.NDArray | blosc2.LazyExpr | hdf5.HDF5Proxy):
if isinstance(container, blosc2.NDArray | blosc2.LazyExpr | hdf5.HDF5Proxy | blosc2.NDField):
array = container
schunk = getattr(array, "schunk", None) # not really needed
typesize = array.dtype.itemsize
Expand All @@ -801,14 +816,14 @@ async def fetch_data(
for sl, sh in zip(slice_, shape, strict=False)
)

if whole and not isinstance(array, blosc2.LazyExpr | hdf5.HDF5Proxy):
if whole and (not isinstance(array, blosc2.LazyExpr | hdf5.HDF5Proxy | blosc2.NDField)) and (not filter):
# Send the data in the file straight to the client,
# avoiding slicing and re-compression.
return FileResponse(abspath, filename=abspath.name, media_type="application/octet-stream")

if isinstance(array, hdf5.HDF5Proxy):
data = array.to_cframe(() if slice_ is None else slice_)
elif isinstance(array, blosc2.LazyExpr):
elif isinstance(array, blosc2.LazyExpr | blosc2.NDField):
data = array[() if slice_ is None else slice_]
data = blosc2.asarray(data)
data = data.to_cframe()
Expand Down Expand Up @@ -1862,13 +1877,15 @@ def get_filtered_array(abspath, path, filter, sortby):

# Filter rows only for NDArray with fields
if filter:
arr = arr._cache if isinstance(arr, blosc2.Proxy) else arr
# Check whether filter is the name of a field
if filter in arr.fields:
if arr.dtype.fields[filter][0] == bool: # noqa: E721
# If boolean, give the filter a boolean expression
filter = f"{filter} == True"
else:
raise IndexError("Filter should be a boolean expression")

# Let's create a LazyExpr with the filter
larr = arr[filter]
# TODO: do some benchmarking to see if this is worth it
Expand Down
18 changes: 18 additions & 0 deletions caterva2/tests/test_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -694,6 +694,24 @@ def test_lazyexpr_getchunk(auth_client):
np.testing.assert_array_equal(out, out_expr)


def test_lazyexpr_fields(auth_client):
    """Check field and filter (lazy-expression) indexing on a structured dataset.

    Verifies two things against the ``ds-1d-fields.b2nd`` test dataset:
    1. the single-field shortcut ``arr["a"]`` returns the same values as
       NumPy field indexing on the downloaded data, and
    2. a server-evaluated filter expression yields the same rows as
       evaluating the same expression on a locally downloaded slice.
    """
    if not auth_client:
        pytest.skip("authentication support needed")

    oppt = f"{TEST_CATERVA2_ROOT}/ds-1d-fields.b2nd"
    auth_client.subscribe(TEST_CATERVA2_ROOT)

    # Test a field
    arr = auth_client.get(oppt)
    field = arr["a"]
    np.testing.assert_allclose(field[:], arr[:]["a"])

    # Test a lazyexpr: server-side and client-side evaluation must agree
    expr = "(a < 500) & (b >= .1)"
    servered = arr[expr][:]
    downloaded = arr.slice(None)[expr][:]
    # Plain loop instead of a side-effect list comprehension: we only want
    # the assertions, not a throwaway list of None values.
    for f in downloaded.dtype.fields:
        np.testing.assert_array_equal(servered[f], downloaded[f])


def test_expr_from_expr(auth_client):
if not auth_client:
pytest.skip("authentication support needed")
Expand Down