From 01e75181ee904282e656ee01180c2d1d3e679239 Mon Sep 17 00:00:00 2001 From: TomNicholas Date: Thu, 24 Oct 2024 17:48:00 -0400 Subject: [PATCH 01/45] new blank whatsnew --- doc/whats-new.rst | 28 ++++++++++++++++++++++++++++ 1 file changed, 28 insertions(+) diff --git a/doc/whats-new.rst b/doc/whats-new.rst index 9a451a836ad..18fae4e0151 100644 --- a/doc/whats-new.rst +++ b/doc/whats-new.rst @@ -14,6 +14,34 @@ What's New np.random.seed(123456) +.. _whats-new.2024.10.1: + +v.2024.10.1 (unreleased) +------------------------ + +New Features +~~~~~~~~~~~~ + + +Breaking changes +~~~~~~~~~~~~~~~~ + + +Deprecations +~~~~~~~~~~~~ + + +Bug fixes +~~~~~~~~~ + + +Documentation +~~~~~~~~~~~~~ + + +Internal Changes +~~~~~~~~~~~~~~~~ + .. _whats-new.2024.10.0: v2024.10.0 (Oct 24th, 2024) From e6b3b3bd777b423435241c67a9482187a86a2256 Mon Sep 17 00:00:00 2001 From: Tom Nicholas Date: Thu, 15 May 2025 17:55:20 -0400 Subject: [PATCH 02/45] test async load using special zarr LatencyStore --- pyproject.toml | 1 + xarray/tests/test_async.py | 106 +++++++++++++++++++++++++++++++++++++ 2 files changed, 107 insertions(+) create mode 100644 xarray/tests/test_async.py diff --git a/pyproject.toml b/pyproject.toml index fa087abbc13..7dc784f170f 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -74,6 +74,7 @@ dev = [ "pytest-mypy-plugins", "pytest-timeout", "pytest-xdist", + "pytest-asyncio", "ruff>=0.8.0", "sphinx", "sphinx_autosummary_accessors", diff --git a/xarray/tests/test_async.py b/xarray/tests/test_async.py new file mode 100644 index 00000000000..78a78a95c4c --- /dev/null +++ b/xarray/tests/test_async.py @@ -0,0 +1,106 @@ +from typing import TypeVar, Iterable +import asyncio +import time + +import pytest +import numpy as np + +from xarray.tests import has_zarr_v3, requires_zarr_v3 +import xarray as xr + + +if has_zarr_v3: + import zarr + from zarr.abc.store import Store + from zarr.storage import MemoryStore + from zarr.storage._wrapper import WrapperStore + + from zarr.abc.store import ByteRequest + from zarr.core.buffer import Buffer, BufferPrototype + + T_Store = TypeVar("T_Store", bound=Store) + + + class LatencyStore(WrapperStore[T_Store]): + """Works the same way as the zarr LoggingStore""" + latency: float + + def __init__( + self, + store: T_Store, + latency: float = 0.0, + ) -> None: + """ + Store wrapper that adds artificial latency to each get call. + + Parameters + ---------- + store : Store + Store to wrap + latency : float + Amount of artificial latency to add to each get call, in seconds. + """ + super().__init__(store) + self.latency = latency + + def __str__(self) -> str: + return f"latency-{self._store}" + + def __repr__(self) -> str: + return f"LatencyStore({self._store.__class__.__name__}, '{self._store}', latency={self.latency})" + + async def get( + self, + key: str, + prototype: BufferPrototype, + byte_range: ByteRequest | None = None, + ) -> Buffer | None: + await asyncio.sleep(self.latency) + return await self._store.get(key=key, prototype=prototype, byte_range=byte_range) + + async def get_partial_values( + self, + prototype: BufferPrototype, + key_ranges: Iterable[tuple[str, ByteRequest | None]], + ) -> list[Buffer | None]: + await asyncio.sleep(self.latency) + return await self._store.get_partial_values(prototype=prototype, key_ranges=key_ranges) +else: + LatencyStore = {} + + +@pytest.fixture +def memorystore() -> "MemoryStore": + memorystore = zarr.storage.MemoryStore({}) + z = zarr.create_array( + store=memorystore, + name="foo", + shape=(10, 10), + chunks=(5, 5), + dtype="f4", + dimension_names=["x", "y"] + ) + z[:, :] = np.random.random((10, 10)) + + return memorystore + + +@requires_zarr_v3 +@pytest.mark.asyncio +async def test_async_load(memorystore): + N_DATASETS = 3 + LATENCY = 1.0 + + latencystore = LatencyStore(memorystore, latency=LATENCY) + datasets = [xr.open_zarr(latencystore, zarr_format=3, consolidated=False) for _ in range(N_DATASETS)] + + start_time = time.time() + # TODO actually implement the async.load method + #tasks = [ds.async.load() for ds in datasets] + #results = await asyncio.gather(*tasks) + results = [ds.load() for ds in datasets] + total_time = time.time() - start_time + + assert total_time > LATENCY # Cannot possibly be quicker than this + assert total_time < LATENCY * N_DATASETS # If this isn't true we're gaining nothing from async + assert abs(total_time - LATENCY) < 0.5 # Should take approximately LATENCY seconds, but allow some buffer From 3ceab60f79201cfd46c0381731d1f188eda816cf Mon Sep 17 00:00:00 2001 From: Tom Nicholas Date: Fri, 16 May 2025 11:53:32 -0400 Subject: [PATCH 03/45] don't use dask --- xarray/tests/test_async.py | 15 +++++++++------ 1 file changed, 9 insertions(+), 6 deletions(-) diff --git a/xarray/tests/test_async.py b/xarray/tests/test_async.py index 78a78a95c4c..c28680cd1dd 100644 --- a/xarray/tests/test_async.py +++ b/xarray/tests/test_async.py @@ -88,19 +88,22 @@ def memorystore() -> "MemoryStore": @requires_zarr_v3 @pytest.mark.asyncio async def test_async_load(memorystore): - N_DATASETS = 3 + N_DATASETS = 10 LATENCY = 1.0 latencystore = LatencyStore(memorystore, latency=LATENCY) - datasets = [xr.open_zarr(latencystore, zarr_format=3, consolidated=False) for _ in range(N_DATASETS)] + datasets = [xr.open_zarr(latencystore, zarr_format=3, consolidated=False, chunks=None) for _ in range(N_DATASETS)] + # TODO add async load to Dataset and DataArray as well as to Variable start_time = time.time() - # TODO actually implement the async.load method - #tasks = [ds.async.load() for ds in datasets] - #results = await asyncio.gather(*tasks) - results = [ds.load() for ds in datasets] + tasks = [ds['foo'].variable.async_load() for ds in datasets] + results = await asyncio.gather(*tasks) + #results = [ds['foo'].variable.load() for ds in datasets] total_time = time.time() - start_time assert total_time > LATENCY # Cannot possibly be quicker than this assert total_time < LATENCY * N_DATASETS # If this isn't true we're gaining nothing from async assert abs(total_time - LATENCY) < 0.5 # Should take approximately LATENCY seconds, but allow some buffer + + print(total_time) + assert False \ No newline at end of file From 071c35a19914b2974523e46fa43598b97b7d5777 Mon Sep 17 00:00:00 2001 From: Tom Nicholas Date: Fri, 16 May 2025 11:53:44 -0400 Subject: [PATCH 04/45] async all the way down --- xarray/backends/common.py | 6 +++++ xarray/backends/zarr.py | 22 +++++++++++++++++ xarray/core/indexing.py | 46 +++++++++++++++++++++++++++++++++++ xarray/core/variable.py | 6 +++++ xarray/namedarray/pycompat.py | 23 ++++++++++++++++++ 5 files changed, 103 insertions(+) diff --git a/xarray/backends/common.py b/xarray/backends/common.py index 58a98598a5b..c31a3caaf81 100644 --- a/xarray/backends/common.py +++ b/xarray/backends/common.py @@ -273,6 +273,12 @@ class BackendArray(NdimSizeLenMixin, indexing.ExplicitlyIndexed): def get_duck_array(self, dtype: np.typing.DTypeLike = None): key = indexing.BasicIndexer((slice(None),) * self.ndim) return self[key] # type: ignore[index] + + async def async_get_duck_array(self, dtype: np.typing.DTypeLike = None): + key = indexing.BasicIndexer((slice(None),) * self.ndim) + # TODO use zarr-python async get method here? + print("async inside BackendArray") + return await self.getitem(key) # type: ignore[index] class AbstractDataStore: diff --git a/xarray/backends/zarr.py b/xarray/backends/zarr.py index 1a46346dda7..dbbf93d125f 100644 --- a/xarray/backends/zarr.py +++ b/xarray/backends/zarr.py @@ -185,6 +185,8 @@ class ZarrArrayWrapper(BackendArray): def __init__(self, zarr_array): # some callers attempt to evaluate an array if an `array` property exists on the object. # we prefix with _ to avoid this inference. + + # TODO type hint this? self._array = zarr_array self.shape = self._array.shape @@ -211,6 +213,10 @@ def _vindex(self, key): def _getitem(self, key): return self._array[key] + + async def _async_getitem(self, key): + async_array = self._array._async_array + return await async_array.getitem(key) def __getitem__(self, key): array = self._array @@ -227,6 +233,22 @@ def __getitem__(self, key): # if self.ndim == 0: # could possibly have a work-around for 0d data here + async def async_getitem(self, key): + # this doesn't need to be async + array = self._array + if isinstance(key, indexing.BasicIndexer): + method = self._async_getitem + elif isinstance(key, indexing.VectorizedIndexer): + # TODO + method = self._vindex + elif isinstance(key, indexing.OuterIndexer): + # TODO + method = self._oindex + + print("did an async get") + return await indexing.async_explicit_indexing_adapter( + key, array.shape, indexing.IndexingSupport.VECTORIZED, method + ) def _determine_zarr_chunks( enc_chunks, var_chunks, ndim, name, safe_chunks, region, mode, shape diff --git a/xarray/core/indexing.py b/xarray/core/indexing.py index c1b847202c7..2adea07e96c 100644 --- a/xarray/core/indexing.py +++ b/xarray/core/indexing.py @@ -523,6 +523,10 @@ def get_duck_array(self): key = BasicIndexer((slice(None),) * self.ndim) return self[key] + async def async_get_duck_array(self): + key = BasicIndexer((slice(None),) * self.ndim) + return self[key] + def _oindex_get(self, indexer: OuterIndexer): raise NotImplementedError( f"{self.__class__.__name__}._oindex_get method should be overridden" @@ -661,6 +665,22 @@ def get_duck_array(self): array = array.get_duck_array() return _wrap_numpy_scalars(array) + async def async_get_duck_array(self): + if isinstance(self.array, ExplicitlyIndexedNDArrayMixin): + array = apply_indexer(self.array, self.key) + else: + # If the array is not an ExplicitlyIndexedNDArrayMixin, + # it may wrap a BackendArray so use its (async) getitem + array = await self.array.async_getitem(self.key) + + # self.array[self.key] is now a numpy array when + # self.array is a BackendArray subclass + # and self.key is BasicIndexer((slice(None, None, None),)) + # so we need the explicit check for ExplicitlyIndexed + if isinstance(array, ExplicitlyIndexed): + array = await array.async_get_duck_array() + return _wrap_numpy_scalars(array) + def transpose(self, order): return LazilyVectorizedIndexedArray(self.array, self.key).transpose(order) @@ -797,6 +817,9 @@ def _ensure_copied(self): def get_duck_array(self): return self.array.get_duck_array() + async def async_get_duck_array(self): + return await self.array.async_get_duck_array() + def _oindex_get(self, indexer: OuterIndexer): return type(self)(_wrap_numpy_scalars(self.array.oindex[indexer])) @@ -839,10 +862,18 @@ def __init__(self, array): def _ensure_cached(self): self.array = as_indexable(self.array.get_duck_array()) + + async def _async_ensure_cached(self): + duck_array = await self.array.async_get_duck_array() + self.array = as_indexable(duck_array) def get_duck_array(self): self._ensure_cached() return self.array.get_duck_array() + + async def async_get_duck_array(self): + await self._async_ensure_cached() + return await self.array.async_get_duck_array() def _oindex_get(self, indexer: OuterIndexer): return type(self)(_wrap_numpy_scalars(self.array.oindex[indexer])) @@ -1027,6 +1058,21 @@ def explicit_indexing_adapter( return result +async def async_explicit_indexing_adapter( + key: ExplicitIndexer, + shape: _Shape, + indexing_support: IndexingSupport, + raw_indexing_method: Callable[..., Any], +) -> Any: + raw_key, numpy_indices = decompose_indexer(key, shape, indexing_support) + result = await raw_indexing_method(raw_key.tuple) + if numpy_indices.tuple: + # index the loaded duck array + indexable = as_indexable(result) + result = apply_indexer(indexable, numpy_indices) + return result + + def apply_indexer(indexable, indexer: ExplicitIndexer): """Apply an indexer to an indexable object.""" if isinstance(indexer, VectorizedIndexer): diff --git a/xarray/core/variable.py b/xarray/core/variable.py index 4e58b0d4b20..9b184d1069e 100644 --- a/xarray/core/variable.py +++ b/xarray/core/variable.py @@ -51,6 +51,7 @@ is_0d_dask_array, is_chunked_array, to_duck_array, + async_to_duck_array, ) from xarray.namedarray.utils import module_available from xarray.util.deprecation_helpers import _deprecate_positional_args, deprecate_dims @@ -956,6 +957,11 @@ def load(self, **kwargs): """ self._data = to_duck_array(self._data, **kwargs) return self + + async def async_load(self, **kwargs): + print("async inside Variable") + self._data = await async_to_duck_array(self._data, **kwargs) + return self def compute(self, **kwargs): """Manually trigger loading of this variable's data from disk or a diff --git a/xarray/namedarray/pycompat.py b/xarray/namedarray/pycompat.py index 68b6a7853bf..527b83fed15 100644 --- a/xarray/namedarray/pycompat.py +++ b/xarray/namedarray/pycompat.py @@ -145,3 +145,26 @@ def to_duck_array(data: Any, **kwargs: dict[str, Any]) -> duckarray[_ShapeType, return data else: return np.asarray(data) # type: ignore[return-value] + + +async def async_to_duck_array(data: Any, **kwargs: dict[str, Any]) -> duckarray[_ShapeType, _DType]: + from xarray.core.indexing import ( + ExplicitlyIndexed, + ImplicitToExplicitIndexingAdapter, + ) + from xarray.namedarray.parallelcompat import get_chunked_array_type + + print(type(data)) + + if is_chunked_array(data): + chunkmanager = get_chunked_array_type(data) + loaded_data, *_ = chunkmanager.compute(data, **kwargs) # type: ignore[var-annotated] + return loaded_data + + if isinstance(data, ExplicitlyIndexed | ImplicitToExplicitIndexingAdapter): + print("async inside to_duck_array") + return await data.async_get_duck_array() # type: ignore[no-untyped-call, no-any-return] + elif is_duck_array(data): + return data + else: + return np.asarray(data) # type: ignore[return-value] From 29374f9e4f20056690c9a8ac6330f1a00ecaba59 Mon Sep 17 00:00:00 2001 From: Tom Nicholas Date: Fri, 16 May 2025 12:02:09 -0400 Subject: [PATCH 05/45] remove assert False --- xarray/tests/test_async.py | 13 +++++-------- 1 file changed, 5 insertions(+), 8 deletions(-) diff --git a/xarray/tests/test_async.py b/xarray/tests/test_async.py index c28680cd1dd..05422beea1f 100644 --- a/xarray/tests/test_async.py +++ b/xarray/tests/test_async.py @@ -88,22 +88,19 @@ def memorystore() -> "MemoryStore": @requires_zarr_v3 @pytest.mark.asyncio async def test_async_load(memorystore): - N_DATASETS = 10 + N_LOADS= 10 LATENCY = 1.0 latencystore = LatencyStore(memorystore, latency=LATENCY) - datasets = [xr.open_zarr(latencystore, zarr_format=3, consolidated=False, chunks=None) for _ in range(N_DATASETS)] + ds = xr.open_zarr(latencystore, zarr_format=3, consolidated=False, chunks=None) # TODO add async load to Dataset and DataArray as well as to Variable + # TODO change the syntax to `.async.load()`? start_time = time.time() - tasks = [ds['foo'].variable.async_load() for ds in datasets] + tasks = [ds['foo'].variable.async_load() for _ in range(N_LOADS)] results = await asyncio.gather(*tasks) - #results = [ds['foo'].variable.load() for ds in datasets] total_time = time.time() - start_time assert total_time > LATENCY # Cannot possibly be quicker than this - assert total_time < LATENCY * N_DATASETS # If this isn't true we're gaining nothing from async + assert total_time < LATENCY * N_LOADS # If this isn't true we're gaining nothing from async assert abs(total_time - LATENCY) < 0.5 # Should take approximately LATENCY seconds, but allow some buffer - - print(total_time) - assert False \ No newline at end of file From ab12bb8d01cabd910ac909aad4509b1ee70dfa4c Mon Sep 17 00:00:00 2001 From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com> Date: Fri, 16 May 2025 16:07:10 +0000 Subject: [PATCH 06/45] [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci --- xarray/backends/common.py | 2 +- xarray/backends/zarr.py | 5 ++-- xarray/core/indexing.py | 4 +-- xarray/core/variable.py | 4 +-- xarray/namedarray/pycompat.py | 6 +++-- xarray/tests/test_async.py | 46 ++++++++++++++++++++--------------- 6 files changed, 38 insertions(+), 29 deletions(-) diff --git a/xarray/backends/common.py b/xarray/backends/common.py index c31a3caaf81..edda5cff429 100644 --- a/xarray/backends/common.py +++ b/xarray/backends/common.py @@ -273,7 +273,7 @@ class BackendArray(NdimSizeLenMixin, indexing.ExplicitlyIndexed): def get_duck_array(self, dtype: np.typing.DTypeLike = None): key = indexing.BasicIndexer((slice(None),) * self.ndim) return self[key] # type: ignore[index] - + async def async_get_duck_array(self, dtype: np.typing.DTypeLike = None): key = indexing.BasicIndexer((slice(None),) * self.ndim) # TODO use zarr-python async get method here? diff --git a/xarray/backends/zarr.py b/xarray/backends/zarr.py index dbbf93d125f..f068826eef2 100644 --- a/xarray/backends/zarr.py +++ b/xarray/backends/zarr.py @@ -185,7 +185,7 @@ class ZarrArrayWrapper(BackendArray): def __init__(self, zarr_array): # some callers attempt to evaluate an array if an `array` property exists on the object. # we prefix with _ to avoid this inference. - + # TODO type hint this? self._array = zarr_array self.shape = self._array.shape @@ -213,7 +213,7 @@ def _vindex(self, key): def _getitem(self, key): return self._array[key] - + async def _async_getitem(self, key): async_array = self._array._async_array return await async_array.getitem(key) @@ -250,6 +250,7 @@ async def async_getitem(self, key): key, array.shape, indexing.IndexingSupport.VECTORIZED, method ) + def _determine_zarr_chunks( enc_chunks, var_chunks, ndim, name, safe_chunks, region, mode, shape ): diff --git a/xarray/core/indexing.py b/xarray/core/indexing.py index 2adea07e96c..53f9bd12088 100644 --- a/xarray/core/indexing.py +++ b/xarray/core/indexing.py @@ -862,7 +862,7 @@ def __init__(self, array): def _ensure_cached(self): self.array = as_indexable(self.array.get_duck_array()) - + async def _async_ensure_cached(self): duck_array = await self.array.async_get_duck_array() self.array = as_indexable(duck_array) @@ -870,7 +870,7 @@ async def _async_ensure_cached(self): def get_duck_array(self): self._ensure_cached() return self.array.get_duck_array() - + async def async_get_duck_array(self): await self._async_ensure_cached() return await self.array.async_get_duck_array() diff --git a/xarray/core/variable.py b/xarray/core/variable.py index 9b184d1069e..cd3af386b95 100644 --- a/xarray/core/variable.py +++ b/xarray/core/variable.py @@ -47,11 +47,11 @@ from xarray.namedarray.core import NamedArray, _raise_if_any_duplicate_dimensions from xarray.namedarray.parallelcompat import get_chunked_array_type from xarray.namedarray.pycompat import ( + async_to_duck_array, integer_types, is_0d_dask_array, is_chunked_array, to_duck_array, - async_to_duck_array, ) from xarray.namedarray.utils import module_available from xarray.util.deprecation_helpers import _deprecate_positional_args, deprecate_dims @@ -957,7 +957,7 @@ def load(self, **kwargs): """ self._data = to_duck_array(self._data, **kwargs) return self - + async def async_load(self, **kwargs): print("async inside Variable") self._data = await async_to_duck_array(self._data, **kwargs) diff --git a/xarray/namedarray/pycompat.py b/xarray/namedarray/pycompat.py index 527b83fed15..c6a07e5963f 100644 --- a/xarray/namedarray/pycompat.py +++ b/xarray/namedarray/pycompat.py @@ -145,9 +145,11 @@ def to_duck_array(data: Any, **kwargs: dict[str, Any]) -> duckarray[_ShapeType, return data else: return np.asarray(data) # type: ignore[return-value] - -async def async_to_duck_array(data: Any, **kwargs: dict[str, Any]) -> duckarray[_ShapeType, _DType]: + +async def async_to_duck_array( + data: Any, **kwargs: dict[str, Any] +) -> duckarray[_ShapeType, _DType]: from xarray.core.indexing import ( ExplicitlyIndexed, ImplicitToExplicitIndexingAdapter, diff --git a/xarray/tests/test_async.py b/xarray/tests/test_async.py index 05422beea1f..8523c41662a 100644 --- a/xarray/tests/test_async.py +++ b/xarray/tests/test_async.py @@ -1,28 +1,26 @@ -from typing import TypeVar, Iterable import asyncio import time +from collections.abc import Iterable +from typing import TypeVar -import pytest import numpy as np +import pytest -from xarray.tests import has_zarr_v3, requires_zarr_v3 import xarray as xr - +from xarray.tests import has_zarr_v3, requires_zarr_v3 if has_zarr_v3: import zarr - from zarr.abc.store import Store + from zarr.abc.store import ByteRequest, Store + from zarr.core.buffer import Buffer, BufferPrototype from zarr.storage import MemoryStore from zarr.storage._wrapper import WrapperStore - from zarr.abc.store import ByteRequest - from zarr.core.buffer import Buffer, BufferPrototype - T_Store = TypeVar("T_Store", bound=Store) - class LatencyStore(WrapperStore[T_Store]): """Works the same way as the zarr LoggingStore""" + latency: float def __init__( @@ -42,7 +40,7 @@ def __init__( """ super().__init__(store) self.latency = latency - + def __str__(self) -> str: return f"latency-{self._store}" @@ -56,15 +54,19 @@ async def get( byte_range: ByteRequest | None = None, ) -> Buffer | None: await asyncio.sleep(self.latency) - return await self._store.get(key=key, prototype=prototype, byte_range=byte_range) - + return await self._store.get( + key=key, prototype=prototype, byte_range=byte_range + ) + async def get_partial_values( self, prototype: BufferPrototype, key_ranges: Iterable[tuple[str, ByteRequest | None]], ) -> list[Buffer | None]: await asyncio.sleep(self.latency) - return await self._store.get_partial_values(prototype=prototype, key_ranges=key_ranges) + return await self._store.get_partial_values( + prototype=prototype, key_ranges=key_ranges + ) else: LatencyStore = {} @@ -76,9 +78,9 @@ def memorystore() -> "MemoryStore": store=memorystore, name="foo", shape=(10, 10), - chunks=(5, 5), + chunks=(5, 5), dtype="f4", - dimension_names=["x", "y"] + dimension_names=["x", "y"], ) z[:, :] = np.random.random((10, 10)) @@ -88,7 +90,7 @@ def memorystore() -> "MemoryStore": @requires_zarr_v3 @pytest.mark.asyncio async def test_async_load(memorystore): - N_LOADS= 10 + N_LOADS = 10 LATENCY = 1.0 latencystore = LatencyStore(memorystore, latency=LATENCY) @@ -97,10 +99,14 @@ async def test_async_load(memorystore): # TODO add async load to Dataset and DataArray as well as to Variable # TODO change the syntax to `.async.load()`? start_time = time.time() - tasks = [ds['foo'].variable.async_load() for _ in range(N_LOADS)] + tasks = [ds["foo"].variable.async_load() for _ in range(N_LOADS)] results = await asyncio.gather(*tasks) total_time = time.time() - start_time - + assert total_time > LATENCY # Cannot possibly be quicker than this - assert total_time < LATENCY * N_LOADS # If this isn't true we're gaining nothing from async - assert abs(total_time - LATENCY) < 0.5 # Should take approximately LATENCY seconds, but allow some buffer + assert ( + total_time < LATENCY * N_LOADS + ) # If this isn't true we're gaining nothing from async + assert ( + abs(total_time - LATENCY) < 0.5 + ) # Should take approximately LATENCY seconds, but allow some buffer From 62aa39dd81a64005891dea403eb7f778b2b67669 Mon Sep 17 00:00:00 2001 From: Tom Nicholas Date: Fri, 16 May 2025 14:30:46 -0400 Subject: [PATCH 07/45] add pytest-asyncio to CI envs --- ci/minimum_versions.py | 1 + ci/requirements/all-but-dask.yml | 1 + ci/requirements/all-but-numba.yml | 1 + ci/requirements/bare-minimum.yml | 1 + ci/requirements/environment-3.14.yml | 1 + ci/requirements/environment-windows-3.14.yml | 1 + ci/requirements/environment-windows.yml | 1 + ci/requirements/environment.yml | 1 + ci/requirements/min-all-deps.yml | 1 + 9 files changed, 9 insertions(+) diff --git a/ci/minimum_versions.py b/ci/minimum_versions.py index 08808d002d9..c4816c39a74 100644 --- a/ci/minimum_versions.py +++ b/ci/minimum_versions.py @@ -30,6 +30,7 @@ "coveralls", "pip", "pytest", + "pytest-asyncio" "pytest-cov", "pytest-env", "pytest-mypy-plugins", diff --git a/ci/requirements/all-but-dask.yml b/ci/requirements/all-but-dask.yml index ca4943bddb1..987adc7dfdd 100644 --- a/ci/requirements/all-but-dask.yml +++ b/ci/requirements/all-but-dask.yml @@ -28,6 +28,7 @@ dependencies: - pip - pydap - pytest + - pytest-asyncio - pytest-cov - pytest-env - pytest-mypy-plugins diff --git a/ci/requirements/all-but-numba.yml b/ci/requirements/all-but-numba.yml index fa7ad81f198..1d49f92133c 100644 --- a/ci/requirements/all-but-numba.yml +++ b/ci/requirements/all-but-numba.yml @@ -41,6 +41,7 @@ dependencies: - pyarrow # pandas raises a deprecation warning without this, breaking doctests - pydap - pytest + - pytest-asyncio - pytest-cov - pytest-env - pytest-mypy-plugins diff --git a/ci/requirements/bare-minimum.yml b/ci/requirements/bare-minimum.yml index 02e99d34af2..cc34a6e4824 100644 --- a/ci/requirements/bare-minimum.yml +++ b/ci/requirements/bare-minimum.yml @@ -7,6 +7,7 @@ dependencies: - coveralls - pip - pytest + - pytest-asyncio - pytest-cov - pytest-env - pytest-mypy-plugins diff --git a/ci/requirements/environment-3.14.yml b/ci/requirements/environment-3.14.yml index 1e6ee7ff5f9..bfbeababa56 100644 --- a/ci/requirements/environment-3.14.yml +++ b/ci/requirements/environment-3.14.yml @@ -37,6 +37,7 @@ dependencies: - pyarrow # pandas raises a deprecation warning without this, breaking doctests - pydap - pytest + - pytest-asyncio - pytest-cov - pytest-env - pytest-mypy-plugins diff --git a/ci/requirements/environment-windows-3.14.yml b/ci/requirements/environment-windows-3.14.yml index 4eb2049f2e6..d5143470614 100644 --- a/ci/requirements/environment-windows-3.14.yml +++ b/ci/requirements/environment-windows-3.14.yml @@ -32,6 +32,7 @@ dependencies: - pyarrow # importing dask.dataframe raises an ImportError without this - pydap - pytest + - pytest-asyncio - pytest-cov - pytest-env - pytest-mypy-plugins diff --git a/ci/requirements/environment-windows.yml b/ci/requirements/environment-windows.yml index 45cbebd38db..6aeca2cb0ab 100644 --- a/ci/requirements/environment-windows.yml +++ b/ci/requirements/environment-windows.yml @@ -32,6 +32,7 @@ dependencies: - pyarrow # importing dask.dataframe raises an ImportError without this - pydap - pytest + - pytest-asyncio - pytest-cov - pytest-env - pytest-mypy-plugins diff --git a/ci/requirements/environment.yml b/ci/requirements/environment.yml index b4354b14f40..9c253d5d489 100644 --- a/ci/requirements/environment.yml +++ b/ci/requirements/environment.yml @@ -38,6 +38,7 @@ dependencies: - pydap - pydap-server - pytest + - pytest-asyncio - pytest-cov - pytest-env - pytest-mypy-plugins diff --git a/ci/requirements/min-all-deps.yml b/ci/requirements/min-all-deps.yml index 03e14773d53..1293f4d78d6 100644 --- a/ci/requirements/min-all-deps.yml +++ b/ci/requirements/min-all-deps.yml @@ -44,6 +44,7 @@ dependencies: - pip - pydap=3.5 - pytest + - pytest-asyncio - pytest-cov - pytest-env - pytest-mypy-plugins From a906decee5e8d08d605d671f373d59a233745917 Mon Sep 17 00:00:00 2001 From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com> Date: Fri, 16 May 2025 18:31:20 +0000 Subject: [PATCH 08/45] [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci --- ci/minimum_versions.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/ci/minimum_versions.py b/ci/minimum_versions.py index c4816c39a74..4cc0b76916b 100644 --- a/ci/minimum_versions.py +++ b/ci/minimum_versions.py @@ -30,8 +30,7 @@ "coveralls", "pip", "pytest", - "pytest-asyncio" - "pytest-cov", + "pytest-asynciopytest-cov", "pytest-env", "pytest-mypy-plugins", "pytest-timeout", From 629ab31e2b13f51cd74acc0e4174959c8a85f3c1 Mon Sep 17 00:00:00 2001 From: Tom Nicholas Date: Fri, 16 May 2025 16:15:09 -0400 Subject: [PATCH 09/45] assert results are identical --- xarray/tests/test_async.py | 57 ++++++++++++++++++++++++-------------- 1 file changed, 36 insertions(+), 21 deletions(-) diff --git a/xarray/tests/test_async.py b/xarray/tests/test_async.py index 8523c41662a..4a03ed738e3 100644 --- a/xarray/tests/test_async.py +++ b/xarray/tests/test_async.py @@ -7,6 +7,7 @@ import pytest import xarray as xr +import xarray.testing as xrt from xarray.tests import has_zarr_v3, requires_zarr_v3 if has_zarr_v3: @@ -84,29 +85,43 @@ def memorystore() -> "MemoryStore": ) z[:, :] = np.random.random((10, 10)) + z = zarr.create_array( + store=memorystore, + name="bar", + shape=(10,), + chunks=(5), + dtype="f4", + dimension_names=["x"], + ) + z[:] = np.random.random((10,)) + return memorystore @requires_zarr_v3 @pytest.mark.asyncio -async def test_async_load(memorystore): - N_LOADS = 10 - LATENCY = 1.0 - - latencystore = LatencyStore(memorystore, latency=LATENCY) - ds = xr.open_zarr(latencystore, zarr_format=3, consolidated=False, chunks=None) - - # TODO add async load to Dataset and DataArray as well as to Variable - # TODO change the syntax to `.async.load()`? - start_time = time.time() - tasks = [ds["foo"].variable.async_load() for _ in range(N_LOADS)] - results = await asyncio.gather(*tasks) - total_time = time.time() - start_time - - assert total_time > LATENCY # Cannot possibly be quicker than this - assert ( - total_time < LATENCY * N_LOADS - ) # If this isn't true we're gaining nothing from async - assert ( - abs(total_time - LATENCY) < 0.5 - ) # Should take approximately LATENCY seconds, but allow some buffer +class TestAsyncLoad: + async def test_async_load_variable(self, memorystore): + N_LOADS = 5 + LATENCY = 1.0 + + latencystore = LatencyStore(memorystore, latency=LATENCY) + ds = xr.open_zarr(latencystore, zarr_format=3, consolidated=False, chunks=None) + + # TODO add async load to Dataset and DataArray as well as to Variable + # TODO change the syntax to `.async.load()`? + start_time = time.time() + tasks = [ds["foo"].variable.async_load() for _ in range(N_LOADS)] + results = await asyncio.gather(*tasks) + total_time = time.time() - start_time + + for result in results: + xrt.assert_identical(result, ds["foo"].variable.load()) + + assert total_time > LATENCY # Cannot possibly be quicker than this + assert ( + total_time < LATENCY * N_LOADS + ) # If this isn't true we're gaining nothing from async + assert ( + abs(total_time - LATENCY) < 0.5 + ) # Should take approximately LATENCY seconds, but allow some buffer From 7e9ae0fa20a736a3d0baf8c0c6a8e10c263fdf2d Mon Sep 17 00:00:00 2001 From: Tom Nicholas Date: Sat, 17 May 2025 22:41:56 +0300 Subject: [PATCH 10/45] implement async load for dataarray and dataset --- xarray/core/dataarray.py | 8 ++++++ xarray/core/dataset.py | 25 ++++++++++++++++ xarray/tests/test_async.py | 59 ++++++++++++++++++++++++++++++++------ 3 files changed, 84 insertions(+), 8 deletions(-) diff --git a/xarray/core/dataarray.py b/xarray/core/dataarray.py index 1e7e1069076..808f39d8c03 100644 --- a/xarray/core/dataarray.py +++ b/xarray/core/dataarray.py @@ -1160,6 +1160,14 @@ def load(self, **kwargs) -> Self: self._coords = new._coords return self + async def async_load(self, **kwargs) -> Self: + temp_ds = self._to_temp_dataset() + ds = await temp_ds.async_load(**kwargs) + new = self._from_temp_dataset(ds) + self._variable = new._variable + self._coords = new._coords + return self + def compute(self, **kwargs) -> Self: """Manually trigger loading of this array's data from disk or a remote source into memory and return a new array. diff --git a/xarray/core/dataset.py b/xarray/core/dataset.py index 5a7f757ba8a..33e37e1ca4c 100644 --- a/xarray/core/dataset.py +++ b/xarray/core/dataset.py @@ -552,6 +552,31 @@ def load(self, **kwargs) -> Self: return self + async def async_load(self, **kwargs) -> Self: + # this blocks on chunked arrays but not on lazily indexed arrays + + # access .data to coerce everything to numpy or dask arrays + lazy_data = { + k: v._data for k, v in self.variables.items() if is_chunked_array(v._data) + } + if lazy_data: + chunkmanager = get_chunked_array_type(*lazy_data.values()) + + # evaluate all the chunked arrays simultaneously + evaluated_data: tuple[np.ndarray[Any, Any], ...] = chunkmanager.compute( + *lazy_data.values(), **kwargs + ) + + for k, data in zip(lazy_data, evaluated_data, strict=False): + self.variables[k].data = data + + # load everything else sequentially + for k, v in self.variables.items(): + if k not in lazy_data: + await v.async_load() + + return self + def __dask_tokenize__(self) -> object: from dask.base import normalize_token diff --git a/xarray/tests/test_async.py b/xarray/tests/test_async.py index 4a03ed738e3..37491619af1 100644 --- a/xarray/tests/test_async.py +++ b/xarray/tests/test_async.py @@ -101,27 +101,70 @@ def memorystore() -> "MemoryStore": @requires_zarr_v3 @pytest.mark.asyncio class TestAsyncLoad: - async def test_async_load_variable(self, memorystore): - N_LOADS = 5 - LATENCY = 1.0 + N_LOADS = 10 + LATENCY = 1.0 - latencystore = LatencyStore(memorystore, latency=LATENCY) + # TODO refactor these tests + async def test_async_load_variable(self, memorystore): + latencystore = LatencyStore(memorystore, latency=self.LATENCY) ds = xr.open_zarr(latencystore, zarr_format=3, consolidated=False, chunks=None) # TODO add async load to Dataset and DataArray as well as to Variable # TODO change the syntax to `.async.load()`? start_time = time.time() - tasks = [ds["foo"].variable.async_load() for _ in range(N_LOADS)] + tasks = [ds["foo"].variable.async_load() for _ in range(self.N_LOADS)] results = await asyncio.gather(*tasks) total_time = time.time() - start_time for result in results: xrt.assert_identical(result, ds["foo"].variable.load()) - assert total_time > LATENCY # Cannot possibly be quicker than this + assert total_time > self.LATENCY # Cannot possibly be quicker than this + assert ( + total_time < self.LATENCY * self.N_LOADS + ) # If this isn't true we're gaining nothing from async + assert ( + abs(total_time - self.LATENCY) < 0.5 + ) # Should take approximately LATENCY seconds, but allow some buffer + + async def test_async_load_dataarray(self, memorystore): + latencystore = LatencyStore(memorystore, latency=self.LATENCY) + ds = xr.open_zarr(latencystore, zarr_format=3, consolidated=False, chunks=None) + + # TODO change the syntax to `.async.load()`? + start_time = time.time() + tasks = [ds["foo"].async_load() for _ in range(self.N_LOADS)] + results = await asyncio.gather(*tasks) + total_time = time.time() - start_time + + for result in results: + xrt.assert_identical(result, ds["foo"].load()) + + assert total_time > self.LATENCY # Cannot possibly be quicker than this + assert ( + total_time < self.LATENCY * self.N_LOADS + ) # If this isn't true we're gaining nothing from async + assert ( + abs(total_time - self.LATENCY) < 0.5 + ) # Should take approximately LATENCY seconds, but allow some buffer + + async def test_async_load_dataset(self, memorystore): + latencystore = LatencyStore(memorystore, latency=self.LATENCY) + ds = xr.open_zarr(latencystore, zarr_format=3, consolidated=False, chunks=None) + + # TODO change the syntax to `.async.load()`? + start_time = time.time() + tasks = [ds.async_load() for _ in range(self.N_LOADS)] + results = await asyncio.gather(*tasks) + total_time = time.time() - start_time + + for result in results: + xrt.assert_identical(result, ds.load()) + + assert total_time > self.LATENCY # Cannot possibly be quicker than this assert ( - total_time < LATENCY * N_LOADS + total_time < self.LATENCY * self.N_LOADS ) # If this isn't true we're gaining nothing from async assert ( - abs(total_time - LATENCY) < 0.5 + abs(total_time - self.LATENCY) < 0.5 ) # Should take approximately LATENCY seconds, but allow some buffer From d288351df45300a260e17c7de3f710687510e8cd Mon Sep 17 00:00:00 2001 From: Tom Nicholas Date: Sat, 17 May 2025 23:27:07 +0300 Subject: [PATCH 11/45] factor out common logic --- xarray/tests/test_async.py | 79 ++++++++++++++++++++------------------ 1 file changed, 42 insertions(+), 37 deletions(-) diff --git a/xarray/tests/test_async.py b/xarray/tests/test_async.py index 37491619af1..9e9bbefe56b 100644 --- a/xarray/tests/test_async.py +++ b/xarray/tests/test_async.py @@ -1,6 +1,7 @@ import asyncio import time from collections.abc import Iterable +from contextlib import asynccontextmanager from typing import TypeVar import numpy as np @@ -98,73 +99,77 @@ def memorystore() -> "MemoryStore": return memorystore +class AsyncTimer: + """Context manager for timing async operations and making assertions about their execution time.""" + + start_time: float + end_time: float + total_time: float + + @asynccontextmanager + async def measure(self): + """Measure the execution time of the async code within this context.""" + self.start_time = time.time() + try: + yield self + finally: + self.end_time = time.time() + self.total_time = self.end_time - self.start_time + + @requires_zarr_v3 @pytest.mark.asyncio class TestAsyncLoad: - N_LOADS = 10 - LATENCY = 1.0 + N_LOADS: int = 5 + LATENCY: float = 1.0 + + def assert_time_as_expected(self, total_time: float) -> None: + assert total_time > self.LATENCY # Cannot possibly be quicker than this + assert ( + total_time < self.LATENCY * self.N_LOADS + ) # If this isn't true we're gaining nothing from async + assert ( + abs(total_time - self.LATENCY) < 0.5 + ) # Should take approximately LATENCY seconds, but allow some buffer - # TODO refactor these tests async def test_async_load_variable(self, memorystore): latencystore = LatencyStore(memorystore, latency=self.LATENCY) ds = xr.open_zarr(latencystore, zarr_format=3, consolidated=False, chunks=None) - # TODO add async load to Dataset and DataArray as well as to Variable # TODO change the syntax to `.async.load()`? - start_time = time.time() - tasks = [ds["foo"].variable.async_load() for _ in range(self.N_LOADS)] - results = await asyncio.gather(*tasks) - total_time = time.time() - start_time + async with AsyncTimer().measure() as timer: + tasks = [ds["foo"].variable.async_load() for _ in range(self.N_LOADS)] + results = await asyncio.gather(*tasks) for result in results: xrt.assert_identical(result, ds["foo"].variable.load()) - assert total_time > self.LATENCY # Cannot possibly be quicker than this - assert ( - total_time < self.LATENCY * self.N_LOADS - ) # If this isn't true we're gaining nothing from async - assert ( - abs(total_time - self.LATENCY) < 0.5 - ) # Should take approximately LATENCY seconds, but allow some buffer + self.assert_time_as_expected(timer.total_time) async def test_async_load_dataarray(self, memorystore): latencystore = LatencyStore(memorystore, latency=self.LATENCY) ds = xr.open_zarr(latencystore, zarr_format=3, consolidated=False, chunks=None) # TODO change the syntax to `.async.load()`? - start_time = time.time() - tasks = [ds["foo"].async_load() for _ in range(self.N_LOADS)] - results = await asyncio.gather(*tasks) - total_time = time.time() - start_time + async with AsyncTimer().measure() as timer: + tasks = [ds["foo"].async_load() for _ in range(self.N_LOADS)] + results = await asyncio.gather(*tasks) for result in results: xrt.assert_identical(result, ds["foo"].load()) - assert total_time > self.LATENCY # Cannot possibly be quicker than this - assert ( - total_time < self.LATENCY * self.N_LOADS - ) # If this isn't true we're gaining nothing from async - assert ( - abs(total_time - self.LATENCY) < 0.5 - ) # Should take approximately LATENCY seconds, but allow some buffer + self.assert_time_as_expected(timer.total_time) async def test_async_load_dataset(self, memorystore): latencystore = LatencyStore(memorystore, latency=self.LATENCY) ds = xr.open_zarr(latencystore, zarr_format=3, consolidated=False, chunks=None) # TODO change the syntax to `.async.load()`? - start_time = time.time() - tasks = [ds.async_load() for _ in range(self.N_LOADS)] - results = await asyncio.gather(*tasks) - total_time = time.time() - start_time + async with AsyncTimer().measure() as timer: + tasks = [ds.async_load() for _ in range(self.N_LOADS)] + results = await asyncio.gather(*tasks) for result in results: xrt.assert_identical(result, ds.load()) - assert total_time > self.LATENCY # Cannot possibly be quicker than this - assert ( - total_time < self.LATENCY * self.N_LOADS - ) # If this isn't true we're gaining nothing from async - assert ( - abs(total_time - self.LATENCY) < 0.5 - ) # Should take approximately LATENCY seconds, but allow some buffer + self.assert_time_as_expected(timer.total_time) From e0731a08c563b04a01c56ff288ae7eea6e5d7a4b Mon Sep 17 00:00:00 2001 From: Tom Nicholas Date: Sun, 18 May 2025 00:01:29 +0300 Subject: [PATCH 12/45] consolidate tests via a parametrized fixture --- xarray/tests/test_async.py | 52 +++++++++++++------------------------- 1 file changed, 17 insertions(+), 35 deletions(-) diff --git a/xarray/tests/test_async.py b/xarray/tests/test_async.py index 9e9bbefe56b..109deba78d3 100644 --- a/xarray/tests/test_async.py +++ b/xarray/tests/test_async.py @@ -123,53 +123,35 @@ class TestAsyncLoad: N_LOADS: int = 5 LATENCY: float = 1.0 + @pytest.fixture(params=["ds", "da", "var"]) + def xr_obj(self, request, memorystore) -> xr.Dataset | xr.DataArray | xr.Variable: + latencystore = LatencyStore(memorystore, latency=self.LATENCY) + ds = xr.open_zarr(latencystore, zarr_format=3, consolidated=False, chunks=None) + + match request.param: + case "var": + return ds["foo"].variable + case "da": + return ds["foo"] + case "ds": + return ds + def assert_time_as_expected(self, total_time: float) -> None: assert total_time > self.LATENCY # Cannot possibly be quicker than this assert ( total_time < self.LATENCY * self.N_LOADS ) # If this isn't true we're gaining nothing from async assert ( - abs(total_time - self.LATENCY) < 0.5 + abs(total_time - self.LATENCY) < 2.0 ) # Should take approximately LATENCY seconds, but allow some buffer - async def test_async_load_variable(self, memorystore): - latencystore = LatencyStore(memorystore, latency=self.LATENCY) - ds = xr.open_zarr(latencystore, zarr_format=3, consolidated=False, chunks=None) - - # TODO change the syntax to `.async.load()`? - async with AsyncTimer().measure() as timer: - tasks = [ds["foo"].variable.async_load() for _ in range(self.N_LOADS)] - results = await asyncio.gather(*tasks) - - for result in results: - xrt.assert_identical(result, ds["foo"].variable.load()) - - self.assert_time_as_expected(timer.total_time) - - async def test_async_load_dataarray(self, memorystore): - latencystore = LatencyStore(memorystore, latency=self.LATENCY) - ds = xr.open_zarr(latencystore, zarr_format=3, consolidated=False, chunks=None) - - # TODO change the syntax to `.async.load()`? - async with AsyncTimer().measure() as timer: - tasks = [ds["foo"].async_load() for _ in range(self.N_LOADS)] - results = await asyncio.gather(*tasks) - - for result in results: - xrt.assert_identical(result, ds["foo"].load()) - - self.assert_time_as_expected(timer.total_time) - - async def test_async_load_dataset(self, memorystore): - latencystore = LatencyStore(memorystore, latency=self.LATENCY) - ds = xr.open_zarr(latencystore, zarr_format=3, consolidated=False, chunks=None) - + async def test_async_load(self, xr_obj): # TODO change the syntax to `.async.load()`? async with AsyncTimer().measure() as timer: - tasks = [ds.async_load() for _ in range(self.N_LOADS)] + tasks = [xr_obj.async_load() for _ in range(self.N_LOADS)] results = await asyncio.gather(*tasks) for result in results: - xrt.assert_identical(result, ds.load()) + xrt.assert_identical(result, xr_obj.load()) self.assert_time_as_expected(timer.total_time) From 9b41e78daafc42ca32d6961a95540ec8cee15457 Mon Sep 17 00:00:00 2001 From: Tom Nicholas Date: Sun, 18 May 2025 01:24:13 +0300 Subject: [PATCH 13/45] async_load -> load_async --- xarray/core/dataarray.py | 4 ++-- xarray/core/dataset.py | 4 ++-- xarray/core/variable.py | 2 +- xarray/tests/test_async.py | 3 +-- 4 files changed, 6 insertions(+), 7 deletions(-) diff --git a/xarray/core/dataarray.py b/xarray/core/dataarray.py index 808f39d8c03..05f5d4c7fa8 100644 --- a/xarray/core/dataarray.py +++ b/xarray/core/dataarray.py @@ -1160,9 +1160,9 @@ def load(self, **kwargs) -> Self: self._coords = new._coords return self - async def async_load(self, **kwargs) -> Self: + async def load_async(self, **kwargs) -> Self: temp_ds = self._to_temp_dataset() - ds = await temp_ds.async_load(**kwargs) + ds = await temp_ds.load_async(**kwargs) new = self._from_temp_dataset(ds) self._variable = new._variable self._coords = new._coords diff --git a/xarray/core/dataset.py b/xarray/core/dataset.py index 33e37e1ca4c..26441256a4a 100644 --- a/xarray/core/dataset.py +++ b/xarray/core/dataset.py @@ -552,7 +552,7 @@ def load(self, **kwargs) -> Self: return self - async def async_load(self, **kwargs) -> Self: + async def load_async(self, **kwargs) -> Self: # this blocks on chunked arrays but not on lazily indexed arrays # access .data to coerce everything to numpy or dask arrays @@ -573,7 +573,7 @@ async def async_load(self, **kwargs) -> Self: # load everything else sequentially for k, v in self.variables.items(): if k not in lazy_data: - await v.async_load() + await v.load_async() return self diff --git a/xarray/core/variable.py b/xarray/core/variable.py index cd3af386b95..e45987bca35 100644 --- a/xarray/core/variable.py +++ b/xarray/core/variable.py @@ -958,7 +958,7 @@ def load(self, **kwargs): self._data = to_duck_array(self._data, **kwargs) return self - async def async_load(self, **kwargs): + async def load_async(self, **kwargs): print("async inside Variable") self._data = await async_to_duck_array(self._data, **kwargs) return self diff --git a/xarray/tests/test_async.py b/xarray/tests/test_async.py index 109deba78d3..d87208c3e59 100644 --- a/xarray/tests/test_async.py +++ b/xarray/tests/test_async.py @@ -146,9 +146,8 @@ def assert_time_as_expected(self, total_time: float) -> None: ) # Should take approximately LATENCY seconds, but allow some buffer async def test_async_load(self, xr_obj): - # TODO change the syntax to `.async.load()`? async with AsyncTimer().measure() as timer: - tasks = [xr_obj.async_load() for _ in range(self.N_LOADS)] + tasks = [xr_obj.load_async() for _ in range(self.N_LOADS)] results = await asyncio.gather(*tasks) for result in results: From 67ba26a1ee1704ab274dc80bcb10f8635a97caf1 Mon Sep 17 00:00:00 2001 From: Tom Nicholas Date: Sun, 18 May 2025 03:18:24 +0300 Subject: [PATCH 14/45] make BackendArray an ABC --- xarray/backends/common.py | 13 +++++++++---- 1 file changed, 9 insertions(+), 4 deletions(-) diff --git a/xarray/backends/common.py b/xarray/backends/common.py index edda5cff429..ce0ff3c323e 100644 --- a/xarray/backends/common.py +++ b/xarray/backends/common.py @@ -4,6 +4,7 @@ import os import time import traceback +from abc import ABC, abstractmethod from collections.abc import Hashable, Iterable, Mapping, Sequence from glob import glob from typing import TYPE_CHECKING, Any, ClassVar, TypeVar, Union, overload @@ -267,18 +268,22 @@ def robust_getitem(array, key, catch=Exception, max_retries=6, initial_delay=500 time.sleep(1e-3 * next_delay) -class BackendArray(NdimSizeLenMixin, indexing.ExplicitlyIndexed): +class BackendArray(ABC, NdimSizeLenMixin, indexing.ExplicitlyIndexed): __slots__ = () + @abstractmethod + def __getitem__(key: indexing.ExplicitIndexer) -> np.typing.ArrayLike: ... + + async def async_getitem(key: indexing.ExplicitIndexer) -> np.typing.ArrayLike: + raise NotImplementedError("Backend does not not support asynchronous loading") + def get_duck_array(self, dtype: np.typing.DTypeLike = None): key = indexing.BasicIndexer((slice(None),) * self.ndim) return self[key] # type: ignore[index] async def async_get_duck_array(self, dtype: np.typing.DTypeLike = None): key = indexing.BasicIndexer((slice(None),) * self.ndim) - # TODO use zarr-python async get method here? - print("async inside BackendArray") - return await self.getitem(key) # type: ignore[index] + return await self.async_getitem(key) # type: ignore[index] class AbstractDataStore: From 9344e2e78ecb5ca3cadf436fe8a446325b73b13f Mon Sep 17 00:00:00 2001 From: Tom Nicholas Date: Sun, 18 May 2025 03:19:10 +0300 Subject: [PATCH 15/45] explain how to add async support for any backend in the docs --- doc/internals/how-to-add-new-backend.rst | 49 ++++++++++++++++-------- xarray/backends/zarr.py | 1 - 2 files changed, 33 insertions(+), 17 deletions(-) diff --git a/doc/internals/how-to-add-new-backend.rst b/doc/internals/how-to-add-new-backend.rst index e4f6d54f75c..a6858b35775 100644 --- a/doc/internals/how-to-add-new-backend.rst +++ b/doc/internals/how-to-add-new-backend.rst @@ -325,39 +325,42 @@ information on plugins. How to support lazy loading +++++++++++++++++++++++++++ -If you want to make your backend effective with big datasets, then you should -support lazy loading. -Basically, you shall replace the :py:class:`numpy.ndarray` inside the -variables with a custom class that supports lazy loading indexing. +If you want to make your backend effective with big datasets, then you should take advantage of xarray's +support for lazy loading and indexing. + +Basically, when your backend constructs the ``Variable`` objects, +you need to replace the :py:class:`numpy.ndarray` inside the +variables with a custom :py:class:`~xarray.backends.BackendArray` subclass that supports lazy loading and indexing. See the example below: .. code-block:: python - backend_array = MyBackendArray() data = indexing.LazilyIndexedArray(backend_array) var = xr.Variable(dims, data, attrs=attrs, encoding=encoding) Where: -- :py:class:`~xarray.core.indexing.LazilyIndexedArray` is a class - provided by Xarray that manages the lazy loading. -- ``MyBackendArray`` shall be implemented by the backend and shall inherit +- :py:class:`~xarray.core.indexing.LazilyIndexedArray` is a wrapper class + provided by Xarray that manages the lazy loading and indexing. +- ``MyBackendArray`` should be implemented by the backend and must inherit from :py:class:`~xarray.backends.BackendArray`. BackendArray subclassing ^^^^^^^^^^^^^^^^^^^^^^^^ -The BackendArray subclass shall implement the following method and attributes: +The BackendArray subclass must implement the following method and attributes: -- the ``__getitem__`` method that takes in input an index and returns a - `NumPy `__ array -- the ``shape`` attribute +- the ``__getitem__`` method that takes an index as an input and returns a + `NumPy `__ array, +- the ``shape`` attribute, - the ``dtype`` attribute. -Xarray supports different type of :doc:`/user-guide/indexing`, that can be -grouped in three types of indexes +It may also optionally implement an additional ``async_getitem`` method. + +Xarray supports different types of :doc:`/user-guide/indexing`, that can be +grouped in three types of indexes: :py:class:`~xarray.core.indexing.BasicIndexer`, -:py:class:`~xarray.core.indexing.OuterIndexer` and +:py:class:`~xarray.core.indexing.OuterIndexer`, and :py:class:`~xarray.core.indexing.VectorizedIndexer`. This implies that the implementation of the method ``__getitem__`` can be tricky. In order to simplify this task, Xarray provides a helper function, @@ -413,8 +416,22 @@ input the ``key``, the array ``shape`` and the following parameters: For more details see :py:class:`~xarray.core.indexing.IndexingSupport` and :ref:`RST indexing`. +Async support +^^^^^^^^^^^^^ + +Backends can also optionally support loading data asynchronously via xarray's asynchronous loading methods +(e.g. ``~xarray.Dataset.load_async``). +To support async loading the `BackendArray` subclass must additionally implement the ``BackendArray.async_getitem`` method. + +Note that implementing this method is only necessary if you want to be able to load data from different xarray objects concurrently. +Even without this method your ``BackendArray`` implementation is still free to concurrently load chunks of data for a single ``Variable`` itself, +so long as it does so behind the synchronous ``__getitem__`` interface. + +Dask support +^^^^^^^^^^^^ + In order to support `Dask Distributed `__ and -:py:mod:`multiprocessing`, ``BackendArray`` subclass should be serializable +:py:mod:`multiprocessing`, the ``BackendArray`` subclass should be serializable either with :ref:`io.pickle` or `cloudpickle `__. That implies that all the reference to open files should be dropped. For diff --git a/xarray/backends/zarr.py b/xarray/backends/zarr.py index f068826eef2..9e36e8198c7 100644 --- a/xarray/backends/zarr.py +++ b/xarray/backends/zarr.py @@ -234,7 +234,6 @@ def __getitem__(self, key): # could possibly have a work-around for 0d data here async def async_getitem(self, key): - # this doesn't need to be async array = self._array if isinstance(key, indexing.BasicIndexer): method = self._async_getitem From f8f8563586b58e1825d70e6f115966216065c46b Mon Sep 17 00:00:00 2001 From: Tom Nicholas Date: Mon, 19 May 2025 09:46:23 +0700 Subject: [PATCH 16/45] add new methods to api docs --- doc/api-hidden.rst | 1 + doc/api.rst | 2 ++ 2 files changed, 3 insertions(+) diff --git a/doc/api-hidden.rst b/doc/api-hidden.rst index 9a6037cf3c4..98d3704de9b 100644 --- a/doc/api-hidden.rst +++ b/doc/api-hidden.rst @@ -228,6 +228,7 @@ Variable.isnull Variable.item Variable.load + Variable.load_async Variable.max Variable.mean Variable.median diff --git a/doc/api.rst b/doc/api.rst index b6023866eb8..80715555e56 100644 --- a/doc/api.rst +++ b/doc/api.rst @@ -1122,6 +1122,7 @@ Dataset methods Dataset.filter_by_attrs Dataset.info Dataset.load + Dataset.load_async Dataset.persist Dataset.unify_chunks @@ -1154,6 +1155,7 @@ DataArray methods DataArray.compute DataArray.persist DataArray.load + DataArray.load_async DataArray.unify_chunks DataTree methods From 30ce9bea5f21c722f15f4cf45ea8ca4e617cdf71 Mon Sep 17 00:00:00 2001 From: Tom Nicholas Date: Mon, 19 May 2025 09:54:15 +0700 Subject: [PATCH 17/45] whatsnew --- doc/whats-new.rst | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/doc/whats-new.rst b/doc/whats-new.rst index c8fbecf82af..97dc3096fde 100644 --- a/doc/whats-new.rst +++ b/doc/whats-new.rst @@ -25,6 +25,8 @@ v2025.05.0 (unreleased) New Features ~~~~~~~~~~~~ +- Added new asynchronous loading methods :py:meth:`~xarray.Dataset.load_async`, :py:meth:`~xarray.DataArray.load_async`, :py:meth:`~xarray.Variable.load_async`. + (:issue:`10326`, :pull:`10327`) By `Tom Nicholas `_. Breaking changes ~~~~~~~~~~~~~~~~ @@ -38,7 +40,6 @@ Bug fixes ~~~~~~~~~ - Fix :py:class:`~xarray.groupers.BinGrouper` when ``labels`` is not specified (:issue:`10284`). By `Deepak Cherian `_. - - Allow accessing arbitrary attributes on Pandas ExtensionArrays. By `Deepak Cherian `_. From 2342b50b6459197542dab6bc87697060f2314d90 Mon Sep 17 00:00:00 2001 From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com> Date: Mon, 19 May 2025 02:55:54 +0000 Subject: [PATCH 18/45] [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci --- doc/internals/how-to-add-new-backend.rst | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/doc/internals/how-to-add-new-backend.rst b/doc/internals/how-to-add-new-backend.rst index a6858b35775..f5ee255c420 100644 --- a/doc/internals/how-to-add-new-backend.rst +++ b/doc/internals/how-to-add-new-backend.rst @@ -325,7 +325,7 @@ information on plugins. How to support lazy loading +++++++++++++++++++++++++++ -If you want to make your backend effective with big datasets, then you should take advantage of xarray's +If you want to make your backend effective with big datasets, then you should take advantage of xarray's support for lazy loading and indexing. Basically, when your backend constructs the ``Variable`` objects, @@ -334,6 +334,7 @@ variables with a custom :py:class:`~xarray.backends.BackendArray` subclass that See the example below: .. code-block:: python + backend_array = MyBackendArray() data = indexing.LazilyIndexedArray(backend_array) var = xr.Variable(dims, data, attrs=attrs, encoding=encoding) @@ -424,7 +425,7 @@ Backends can also optionally support loading data asynchronously via xarray's as To support async loading the `BackendArray` subclass must additionally implement the ``BackendArray.async_getitem`` method. Note that implementing this method is only necessary if you want to be able to load data from different xarray objects concurrently. -Even without this method your ``BackendArray`` implementation is still free to concurrently load chunks of data for a single ``Variable`` itself, +Even without this method your ``BackendArray`` implementation is still free to concurrently load chunks of data for a single ``Variable`` itself, so long as it does so behind the synchronous ``__getitem__`` interface. Dask support From b6d4a824c5414b76a77b34ffdc18dcd858364f6e Mon Sep 17 00:00:00 2001 From: Tom Nicholas Date: Sun, 18 May 2025 19:56:51 -0700 Subject: [PATCH 19/45] Fix ci/minimum_versions.py --- ci/minimum_versions.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/ci/minimum_versions.py b/ci/minimum_versions.py index 4cc0b76916b..21123bffcd6 100644 --- a/ci/minimum_versions.py +++ b/ci/minimum_versions.py @@ -30,7 +30,8 @@ "coveralls", "pip", "pytest", - "pytest-asynciopytest-cov", + "pytest-asyncio", + "pytest-cov", "pytest-env", "pytest-mypy-plugins", "pytest-timeout", From 2079d7e5f703fd21bd4aad615b985f3b2ddb2729 Mon Sep 17 00:00:00 2001 From: Tom Nicholas Date: Wed, 21 May 2025 08:43:24 +0700 Subject: [PATCH 20/45] fix formatting --- doc/internals/how-to-add-new-backend.rst | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/doc/internals/how-to-add-new-backend.rst b/doc/internals/how-to-add-new-backend.rst index f5ee255c420..883c817dccc 100644 --- a/doc/internals/how-to-add-new-backend.rst +++ b/doc/internals/how-to-add-new-backend.rst @@ -422,7 +422,7 @@ Async support Backends can also optionally support loading data asynchronously via xarray's asynchronous loading methods (e.g. ``~xarray.Dataset.load_async``). -To support async loading the `BackendArray` subclass must additionally implement the ``BackendArray.async_getitem`` method. +To support async loading the ``BackendArray`` subclass must additionally implement the ``BackendArray.async_getitem`` method. Note that implementing this method is only necessary if you want to be able to load data from different xarray objects concurrently. Even without this method your ``BackendArray`` implementation is still free to concurrently load chunks of data for a single ``Variable`` itself, From 48e453434593281ae5be6f6d5962bf9e7d7cd6f0 Mon Sep 17 00:00:00 2001 From: Tom Nicholas Date: Wed, 21 May 2025 08:45:34 +0700 Subject: [PATCH 21/45] concurrently load different variables in ds.load_async using asyncio.gather --- xarray/core/dataset.py | 36 +++++++++++++++++++----------------- 1 file changed, 19 insertions(+), 17 deletions(-) diff --git a/xarray/core/dataset.py b/xarray/core/dataset.py index 26441256a4a..5d5abd27987 100644 --- a/xarray/core/dataset.py +++ b/xarray/core/dataset.py @@ -1,5 +1,6 @@ from __future__ import annotations +import asyncio import copy import datetime import math @@ -531,49 +532,50 @@ def load(self, **kwargs) -> Self: dask.compute """ # access .data to coerce everything to numpy or dask arrays - lazy_data = { + chunked_data = { k: v._data for k, v in self.variables.items() if is_chunked_array(v._data) } - if lazy_data: - chunkmanager = get_chunked_array_type(*lazy_data.values()) + if chunked_data: + chunkmanager = get_chunked_array_type(*chunked_data.values()) # evaluate all the chunked arrays simultaneously evaluated_data: tuple[np.ndarray[Any, Any], ...] = chunkmanager.compute( - *lazy_data.values(), **kwargs + *chunked_data.values(), **kwargs ) - for k, data in zip(lazy_data, evaluated_data, strict=False): + for k, data in zip(chunked_data, evaluated_data, strict=False): self.variables[k].data = data # load everything else sequentially - for k, v in self.variables.items(): - if k not in lazy_data: - v.load() + [v.load_async() for k, v in self.variables.items() if k not in chunked_data] return self async def load_async(self, **kwargs) -> Self: + # TODO refactor this to pul out the common chunked_data codepath + # this blocks on chunked arrays but not on lazily indexed arrays # access .data to coerce everything to numpy or dask arrays - lazy_data = { + chunked_data = { k: v._data for k, v in self.variables.items() if is_chunked_array(v._data) } - if lazy_data: - chunkmanager = get_chunked_array_type(*lazy_data.values()) + if chunked_data: + chunkmanager = get_chunked_array_type(*chunked_data.values()) # evaluate all the chunked arrays simultaneously evaluated_data: tuple[np.ndarray[Any, Any], ...] = chunkmanager.compute( - *lazy_data.values(), **kwargs + *chunked_data.values(), **kwargs ) - for k, data in zip(lazy_data, evaluated_data, strict=False): + for k, data in zip(chunked_data, evaluated_data, strict=False): self.variables[k].data = data - # load everything else sequentially - for k, v in self.variables.items(): - if k not in lazy_data: - await v.load_async() + # load everything else concurrently + tasks = [ + v.load_async() for k, v in self.variables.items() if k not in chunked_data + ] + await asyncio.gather(*tasks) return self From cca758931dd46d2b59756843086db19b97f9449e Mon Sep 17 00:00:00 2001 From: Tom Nicholas Date: Wed, 21 May 2025 08:46:07 +0700 Subject: [PATCH 22/45] test concurrent loading of multiple variables in one dataset --- xarray/tests/test_async.py | 38 +++++++++++++++++++++++++++++--------- 1 file changed, 29 insertions(+), 9 deletions(-) diff --git a/xarray/tests/test_async.py b/xarray/tests/test_async.py index d87208c3e59..d8e91c4aca3 100644 --- a/xarray/tests/test_async.py +++ b/xarray/tests/test_async.py @@ -120,7 +120,6 @@ async def measure(self): @requires_zarr_v3 @pytest.mark.asyncio class TestAsyncLoad: - N_LOADS: int = 5 LATENCY: float = 1.0 @pytest.fixture(params=["ds", "da", "var"]) @@ -136,21 +135,42 @@ def xr_obj(self, request, memorystore) -> xr.Dataset | xr.DataArray | xr.Variabl case "ds": return ds - def assert_time_as_expected(self, total_time: float) -> None: - assert total_time > self.LATENCY # Cannot possibly be quicker than this + def assert_time_as_expected( + self, total_time: float, latency: float, n_loads: int + ) -> None: + assert total_time > latency # Cannot possibly be quicker than this assert ( - total_time < self.LATENCY * self.N_LOADS + total_time < latency * n_loads ) # If this isn't true we're gaining nothing from async assert ( - abs(total_time - self.LATENCY) < 2.0 - ) # Should take approximately LATENCY seconds, but allow some buffer + abs(total_time - latency) < 2.0 + ) # Should take approximately `latency` seconds, but allow some buffer + + async def test_concurrent_load_multiple_objects(self, xr_obj) -> None: + N_OBJECTS = 5 - async def test_async_load(self, xr_obj): async with AsyncTimer().measure() as timer: - tasks = [xr_obj.load_async() for _ in range(self.N_LOADS)] + tasks = [xr_obj.load_async() for _ in range(N_OBJECTS)] results = await asyncio.gather(*tasks) for result in results: xrt.assert_identical(result, xr_obj.load()) - self.assert_time_as_expected(timer.total_time) + self.assert_time_as_expected( + total_time=timer.total_time, latency=self.LATENCY, n_loads=N_OBJECTS + ) + + async def test_concurrent_load_multiple_variables(self, memorystore) -> None: + latencystore = LatencyStore(memorystore, latency=self.LATENCY) + ds = xr.open_zarr(latencystore, zarr_format=3, consolidated=False, chunks=None) + + # TODO up the number of variables in the dataset? + async with AsyncTimer().measure() as timer: + result_ds = await ds.load_async() + + xrt.assert_identical(result_ds, ds.load()) + + # 2 because there are 2 lazy variables in the dataset + self.assert_time_as_expected( + total_time=timer.total_time, latency=self.LATENCY, n_loads=2 + ) From dfe9b87d7267c66fd77975f845e298dc2131ffdb Mon Sep 17 00:00:00 2001 From: Tom Nicholas Date: Wed, 21 May 2025 11:04:34 +0700 Subject: [PATCH 23/45] fix non-awaited load_async --- xarray/core/dataset.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/xarray/core/dataset.py b/xarray/core/dataset.py index 5d5abd27987..8a4e7177caa 100644 --- a/xarray/core/dataset.py +++ b/xarray/core/dataset.py @@ -547,12 +547,12 @@ def load(self, **kwargs) -> Self: self.variables[k].data = data # load everything else sequentially - [v.load_async() for k, v in self.variables.items() if k not in chunked_data] + [v.load() for k, v in self.variables.items() if k not in chunked_data] return self async def load_async(self, **kwargs) -> Self: - # TODO refactor this to pul out the common chunked_data codepath + # TODO refactor this to pull out the common chunked_data codepath # this blocks on chunked arrays but not on lazily indexed arrays @@ -572,10 +572,10 @@ async def load_async(self, **kwargs) -> Self: self.variables[k].data = data # load everything else concurrently - tasks = [ + coros = [ v.load_async() for k, v in self.variables.items() if k not in chunked_data ] - await asyncio.gather(*tasks) + await asyncio.gather(*coros) return self From 84099f3f164c2e097c71a6050027eb3990e6cd7b Mon Sep 17 00:00:00 2001 From: Tom Nicholas Date: Wed, 21 May 2025 11:05:37 +0700 Subject: [PATCH 24/45] rearrange test order --- xarray/tests/test_async.py | 29 +++++++++++++++-------------- 1 file changed, 15 insertions(+), 14 deletions(-) diff --git a/xarray/tests/test_async.py b/xarray/tests/test_async.py index d8e91c4aca3..d17ccf9e7e5 100644 --- a/xarray/tests/test_async.py +++ b/xarray/tests/test_async.py @@ -11,6 +11,7 @@ import xarray.testing as xrt from xarray.tests import has_zarr_v3, requires_zarr_v3 + if has_zarr_v3: import zarr from zarr.abc.store import ByteRequest, Store @@ -146,20 +147,6 @@ def assert_time_as_expected( abs(total_time - latency) < 2.0 ) # Should take approximately `latency` seconds, but allow some buffer - async def test_concurrent_load_multiple_objects(self, xr_obj) -> None: - N_OBJECTS = 5 - - async with AsyncTimer().measure() as timer: - tasks = [xr_obj.load_async() for _ in range(N_OBJECTS)] - results = await asyncio.gather(*tasks) - - for result in results: - xrt.assert_identical(result, xr_obj.load()) - - self.assert_time_as_expected( - total_time=timer.total_time, latency=self.LATENCY, n_loads=N_OBJECTS - ) - async def test_concurrent_load_multiple_variables(self, memorystore) -> None: latencystore = LatencyStore(memorystore, latency=self.LATENCY) ds = xr.open_zarr(latencystore, zarr_format=3, consolidated=False, chunks=None) @@ -174,3 +161,17 @@ async def test_concurrent_load_multiple_variables(self, memorystore) -> None: self.assert_time_as_expected( total_time=timer.total_time, latency=self.LATENCY, n_loads=2 ) + + async def test_concurrent_load_multiple_objects(self, xr_obj) -> None: + N_OBJECTS = 5 + + async with AsyncTimer().measure() as timer: + coros = [xr_obj.load_async() for _ in range(N_OBJECTS)] + results = await asyncio.gather(*coros) + + for result in results: + xrt.assert_identical(result, xr_obj.load()) + + self.assert_time_as_expected( + total_time=timer.total_time, latency=self.LATENCY, n_loads=N_OBJECTS + ) From ab000c86c463dbbad906b433e15a8dd29abc6073 Mon Sep 17 00:00:00 2001 From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com> Date: Wed, 21 May 2025 04:06:00 +0000 Subject: [PATCH 25/45] [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci --- xarray/tests/test_async.py | 1 - 1 file changed, 1 deletion(-) diff --git a/xarray/tests/test_async.py b/xarray/tests/test_async.py index d17ccf9e7e5..2d5a7f027d6 100644 --- a/xarray/tests/test_async.py +++ b/xarray/tests/test_async.py @@ -11,7 +11,6 @@ import xarray.testing as xrt from xarray.tests import has_zarr_v3, requires_zarr_v3 - if has_zarr_v3: import zarr from zarr.abc.store import ByteRequest, Store From a8b7b466abbc511a6fdf045b459c6f0d66107bd1 Mon Sep 17 00:00:00 2001 From: Tom Nicholas Date: Fri, 23 May 2025 12:36:15 +0700 Subject: [PATCH 26/45] add test for orthogonal indexing --- xarray/tests/test_async.py | 27 ++++++++++++++++++++------- 1 file changed, 20 insertions(+), 7 deletions(-) diff --git a/xarray/tests/test_async.py b/xarray/tests/test_async.py index d17ccf9e7e5..45240a66bd4 100644 --- a/xarray/tests/test_async.py +++ b/xarray/tests/test_async.py @@ -11,7 +11,6 @@ import xarray.testing as xrt from xarray.tests import has_zarr_v3, requires_zarr_v3 - if has_zarr_v3: import zarr from zarr.abc.store import ByteRequest, Store @@ -77,7 +76,7 @@ async def get_partial_values( @pytest.fixture def memorystore() -> "MemoryStore": memorystore = zarr.storage.MemoryStore({}) - z = zarr.create_array( + z1 = zarr.create_array( store=memorystore, name="foo", shape=(10, 10), @@ -85,17 +84,17 @@ def memorystore() -> "MemoryStore": dtype="f4", dimension_names=["x", "y"], ) - z[:, :] = np.random.random((10, 10)) + z1[:, :] = np.random.random((10, 10)) - z = zarr.create_array( + z2 = zarr.create_array( store=memorystore, - name="bar", + name="x", shape=(10,), chunks=(5), dtype="f4", dimension_names=["x"], ) - z[:] = np.random.random((10,)) + z2[:] = np.arange(10) return memorystore @@ -123,7 +122,7 @@ async def measure(self): class TestAsyncLoad: LATENCY: float = 1.0 - @pytest.fixture(params=["ds", "da", "var"]) + @pytest.fixture(params=["var", "ds", "da"]) def xr_obj(self, request, memorystore) -> xr.Dataset | xr.DataArray | xr.Variable: latencystore = LatencyStore(memorystore, latency=self.LATENCY) ds = xr.open_zarr(latencystore, zarr_format=3, consolidated=False, chunks=None) @@ -175,3 +174,17 @@ async def test_concurrent_load_multiple_objects(self, xr_obj) -> None: self.assert_time_as_expected( total_time=timer.total_time, latency=self.LATENCY, n_loads=N_OBJECTS ) + + @pytest.mark.xfail(reason="not implemented") + async def test_indexing(self, memorystore) -> None: + latencystore = LatencyStore(memorystore, latency=self.LATENCY) + ds = xr.open_zarr(latencystore, zarr_format=3, consolidated=False, chunks=None) + + # TODO test basic indexing + + # test orthogonal indexing + indexer = {"x": [2, 3]} + result = await ds.sel(indexer).load_async() + xrt.assert_identical(result, ds.sel(indexer).load()) + + # TODO test vectorized indexing From 82c7654bbc28a66965a195c6fc36131d948b3c2d Mon Sep 17 00:00:00 2001 From: Tom Nicholas Date: Fri, 23 May 2025 12:36:32 +0700 Subject: [PATCH 27/45] explicitly forbid orthogonal indexing --- xarray/backends/zarr.py | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/xarray/backends/zarr.py b/xarray/backends/zarr.py index 9e36e8198c7..981c41d828a 100644 --- a/xarray/backends/zarr.py +++ b/xarray/backends/zarr.py @@ -238,15 +238,15 @@ async def async_getitem(self, key): if isinstance(key, indexing.BasicIndexer): method = self._async_getitem elif isinstance(key, indexing.VectorizedIndexer): - # TODO - method = self._vindex + # method = self._vindex + raise NotImplementedError("async lazy vectorized indexing is not supported") elif isinstance(key, indexing.OuterIndexer): - # TODO - method = self._oindex + # method = self._oindex + raise NotImplementedError("async lazy orthogonal indexing is not supported") print("did an async get") return await indexing.async_explicit_indexing_adapter( - key, array.shape, indexing.IndexingSupport.VECTORIZED, method + key, array.shape, indexing.IndexingSupport.BASIC, method ) From 5eacdb0eead3e09cbf888a0be656d90845acb30a Mon Sep 17 00:00:00 2001 From: Tom Nicholas Date: Fri, 23 May 2025 13:36:31 +0700 Subject: [PATCH 28/45] support async orthogonal indexing via https://github.com/zarr-developers/zarr-python/pull/3083 --- xarray/backends/zarr.py | 9 ++++++--- xarray/tests/test_async.py | 4 +++- 2 files changed, 9 insertions(+), 4 deletions(-) diff --git a/xarray/backends/zarr.py b/xarray/backends/zarr.py index 981c41d828a..e3f825f2b64 100644 --- a/xarray/backends/zarr.py +++ b/xarray/backends/zarr.py @@ -218,6 +218,10 @@ async def _async_getitem(self, key): async_array = self._array._async_array return await async_array.getitem(key) + async def _async_oindex(self, key): + async_array = self._array._async_array + return await async_array.oindex.getitem(key) + def __getitem__(self, key): array = self._array if isinstance(key, indexing.BasicIndexer): @@ -241,12 +245,11 @@ async def async_getitem(self, key): # method = self._vindex raise NotImplementedError("async lazy vectorized indexing is not supported") elif isinstance(key, indexing.OuterIndexer): - # method = self._oindex - raise NotImplementedError("async lazy orthogonal indexing is not supported") + method = self._async_oindex print("did an async get") return await indexing.async_explicit_indexing_adapter( - key, array.shape, indexing.IndexingSupport.BASIC, method + key, array.shape, indexing.IndexingSupport.OUTER, method ) diff --git a/xarray/tests/test_async.py b/xarray/tests/test_async.py index 45240a66bd4..9ea15468fce 100644 --- a/xarray/tests/test_async.py +++ b/xarray/tests/test_async.py @@ -25,6 +25,9 @@ class LatencyStore(WrapperStore[T_Store]): latency: float + # TODO only have to add this because of dumb behaviour in zarr where it raises with "ValueError: Store is not read-only but mode is 'r'" + read_only = True + def __init__( self, store: T_Store, @@ -175,7 +178,6 @@ async def test_concurrent_load_multiple_objects(self, xr_obj) -> None: total_time=timer.total_time, latency=self.LATENCY, n_loads=N_OBJECTS ) - @pytest.mark.xfail(reason="not implemented") async def test_indexing(self, memorystore) -> None: latencystore = LatencyStore(memorystore, latency=self.LATENCY) ds = xr.open_zarr(latencystore, zarr_format=3, consolidated=False, chunks=None) From 093bf50275700e67560787181adc55ec61ffb49d Mon Sep 17 00:00:00 2001 From: Tom Nicholas Date: Fri, 23 May 2025 14:12:27 +0700 Subject: [PATCH 29/45] add test for vectorized indexing (even if it doesn't work) --- xarray/tests/test_async.py | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/xarray/tests/test_async.py b/xarray/tests/test_async.py index 9ea15468fce..ddced7423ab 100644 --- a/xarray/tests/test_async.py +++ b/xarray/tests/test_async.py @@ -189,4 +189,8 @@ async def test_indexing(self, memorystore) -> None: result = await ds.sel(indexer).load_async() xrt.assert_identical(result, ds.sel(indexer).load()) - # TODO test vectorized indexing + # test vectorized indexing + # TODO this shouldn't pass! I haven't implemented async vectorized indexing yet... + indexer = xr.DataArray([2, 3], dims=['x']) + result = await ds.foo[indexer].load_async() + xrt.assert_identical(result, ds.foo[indexer].load()) From 4073a24f563c1010d6fb50b6757b38196909d02c Mon Sep 17 00:00:00 2001 From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com> Date: Fri, 23 May 2025 07:12:53 +0000 Subject: [PATCH 30/45] [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci --- xarray/tests/test_async.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/xarray/tests/test_async.py b/xarray/tests/test_async.py index ddced7423ab..81493bff30c 100644 --- a/xarray/tests/test_async.py +++ b/xarray/tests/test_async.py @@ -191,6 +191,6 @@ async def test_indexing(self, memorystore) -> None: # test vectorized indexing # TODO this shouldn't pass! I haven't implemented async vectorized indexing yet... - indexer = xr.DataArray([2, 3], dims=['x']) + indexer = xr.DataArray([2, 3], dims=["x"]) result = await ds.foo[indexer].load_async() xrt.assert_identical(result, ds.foo[indexer].load()) From 842a06cd99b9d5d1112fb6a5a2de58912fe8a96d Mon Sep 17 00:00:00 2001 From: Tom Nicholas Date: Fri, 23 May 2025 20:32:15 +0700 Subject: [PATCH 31/45] add test for basic indexing --- xarray/tests/test_async.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/xarray/tests/test_async.py b/xarray/tests/test_async.py index 81493bff30c..10c78089914 100644 --- a/xarray/tests/test_async.py +++ b/xarray/tests/test_async.py @@ -182,7 +182,10 @@ async def test_indexing(self, memorystore) -> None: latencystore = LatencyStore(memorystore, latency=self.LATENCY) ds = xr.open_zarr(latencystore, zarr_format=3, consolidated=False, chunks=None) - # TODO test basic indexing + # test basic indexing + indexer = {"x": 2} + result = await ds.sel(indexer).load_async() + xrt.assert_identical(result, ds.sel(indexer).load()) # test orthogonal indexing indexer = {"x": [2, 3]} From e19ab55e82645c7ace96e8f1472207b3f5a8fae9 Mon Sep 17 00:00:00 2001 From: Tom Nicholas Date: Fri, 23 May 2025 22:27:06 +0700 Subject: [PATCH 32/45] correct test to actually use vectorized indexing --- xarray/tests/test_async.py | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/xarray/tests/test_async.py b/xarray/tests/test_async.py index 10c78089914..3bc27d82c44 100644 --- a/xarray/tests/test_async.py +++ b/xarray/tests/test_async.py @@ -179,7 +179,8 @@ async def test_concurrent_load_multiple_objects(self, xr_obj) -> None: ) async def test_indexing(self, memorystore) -> None: - latencystore = LatencyStore(memorystore, latency=self.LATENCY) + # TODO we don't need a LatencyStore for this test + latencystore = LatencyStore(memorystore, latency=0.0) ds = xr.open_zarr(latencystore, zarr_format=3, consolidated=False, chunks=None) # test basic indexing @@ -193,7 +194,6 @@ async def test_indexing(self, memorystore) -> None: xrt.assert_identical(result, ds.sel(indexer).load()) # test vectorized indexing - # TODO this shouldn't pass! I haven't implemented async vectorized indexing yet... - indexer = xr.DataArray([2, 3], dims=["x"]) - result = await ds.foo[indexer].load_async() - xrt.assert_identical(result, ds.foo[indexer].load()) + indexer = {"x": xr.DataArray([2, 3], dims="points"), "y": xr.DataArray([2, 3], dims="points")} + result = await ds.isel(indexer).load_async() + xrt.assert_identical(result, ds.isel(indexer).load()) From b9e8e0631f19ed42a3894efab95c4f93012fcb93 Mon Sep 17 00:00:00 2001 From: Tom Nicholas Date: Fri, 23 May 2025 23:02:35 +0700 Subject: [PATCH 33/45] refactor to parametrize indexing test --- xarray/tests/test_async.py | 34 +++++++++++++++++++--------------- 1 file changed, 19 insertions(+), 15 deletions(-) diff --git a/xarray/tests/test_async.py b/xarray/tests/test_async.py index 3bc27d82c44..7a8b0190298 100644 --- a/xarray/tests/test_async.py +++ b/xarray/tests/test_async.py @@ -178,22 +178,26 @@ async def test_concurrent_load_multiple_objects(self, xr_obj) -> None: total_time=timer.total_time, latency=self.LATENCY, n_loads=N_OBJECTS ) - async def test_indexing(self, memorystore) -> None: + @pytest.mark.parametrize( + "method,indexer", + [ + ("sel", {"x": 2}), + ("sel", {"x": [2, 3]}), + ( + "isel", + { + "x": xr.DataArray([2, 3], dims="points"), + "y": xr.DataArray([2, 3], dims="points"), + }, + ), + ], + ids=["basic", "outer", "vectorized"], + ) + async def test_indexing(self, memorystore, method, indexer) -> None: # TODO we don't need a LatencyStore for this test latencystore = LatencyStore(memorystore, latency=0.0) ds = xr.open_zarr(latencystore, zarr_format=3, consolidated=False, chunks=None) - # test basic indexing - indexer = {"x": 2} - result = await ds.sel(indexer).load_async() - xrt.assert_identical(result, ds.sel(indexer).load()) - - # test orthogonal indexing - indexer = {"x": [2, 3]} - result = await ds.sel(indexer).load_async() - xrt.assert_identical(result, ds.sel(indexer).load()) - - # test vectorized indexing - indexer = {"x": xr.DataArray([2, 3], dims="points"), "y": xr.DataArray([2, 3], dims="points")} - result = await ds.isel(indexer).load_async() - xrt.assert_identical(result, ds.isel(indexer).load()) + result = await getattr(ds, method)(**indexer).load_async() + expected = getattr(ds, method)(**indexer).load() + xrt.assert_identical(result, expected) From 8bc7bea9b1ee8ee6a24870bd1d36f516d7c8da30 Mon Sep 17 00:00:00 2001 From: Tom Nicholas Date: Thu, 29 May 2025 17:18:00 +0700 Subject: [PATCH 34/45] implement async vectorized indexing --- xarray/backends/zarr.py | 11 ++++++----- xarray/core/indexing.py | 37 ++++++++++++++++++++++++++++++++++++- xarray/core/variable.py | 1 - xarray/tests/test_async.py | 3 ++- 4 files changed, 44 insertions(+), 8 deletions(-) diff --git a/xarray/backends/zarr.py b/xarray/backends/zarr.py index e3f825f2b64..9c3d0dc7d63 100644 --- a/xarray/backends/zarr.py +++ b/xarray/backends/zarr.py @@ -222,6 +222,10 @@ async def _async_oindex(self, key): async_array = self._array._async_array return await async_array.oindex.getitem(key) + async def _async_vindex(self, key): + async_array = self._array._async_array + return await async_array.vindex.getitem(key) + def __getitem__(self, key): array = self._array if isinstance(key, indexing.BasicIndexer): @@ -242,14 +246,11 @@ async def async_getitem(self, key): if isinstance(key, indexing.BasicIndexer): method = self._async_getitem elif isinstance(key, indexing.VectorizedIndexer): - # method = self._vindex - raise NotImplementedError("async lazy vectorized indexing is not supported") + method = self._async_vindex elif isinstance(key, indexing.OuterIndexer): method = self._async_oindex - - print("did an async get") return await indexing.async_explicit_indexing_adapter( - key, array.shape, indexing.IndexingSupport.OUTER, method + key, array.shape, indexing.IndexingSupport.VECTORIZED, method ) diff --git a/xarray/core/indexing.py b/xarray/core/indexing.py index 53f9bd12088..22220184cb8 100644 --- a/xarray/core/indexing.py +++ b/xarray/core/indexing.py @@ -525,7 +525,7 @@ def get_duck_array(self): async def async_get_duck_array(self): key = BasicIndexer((slice(None),) * self.ndim) - return self[key] + return await self.async_getitem(key) def _oindex_get(self, indexer: OuterIndexer): raise NotImplementedError( @@ -756,6 +756,22 @@ def get_duck_array(self): array = array.get_duck_array() return _wrap_numpy_scalars(array) + async def async_get_duck_array(self): + print("inside LazilyVectorizedIndexedArray.async_get_duck_array") + if isinstance(self.array, ExplicitlyIndexedNDArrayMixin): + array = apply_indexer(self.array, self.key) + else: + # If the array is not an ExplicitlyIndexedNDArrayMixin, + # it may wrap a BackendArray so use its __getitem__ + array = await self.array.async_getitem(self.key) + # self.array[self.key] is now a numpy array when + # self.array is a BackendArray subclass + # and self.key is BasicIndexer((slice(None, None, None),)) + # so we need the explicit check for ExplicitlyIndexed + if isinstance(array, ExplicitlyIndexed): + array = await array.async_get_duck_array() + return _wrap_numpy_scalars(array) + def _updated_key(self, new_key: ExplicitIndexer): return _combine_indexers(self.key, self.shape, new_key) @@ -1608,6 +1624,16 @@ def __getitem__(self, indexer: ExplicitIndexer): key = indexer.tuple + (Ellipsis,) return array[key] + async def async_getitem(self, indexer: ExplicitIndexer): + self._check_and_raise_if_non_basic_indexer(indexer) + + array = self.array + # We want 0d slices rather than scalars. This is achieved by + # appending an ellipsis (see + # https://numpy.org/doc/stable/reference/arrays.indexing.html#detailed-notes). + key = indexer.tuple + (Ellipsis,) + return array[key] + def _safe_setitem(self, array, key: tuple[Any, ...], value: Any) -> None: try: array[key] = value @@ -1855,6 +1881,15 @@ def get_duck_array(self) -> np.ndarray | PandasExtensionArray: return PandasExtensionArray(self.array.array) return np.asarray(self) + async def async_get_duck_array(self) -> np.ndarray | PandasExtensionArray: + # TODO this must surely be wrong - it's not async yet + print("in PandasIndexingAdapter") + if pd.api.types.is_extension_array_dtype(self.array): + from xarray.core.extension_array import PandasExtensionArray + + return PandasExtensionArray(self.array.array) + return np.asarray(self) + @property def shape(self) -> _Shape: return (len(self.array),) diff --git a/xarray/core/variable.py b/xarray/core/variable.py index e45987bca35..38f2676ec52 100644 --- a/xarray/core/variable.py +++ b/xarray/core/variable.py @@ -959,7 +959,6 @@ def load(self, **kwargs): return self async def load_async(self, **kwargs): - print("async inside Variable") self._data = await async_to_duck_array(self._data, **kwargs) return self diff --git a/xarray/tests/test_async.py b/xarray/tests/test_async.py index 7a8b0190298..7ec1967cf86 100644 --- a/xarray/tests/test_async.py +++ b/xarray/tests/test_async.py @@ -184,7 +184,7 @@ async def test_concurrent_load_multiple_objects(self, xr_obj) -> None: ("sel", {"x": 2}), ("sel", {"x": [2, 3]}), ( - "isel", + "sel", { "x": xr.DataArray([2, 3], dims="points"), "y": xr.DataArray([2, 3], dims="points"), @@ -198,6 +198,7 @@ async def test_indexing(self, memorystore, method, indexer) -> None: latencystore = LatencyStore(memorystore, latency=0.0) ds = xr.open_zarr(latencystore, zarr_format=3, consolidated=False, chunks=None) + # TODO we're not actually testing that these indexing methods are not blocking... result = await getattr(ds, method)(**indexer).load_async() expected = getattr(ds, method)(**indexer).load() xrt.assert_identical(result, expected) From 6c47e3f41bb4934959df6eb14acbb5dc0caf15d1 Mon Sep 17 00:00:00 2001 From: Tom Nicholas Date: Thu, 29 May 2025 18:00:00 +0700 Subject: [PATCH 35/45] revert breaking change to BackendArray --- xarray/backends/common.py | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/xarray/backends/common.py b/xarray/backends/common.py index ce0ff3c323e..b6cb1660c44 100644 --- a/xarray/backends/common.py +++ b/xarray/backends/common.py @@ -268,12 +268,9 @@ def robust_getitem(array, key, catch=Exception, max_retries=6, initial_delay=500 time.sleep(1e-3 * next_delay) -class BackendArray(ABC, NdimSizeLenMixin, indexing.ExplicitlyIndexed): +class BackendArray(NdimSizeLenMixin, indexing.ExplicitlyIndexed): __slots__ = () - @abstractmethod - def __getitem__(key: indexing.ExplicitIndexer) -> np.typing.ArrayLike: ... - async def async_getitem(key: indexing.ExplicitIndexer) -> np.typing.ArrayLike: raise NotImplementedError("Backend does not not support asynchronous loading") From a86f6465ea6e4f39304c6b9f4db31b4ee05ed30f Mon Sep 17 00:00:00 2001 From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com> Date: Thu, 29 May 2025 11:00:28 +0000 Subject: [PATCH 36/45] [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci --- xarray/backends/common.py | 1 - 1 file changed, 1 deletion(-) diff --git a/xarray/backends/common.py b/xarray/backends/common.py index b6cb1660c44..10a698ac329 100644 --- a/xarray/backends/common.py +++ b/xarray/backends/common.py @@ -4,7 +4,6 @@ import os import time import traceback -from abc import ABC, abstractmethod from collections.abc import Hashable, Iterable, Mapping, Sequence from glob import glob from typing import TYPE_CHECKING, Any, ClassVar, TypeVar, Union, overload From 884ce139acfe3c6b1f38ce1eb8ac03370fd4e9dd Mon Sep 17 00:00:00 2001 From: Tom Nicholas Date: Thu, 29 May 2025 18:13:44 +0700 Subject: [PATCH 37/45] remove indirection in _ensure_cached method --- xarray/core/indexing.py | 14 +++++--------- 1 file changed, 5 insertions(+), 9 deletions(-) diff --git a/xarray/core/indexing.py b/xarray/core/indexing.py index 22220184cb8..2497b0e71bc 100644 --- a/xarray/core/indexing.py +++ b/xarray/core/indexing.py @@ -876,19 +876,15 @@ class MemoryCachedArray(ExplicitlyIndexedNDArrayMixin): def __init__(self, array): self.array = _wrap_numpy_scalars(as_indexable(array)) - def _ensure_cached(self): - self.array = as_indexable(self.array.get_duck_array()) - - async def _async_ensure_cached(self): - duck_array = await self.array.async_get_duck_array() - self.array = as_indexable(duck_array) - def get_duck_array(self): - self._ensure_cached() + # first ensure the array object is cached + self.array = as_indexable(self.array.get_duck_array()) return self.array.get_duck_array() async def async_get_duck_array(self): - await self._async_ensure_cached() + # first ensure the array object is cached + duck_array = await self.array.async_get_duck_array() + self.array = as_indexable(duck_array) return await self.array.async_get_duck_array() def _oindex_get(self, indexer: OuterIndexer): From a43af86a13a10b13e408146f562dce473105e4c8 Mon Sep 17 00:00:00 2001 From: Deepak Cherian Date: Thu, 29 May 2025 15:33:27 -0600 Subject: [PATCH 38/45] IndexingAdapters don't need async get --- xarray/core/indexing.py | 29 ++++++++++++----------------- 1 file changed, 12 insertions(+), 17 deletions(-) diff --git a/xarray/core/indexing.py b/xarray/core/indexing.py index 2497b0e71bc..1198f835789 100644 --- a/xarray/core/indexing.py +++ b/xarray/core/indexing.py @@ -678,7 +678,9 @@ async def async_get_duck_array(self): # and self.key is BasicIndexer((slice(None, None, None),)) # so we need the explicit check for ExplicitlyIndexed if isinstance(array, ExplicitlyIndexed): - array = await array.async_get_duck_array() + # At this point, we have issued completed the possible async load from disk + # and array is in-memory. So use the sync get + array = array.get_duck_array() return _wrap_numpy_scalars(array) def transpose(self, order): @@ -769,7 +771,9 @@ async def async_get_duck_array(self): # and self.key is BasicIndexer((slice(None, None, None),)) # so we need the explicit check for ExplicitlyIndexed if isinstance(array, ExplicitlyIndexed): - array = await array.async_get_duck_array() + # At this point, we have issued completed the possible async load from disk + # and array is in-memory. So use the sync get + array = array.get_duck_array() return _wrap_numpy_scalars(array) def _updated_key(self, new_key: ExplicitIndexer): @@ -877,15 +881,16 @@ def __init__(self, array): self.array = _wrap_numpy_scalars(as_indexable(array)) def get_duck_array(self): - # first ensure the array object is cached - self.array = as_indexable(self.array.get_duck_array()) - return self.array.get_duck_array() + duck_array = self.array.get_duck_array() + # ensure the array object is cached in-memory + self.array = as_indexable(duck_array) + return duck_array async def async_get_duck_array(self): - # first ensure the array object is cached duck_array = await self.array.async_get_duck_array() + # ensure the array object is cached in-memory self.array = as_indexable(duck_array) - return await self.array.async_get_duck_array() + return duck_array def _oindex_get(self, indexer: OuterIndexer): return type(self)(_wrap_numpy_scalars(self.array.oindex[indexer])) @@ -1620,16 +1625,6 @@ def __getitem__(self, indexer: ExplicitIndexer): key = indexer.tuple + (Ellipsis,) return array[key] - async def async_getitem(self, indexer: ExplicitIndexer): - self._check_and_raise_if_non_basic_indexer(indexer) - - array = self.array - # We want 0d slices rather than scalars. This is achieved by - # appending an ellipsis (see - # https://numpy.org/doc/stable/reference/arrays.indexing.html#detailed-notes). - key = indexer.tuple + (Ellipsis,) - return array[key] - def _safe_setitem(self, array, key: tuple[Any, ...], value: Any) -> None: try: array[key] = value From 17d7a0efe98dd5ee797420747612f578657626d2 Mon Sep 17 00:00:00 2001 From: Deepak Cherian Date: Thu, 29 May 2025 16:00:35 -0600 Subject: [PATCH 39/45] Add tests --- xarray/tests/test_async.py | 3 ++- xarray/tests/test_indexing.py | 17 +++++++++++++++++ 2 files changed, 19 insertions(+), 1 deletion(-) diff --git a/xarray/tests/test_async.py b/xarray/tests/test_async.py index 7ec1967cf86..99f4619e736 100644 --- a/xarray/tests/test_async.py +++ b/xarray/tests/test_async.py @@ -182,6 +182,7 @@ async def test_concurrent_load_multiple_objects(self, xr_obj) -> None: "method,indexer", [ ("sel", {"x": 2}), + ("sel", {"x": slice(2, 4)}), ("sel", {"x": [2, 3]}), ( "sel", @@ -191,7 +192,7 @@ async def test_concurrent_load_multiple_objects(self, xr_obj) -> None: }, ), ], - ids=["basic", "outer", "vectorized"], + ids=["basic-int", "basic-slice", "outer", "vectorized"], ) async def test_indexing(self, memorystore, method, indexer) -> None: # TODO we don't need a LatencyStore for this test diff --git a/xarray/tests/test_indexing.py b/xarray/tests/test_indexing.py index 6dd75b58c6a..d308844c6fa 100644 --- a/xarray/tests/test_indexing.py +++ b/xarray/tests/test_indexing.py @@ -490,6 +490,23 @@ def test_sub_array(self) -> None: assert isinstance(child.array, indexing.NumpyIndexingAdapter) assert isinstance(wrapped.array, indexing.LazilyIndexedArray) + async def test_async_wrapper(self) -> None: + original = indexing.LazilyIndexedArray(np.arange(10)) + wrapped = indexing.MemoryCachedArray(original) + await wrapped.async_get_duck_array() + assert_array_equal(wrapped, np.arange(10)) + assert isinstance(wrapped.array, indexing.NumpyIndexingAdapter) + + async def test_async_sub_array(self) -> None: + original = indexing.LazilyIndexedArray(np.arange(10)) + wrapped = indexing.MemoryCachedArray(original) + child = wrapped[B[:5]] + assert isinstance(child, indexing.MemoryCachedArray) + await child.async_get_duck_array() + assert_array_equal(child, np.arange(5)) + assert isinstance(child.array, indexing.NumpyIndexingAdapter) + assert isinstance(wrapped.array, indexing.LazilyIndexedArray) + def test_setitem(self) -> None: original = np.arange(10) wrapped = indexing.MemoryCachedArray(original) From d824a2d3898f86fca471163c275e8bb72521e3d5 Mon Sep 17 00:00:00 2001 From: Deepak Cherian Date: Thu, 29 May 2025 17:02:29 -0600 Subject: [PATCH 40/45] Add decoding test --- xarray/coding/common.py | 3 +++ xarray/tests/test_async.py | 1 + 2 files changed, 4 insertions(+) diff --git a/xarray/coding/common.py b/xarray/coding/common.py index 1b455009668..8093827138b 100644 --- a/xarray/coding/common.py +++ b/xarray/coding/common.py @@ -75,6 +75,9 @@ def __getitem__(self, key): def get_duck_array(self): return self.func(self.array.get_duck_array()) + async def async_get_duck_array(self): + return self.func(await self.array.async_get_duck_array()) + def __repr__(self) -> str: return f"{type(self).__name__}({self.array!r}, func={self.func!r}, dtype={self.dtype!r})" diff --git a/xarray/tests/test_async.py b/xarray/tests/test_async.py index 99f4619e736..d2beb353123 100644 --- a/xarray/tests/test_async.py +++ b/xarray/tests/test_async.py @@ -86,6 +86,7 @@ def memorystore() -> "MemoryStore": chunks=(5, 5), dtype="f4", dimension_names=["x", "y"], + attributes={"add_offset": 1, "scale_factor": 2}, ) z1[:, :] = np.random.random((10, 10)) From 6a136118efcab55c5a88203a618a62c5b6652314 Mon Sep 17 00:00:00 2001 From: Deepak Cherian Date: Thu, 29 May 2025 15:33:27 -0600 Subject: [PATCH 41/45] Add IndexingAdapter mixin --- xarray/backends/zarr.py | 1 + xarray/coding/variables.py | 1 + xarray/core/indexing.py | 121 +++++++++++++++------------------- xarray/namedarray/pycompat.py | 17 ++--- 4 files changed, 61 insertions(+), 79 deletions(-) diff --git a/xarray/backends/zarr.py b/xarray/backends/zarr.py index 9c3d0dc7d63..8f814c7f1f3 100644 --- a/xarray/backends/zarr.py +++ b/xarray/backends/zarr.py @@ -242,6 +242,7 @@ def __getitem__(self, key): # could possibly have a work-around for 0d data here async def async_getitem(self, key): + print("async getting") array = self._array if isinstance(key, indexing.BasicIndexer): method = self._async_getitem diff --git a/xarray/coding/variables.py b/xarray/coding/variables.py index 1b7bc95e2b4..911c532f7bd 100644 --- a/xarray/coding/variables.py +++ b/xarray/coding/variables.py @@ -105,6 +105,7 @@ def __getitem__(self, key) -> np.ndarray: return np.asarray(self.array[key], dtype=self.dtype) + def _apply_mask( data: np.ndarray, encoded_fill_values: list, diff --git a/xarray/core/indexing.py b/xarray/core/indexing.py index 1198f835789..824558010e1 100644 --- a/xarray/core/indexing.py +++ b/xarray/core/indexing.py @@ -516,16 +516,30 @@ def get_duck_array(self): return self.array -class ExplicitlyIndexedNDArrayMixin(NDArrayMixin, ExplicitlyIndexed): - __slots__ = () +class IndexingAdapter: + """Marker class for indexing adapters. + + These classes translate between Xarray's indexing semantics and the underlying array's + indexing semantics. + """ def get_duck_array(self): key = BasicIndexer((slice(None),) * self.ndim) return self[key] async def async_get_duck_array(self): - key = BasicIndexer((slice(None),) * self.ndim) - return await self.async_getitem(key) + """These classes are applied to in-memory arrays, so specific async support isn't needed.""" + return self.get_duck_array() + + +class ExplicitlyIndexedNDArrayMixin(NDArrayMixin, ExplicitlyIndexed): + __slots__ = () + + def get_duck_array(self): + raise NotImplementedError + + async def async_get_duck_array(self): + raise NotImplementedError def _oindex_get(self, indexer: OuterIndexer): raise NotImplementedError( @@ -650,37 +664,25 @@ def shape(self) -> _Shape: return self._shape def get_duck_array(self): - if isinstance(self.array, ExplicitlyIndexedNDArrayMixin): - array = apply_indexer(self.array, self.key) - else: - # If the array is not an ExplicitlyIndexedNDArrayMixin, - # it may wrap a BackendArray so use its __getitem__ - array = self.array[self.key] + from xarray.backends.common import BackendArray - # self.array[self.key] is now a numpy array when - # self.array is a BackendArray subclass - # and self.key is BasicIndexer((slice(None, None, None),)) - # so we need the explicit check for ExplicitlyIndexed - if isinstance(array, ExplicitlyIndexed): - array = array.get_duck_array() + if isinstance(self.array, BackendArray): + array = self.array[self.key] + else: + array = apply_indexer(self.array, self.key) + if isinstance(array, ExplicitlyIndexed): + array = array.get_duck_array() return _wrap_numpy_scalars(array) async def async_get_duck_array(self): - if isinstance(self.array, ExplicitlyIndexedNDArrayMixin): - array = apply_indexer(self.array, self.key) - else: - # If the array is not an ExplicitlyIndexedNDArrayMixin, - # it may wrap a BackendArray so use its (async) getitem - array = await self.array.async_getitem(self.key) + from xarray.backends.common import BackendArray - # self.array[self.key] is now a numpy array when - # self.array is a BackendArray subclass - # and self.key is BasicIndexer((slice(None, None, None),)) - # so we need the explicit check for ExplicitlyIndexed - if isinstance(array, ExplicitlyIndexed): - # At this point, we have issued completed the possible async load from disk - # and array is in-memory. So use the sync get - array = array.get_duck_array() + if isinstance(self.array, BackendArray): + array = await self.array.async_getitem(self.key) + else: + array = apply_indexer(self.array, self.key) + if isinstance(array, ExplicitlyIndexed): + array = await array.async_get_duck_array() return _wrap_numpy_scalars(array) def transpose(self, order): @@ -744,36 +746,26 @@ def shape(self) -> _Shape: return np.broadcast(*self.key.tuple).shape def get_duck_array(self): - if isinstance(self.array, ExplicitlyIndexedNDArrayMixin): - array = apply_indexer(self.array, self.key) - else: - # If the array is not an ExplicitlyIndexedNDArrayMixin, - # it may wrap a BackendArray so use its __getitem__ + from xarray.backends.common import BackendArray + + if isinstance(self.array, BackendArray): array = self.array[self.key] - # self.array[self.key] is now a numpy array when - # self.array is a BackendArray subclass - # and self.key is BasicIndexer((slice(None, None, None),)) - # so we need the explicit check for ExplicitlyIndexed - if isinstance(array, ExplicitlyIndexed): - array = array.get_duck_array() + else: + array = apply_indexer(self.array, self.key) + if isinstance(array, ExplicitlyIndexed): + array = array.get_duck_array() return _wrap_numpy_scalars(array) async def async_get_duck_array(self): print("inside LazilyVectorizedIndexedArray.async_get_duck_array") - if isinstance(self.array, ExplicitlyIndexedNDArrayMixin): - array = apply_indexer(self.array, self.key) - else: - # If the array is not an ExplicitlyIndexedNDArrayMixin, - # it may wrap a BackendArray so use its __getitem__ + from xarray.backends.common import BackendArray + + if isinstance(self.array, BackendArray): array = await self.array.async_getitem(self.key) - # self.array[self.key] is now a numpy array when - # self.array is a BackendArray subclass - # and self.key is BasicIndexer((slice(None, None, None),)) - # so we need the explicit check for ExplicitlyIndexed - if isinstance(array, ExplicitlyIndexed): - # At this point, we have issued completed the possible async load from disk - # and array is in-memory. So use the sync get - array = array.get_duck_array() + else: + array = apply_indexer(self.array, self.key) + if isinstance(array, ExplicitlyIndexed): + array = await array.async_get_duck_array() return _wrap_numpy_scalars(array) def _updated_key(self, new_key: ExplicitIndexer): @@ -1589,7 +1581,7 @@ def is_fancy_indexer(indexer: Any) -> bool: return True -class NumpyIndexingAdapter(ExplicitlyIndexedNDArrayMixin): +class NumpyIndexingAdapter(IndexingAdapter, ExplicitlyIndexedNDArrayMixin): """Wrap a NumPy array to use explicit indexing.""" __slots__ = ("array",) @@ -1668,7 +1660,7 @@ def __init__(self, array): self.array = array -class ArrayApiIndexingAdapter(ExplicitlyIndexedNDArrayMixin): +class ArrayApiIndexingAdapter(IndexingAdapter, ExplicitlyIndexedNDArrayMixin): """Wrap an array API array to use explicit indexing.""" __slots__ = ("array",) @@ -1733,7 +1725,7 @@ def _assert_not_chunked_indexer(idxr: tuple[Any, ...]) -> None: ) -class DaskIndexingAdapter(ExplicitlyIndexedNDArrayMixin): +class DaskIndexingAdapter(IndexingAdapter, ExplicitlyIndexedNDArrayMixin): """Wrap a dask array to support explicit indexing.""" __slots__ = ("array",) @@ -1809,7 +1801,7 @@ def transpose(self, order): return self.array.transpose(order) -class PandasIndexingAdapter(ExplicitlyIndexedNDArrayMixin): +class PandasIndexingAdapter(IndexingAdapter, ExplicitlyIndexedNDArrayMixin): """Wrap a pandas.Index to preserve dtypes and handle explicit indexing.""" __slots__ = ("_dtype", "array") @@ -1872,15 +1864,6 @@ def get_duck_array(self) -> np.ndarray | PandasExtensionArray: return PandasExtensionArray(self.array.array) return np.asarray(self) - async def async_get_duck_array(self) -> np.ndarray | PandasExtensionArray: - # TODO this must surely be wrong - it's not async yet - print("in PandasIndexingAdapter") - if pd.api.types.is_extension_array_dtype(self.array): - from xarray.core.extension_array import PandasExtensionArray - - return PandasExtensionArray(self.array.array) - return np.asarray(self) - @property def shape(self) -> _Shape: return (len(self.array),) @@ -2135,7 +2118,9 @@ def copy(self, deep: bool = True) -> Self: return type(self)(array, self._dtype, self.level) -class CoordinateTransformIndexingAdapter(ExplicitlyIndexedNDArrayMixin): +class CoordinateTransformIndexingAdapter( + IndexingAdapter, ExplicitlyIndexedNDArrayMixin +): """Wrap a CoordinateTransform as a lazy coordinate array. Supports explicit indexing (both outer and vectorized). diff --git a/xarray/namedarray/pycompat.py b/xarray/namedarray/pycompat.py index c6a07e5963f..6e61d3445ab 100644 --- a/xarray/namedarray/pycompat.py +++ b/xarray/namedarray/pycompat.py @@ -153,20 +153,15 @@ async def async_to_duck_array( from xarray.core.indexing import ( ExplicitlyIndexed, ImplicitToExplicitIndexingAdapter, + IndexingAdapter, ) - from xarray.namedarray.parallelcompat import get_chunked_array_type print(type(data)) - - if is_chunked_array(data): - chunkmanager = get_chunked_array_type(data) - loaded_data, *_ = chunkmanager.compute(data, **kwargs) # type: ignore[var-annotated] - return loaded_data - - if isinstance(data, ExplicitlyIndexed | ImplicitToExplicitIndexingAdapter): + if isinstance(data, IndexingAdapter): + # These wrap in-memory arrays, and async isn't needed + return data.get_duck_array() + elif isinstance(data, ExplicitlyIndexed | ImplicitToExplicitIndexingAdapter): print("async inside to_duck_array") return await data.async_get_duck_array() # type: ignore[no-untyped-call, no-any-return] - elif is_duck_array(data): - return data else: - return np.asarray(data) # type: ignore[return-value] + return to_duck_array(data, **kwargs) From d79ed54724644419758226f795bf4ac1bedd7233 Mon Sep 17 00:00:00 2001 From: Deepak Cherian Date: Thu, 29 May 2025 21:58:52 -0600 Subject: [PATCH 42/45] [cherry] Making decoding arrays lazy too --- xarray/coding/strings.py | 9 ++++++--- xarray/coding/variables.py | 17 +++++++++++------ 2 files changed, 17 insertions(+), 9 deletions(-) diff --git a/xarray/coding/strings.py b/xarray/coding/strings.py index 4ca6a3f0a46..a2295c218a6 100644 --- a/xarray/coding/strings.py +++ b/xarray/coding/strings.py @@ -250,14 +250,17 @@ def __repr__(self): return f"{type(self).__name__}({self.array!r})" def _vindex_get(self, key): - return _numpy_char_to_bytes(self.array.vindex[key]) + return type(self)(self.array.vindex[key]) def _oindex_get(self, key): - return _numpy_char_to_bytes(self.array.oindex[key]) + return type(self)(self.array.oindex[key]) def __getitem__(self, key): # require slicing the last dimension completely key = type(key)(indexing.expanded_indexer(key.tuple, self.array.ndim)) if key.tuple[-1] != slice(None): raise IndexError("too many indices") - return _numpy_char_to_bytes(self.array[key]) + return type(self)(self.array[key]) + + def get_duck_array(self): + return _numpy_char_to_bytes(self.array.get_duck_array()) diff --git a/xarray/coding/variables.py b/xarray/coding/variables.py index 911c532f7bd..f82f0c65768 100644 --- a/xarray/coding/variables.py +++ b/xarray/coding/variables.py @@ -58,13 +58,16 @@ def dtype(self) -> np.dtype: return np.dtype(self.array.dtype.kind + str(self.array.dtype.itemsize)) def _oindex_get(self, key): - return np.asarray(self.array.oindex[key], dtype=self.dtype) + return type(self)(self.array.oindex[key]) def _vindex_get(self, key): - return np.asarray(self.array.vindex[key], dtype=self.dtype) + return type(self)(self.array.vindex[key]) def __getitem__(self, key) -> np.ndarray: - return np.asarray(self.array[key], dtype=self.dtype) + return type(self)(self.array[key]) + + def get_duck_array(self): + return duck_array_ops.astype(self.array.get_duck_array(), dtype=self.dtype) class BoolTypeArray(indexing.ExplicitlyIndexedNDArrayMixin): @@ -96,14 +99,16 @@ def dtype(self) -> np.dtype: return np.dtype("bool") def _oindex_get(self, key): - return np.asarray(self.array.oindex[key], dtype=self.dtype) + return type(self)(self.array.oindex[key]) def _vindex_get(self, key): - return np.asarray(self.array.vindex[key], dtype=self.dtype) + return type(self)(self.array.vindex[key]) def __getitem__(self, key) -> np.ndarray: - return np.asarray(self.array[key], dtype=self.dtype) + return type(self)(self.array[key]) + def get_duck_array(self): + return duck_array_ops.astype(self.array.get_duck_array(), dtype=self.dtype) def _apply_mask( From 1da335991fc1c86c1a79f706402c3f762ea15e04 Mon Sep 17 00:00:00 2001 From: Tom Nicholas Date: Fri, 30 May 2025 21:27:05 +0700 Subject: [PATCH 43/45] parametrized over isel and sel --- xarray/tests/test_async.py | 20 +++++++++----------- 1 file changed, 9 insertions(+), 11 deletions(-) diff --git a/xarray/tests/test_async.py b/xarray/tests/test_async.py index d2beb353123..ff6998de51c 100644 --- a/xarray/tests/test_async.py +++ b/xarray/tests/test_async.py @@ -179,19 +179,17 @@ async def test_concurrent_load_multiple_objects(self, xr_obj) -> None: total_time=timer.total_time, latency=self.LATENCY, n_loads=N_OBJECTS ) + @pytest.mark.parametrize("method", ["sel", "isel"]) @pytest.mark.parametrize( - "method,indexer", + "indexer", [ - ("sel", {"x": 2}), - ("sel", {"x": slice(2, 4)}), - ("sel", {"x": [2, 3]}), - ( - "sel", - { - "x": xr.DataArray([2, 3], dims="points"), - "y": xr.DataArray([2, 3], dims="points"), - }, - ), + {"x": 2}, + {"x": slice(2, 4)}, + {"x": [2, 3]}, + { + "x": xr.DataArray([2, 3], dims="points"), + "y": xr.DataArray([2, 3], dims="points"), + }, ], ids=["basic-int", "basic-slice", "outer", "vectorized"], ) From dded9e04ba2abdb01a1923afd450f8ae23ebe08b Mon Sep 17 00:00:00 2001 From: Tom Nicholas Date: Fri, 30 May 2025 22:50:27 +0700 Subject: [PATCH 44/45] mock zarr.AsyncArray.getitem in test --- xarray/tests/test_async.py | 25 ++++++++++++++++++++++--- 1 file changed, 22 insertions(+), 3 deletions(-) diff --git a/xarray/tests/test_async.py b/xarray/tests/test_async.py index ff6998de51c..9def990f353 100644 --- a/xarray/tests/test_async.py +++ b/xarray/tests/test_async.py @@ -3,6 +3,7 @@ from collections.abc import Iterable from contextlib import asynccontextmanager from typing import TypeVar +from unittest.mock import patch import numpy as np import pytest @@ -196,9 +197,27 @@ async def test_concurrent_load_multiple_objects(self, xr_obj) -> None: async def test_indexing(self, memorystore, method, indexer) -> None: # TODO we don't need a LatencyStore for this test latencystore = LatencyStore(memorystore, latency=0.0) - ds = xr.open_zarr(latencystore, zarr_format=3, consolidated=False, chunks=None) - # TODO we're not actually testing that these indexing methods are not blocking... - result = await getattr(ds, method)(**indexer).load_async() + original_getitem = zarr.AsyncArray.getitem + + async def wrapper(instance, selection): + # Call the original method with proper self + result = await original_getitem(instance, selection) + return result + + with patch.object( + zarr.AsyncArray, "getitem", side_effect=wrapper, autospec=True + ) as mocked_meth: + ds = xr.open_zarr( + latencystore, zarr_format=3, consolidated=False, chunks=None + ) + + # TODO we're not actually testing that these indexing methods are not blocking... + result = await getattr(ds, method)(**indexer).load_async() + + assert mocked_meth.call_count > 0 + mocked_meth.assert_called() + mocked_meth.assert_awaited() + expected = getattr(ds, method)(**indexer).load() xrt.assert_identical(result, expected) From 4c347ad0a2225214f8932afd926dfbf53f51bd06 Mon Sep 17 00:00:00 2001 From: Tom Nicholas Date: Fri, 30 May 2025 23:06:27 +0700 Subject: [PATCH 45/45] tidy up the mocking --- xarray/tests/test_async.py | 32 +++++++++++++++----------------- 1 file changed, 15 insertions(+), 17 deletions(-) diff --git a/xarray/tests/test_async.py b/xarray/tests/test_async.py index 9def990f353..918a2508ea0 100644 --- a/xarray/tests/test_async.py +++ b/xarray/tests/test_async.py @@ -182,31 +182,29 @@ async def test_concurrent_load_multiple_objects(self, xr_obj) -> None: @pytest.mark.parametrize("method", ["sel", "isel"]) @pytest.mark.parametrize( - "indexer", + "indexer, zarr_getitem_method", [ - {"x": 2}, - {"x": slice(2, 4)}, - {"x": [2, 3]}, - { - "x": xr.DataArray([2, 3], dims="points"), - "y": xr.DataArray([2, 3], dims="points"), - }, + ({"x": 2}, zarr.AsyncArray.getitem), + ({"x": slice(2, 4)}, zarr.AsyncArray.getitem), + ({"x": [2, 3]}, zarr.core.indexing.AsyncOIndex.getitem), + ( + { + "x": xr.DataArray([2, 3], dims="points"), + "y": xr.DataArray([2, 3], dims="points"), + }, + zarr.core.indexing.AsyncVIndex.getitem, + ), ], ids=["basic-int", "basic-slice", "outer", "vectorized"], ) - async def test_indexing(self, memorystore, method, indexer) -> None: + async def test_indexing( + self, memorystore, method, indexer, zarr_getitem_method + ) -> None: # TODO we don't need a LatencyStore for this test latencystore = LatencyStore(memorystore, latency=0.0) - original_getitem = zarr.AsyncArray.getitem - - async def wrapper(instance, selection): - # Call the original method with proper self - result = await original_getitem(instance, selection) - return result - with patch.object( - zarr.AsyncArray, "getitem", side_effect=wrapper, autospec=True + zarr.AsyncArray, "getitem", wraps=zarr_getitem_method, autospec=True ) as mocked_meth: ds = xr.open_zarr( latencystore, zarr_format=3, consolidated=False, chunks=None