Allow passing config to default_object_store #564

Status: Open. Wants to merge 9 commits into base: develop.

Changes from 2 commits
39 changes: 29 additions & 10 deletions virtualizarr/manifests/store.py
@@ -3,7 +3,7 @@
 import pickle
 from collections.abc import AsyncGenerator, Iterable
 from dataclasses import dataclass
-from typing import TYPE_CHECKING, Any
+from typing import TYPE_CHECKING, Any, TypedDict
 from urllib.parse import urlparse

 from zarr.abc.store import (
@@ -26,7 +26,10 @@

 import xarray as xr
 from obstore.store import (
-    ObjectStore,  # type: ignore[import-not-found]
+    ClientConfig,
+    ObjectStore,
+    RetryConfig,
+    S3Config,
 )
 from zarr.core.buffer import BufferPrototype
 from zarr.core.common import BytesLike
@@ -41,6 +44,13 @@
 )


+class ObjectStoreOptions(TypedDict):
+    config: S3Config
+    client_options: ClientConfig
+    retry_config: RetryConfig
+    credential_provider: callable
+
+
 @dataclass
 class StoreRequest:
     """Dataclass for matching a key to the store instance"""
@@ -139,25 +149,34 @@ def _find_bucket_region(bucket_name: str) -> str:
     return region


-def default_object_store(filepath: str) -> ObjectStore:
+def default_object_store(
+    filepath: str, storage_config: ObjectStoreOptions | None = None
+) -> ObjectStore:
     import obstore as obs
Member (Author) commented:
I'd like to replace this function with obs.store.from_url() but the upstream version doesn't seem to auto-infer the region. Am I missing anything @kylebarron? xref #561

Contributor replied:
It's correct that obstore.store.from_url does not auto-infer the S3 region. This is an artifact of AWS-hosted S3 requiring a region while non-AWS-hosted S3-compatible stores do not. For example, it supports r2.cloudflarestorage.com-style URLs, but those don't have a region, so it's up to the user to pass in the region if required.
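For callers that already know the region, that might look like the following minimal sketch, assuming obstore's from_url accepts S3 config keys (region, skip_signature, ...) as keyword arguments the way the store constructors do; the bucket and prefix are placeholders:

```python
import obstore as obs

# from_url won't look up the region for AWS-hosted S3, so supply it explicitly
# together with whatever other config the store needs.
store = obs.store.from_url(
    "s3://example-bucket/some/prefix",  # placeholder bucket/prefix
    region="us-east-1",                 # caller-supplied; not auto-inferred
    skip_signature=True,                # e.g. anonymous/public access
)
```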

kylebarron (Contributor) commented on Apr 24, 2025:
Here's something I whipped together, a quick port of this file, so you can infer the region only for S3-native URLs:

from __future__ import annotations

from enum import Enum
from urllib.parse import urlparse

from obstore.store import (
    AzureStore,
    HTTPStore,
    LocalStore,
    MemoryStore,
    ObjectStore,
    S3Store,
)


class ObjectStoreScheme(Enum):
    AZURE = "azure"
    FILE = "file"
    S3 = "s3"
    S3Like = "s3like"
    MEMORY = "memory"
    HTTP = "http"


def create_store(url_str: str, config: dict) -> ObjectStore:
    scheme, bucket, path = parse_url(url_str)
    if scheme == ObjectStoreScheme.S3:
        region = infer_region(...)
        return S3Store(...)
    elif scheme == ObjectStoreScheme.S3Like:
        # Don't infer region
        return S3Store(...)
    elif scheme == ObjectStoreScheme.AZURE:
        return AzureStore(...)
    elif scheme == ObjectStoreScheme.FILE:
        return LocalStore(...)
    elif scheme == ObjectStoreScheme.MEMORY:
        return MemoryStore(...)
    elif scheme == ObjectStoreScheme.HTTP:
        return HttpStore(...)
    else:
        raise ValueError(f"Unsupported URL scheme: {scheme}")


def parse_url(url_str: str) -> tuple[ObjectStoreScheme, str | None, str]:
    # scheme, bucket, path
    url = urlparse(url_str)
    if url.scheme == "file":
        return ObjectStoreScheme.FILE, None, url.path

    if url.scheme == "memory":
        return ObjectStoreScheme.MEMORY, None, url.path

    if url.scheme in ["s3", "s3a"]:
        assert url.netloc, f"Expected bucket in s3:// url, got: {url_str}"
        return ObjectStoreScheme.S3, url.netloc, url.path

    if url.scheme == "gs":
        assert url.netloc, f"Expected bucket in gs:// url, got: {url_str}"
        return ObjectStoreScheme.S3Like, url.netloc, url.path

    if url.scheme in ["az", "adl", "azure", "abfs", "abfss"]:
        assert url.netloc, f"Expected bucket in azure url, got: {url_str}"
        return ObjectStoreScheme.AZURE, url.netloc, url.path

    if url.scheme == "http":
        return ObjectStoreScheme.HTTP, None, url.path

    if url.scheme == "https":
        if url.netloc.endswith("amazonaws.com"):
            if url.netloc.startswith("s3"):
                region = url.netloc.split(".", maxsplit=2)[1]
                # TODO: return region from this fn
                bucket, path = url.path.removeprefix("/").split("/", maxsplit=1)
                return ObjectStoreScheme.S3, bucket, path
            else:
                return ObjectStoreScheme.S3, None, url.path
        if url.netloc.endswith("r2.cloudflarestorage.com"):
            return ObjectStoreScheme.S3Like, None, url.path
        if url.netloc.endswith("blob.core.windows.net") or url.netloc.endswith(
            "dfs.core.windows.net"
        ):
            return ObjectStoreScheme.AZURE, None, url.path

        return ObjectStoreScheme.HTTP, None, url.path

    raise ValueError("Unrecognized url")

Perhaps you'd want create_store to take in separate config dicts for better typing and so you can define configs for multiple different stores at once. Something like

def create_store(
    url_str: str,
    s3_config: S3Config | None = None,
    azure_config: AzureConfig | None = None,
    gcs_config: GCSConfig | None = None,
) -> ObjectStore: ...

And then you'd only infer the region when it isn't manually passed in that s3_config dict.
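A minimal sketch of that conditional inference, assuming an s3_config dict shaped like obstore's S3 config keys; the region lookup mirrors the header-based approach used by _find_bucket_region in this PR, and the function names are illustrative:

```python
import requests
from obstore.store import S3Store


def find_bucket_region(bucket: str) -> str:
    # A HEAD request against the bucket returns the region in a response header,
    # even for anonymous requests.
    resp = requests.head(f"https://{bucket}.s3.amazonaws.com")
    return resp.headers["x-amz-bucket-region"]


def create_s3_store(bucket: str, s3_config: dict | None = None) -> S3Store:
    """Build an S3Store, inferring the region only when the caller didn't set it."""
    s3_config = dict(s3_config or {})
    if "region" not in s3_config:
        # Only do the network lookup when no region was supplied explicitly.
        s3_config["region"] = find_bucket_region(bucket)
    return S3Store(bucket=bucket, **s3_config)
```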


+    storage_config = storage_config or ObjectStoreOptions(
+        client_options={"allow_http": True},
+        config={"skip_signature": True, "virtual_hosted_style_request": True},
+    )
     parsed = urlparse(filepath)

     if parsed.scheme in ["", "file"]:
         return obs.store.LocalStore()
     if parsed.scheme == "s3":
-        bucket = parsed.netloc
-        return obs.store.S3Store(
-            bucket=bucket,
-            client_options={"allow_http": True},
-            skip_signature=True,
-            virtual_hosted_style_request=False,
-            region=_find_bucket_region(bucket),
-        )
+        storage_config = storage_config or ObjectStoreOptions(
+            client_options={"allow_http": True},
+            config={
+                "skip_signature": True,
+                "virtual_hosted_style_request": True,
+                "bucket": parsed.netloc,
+                "region": _find_bucket_region(parsed.netloc),
+            },
+        )
+        return obs.store.S3Store(**storage_config)
     if parsed.scheme in ["http", "https"]:
+        storage_config = storage_config or {}
         base_url = f"{parsed.scheme}://{parsed.netloc}"
-        return obs.store.HTTPStore.from_url(base_url)
+        return obs.store.HTTPStore(base_url, **storage_config)
     raise NotImplementedError(f"{parsed.scheme} is not yet supported")
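For reference, a rough caller-side sketch of the new parameter, using placeholder MinIO-style values. Note that when storage_config is supplied, the S3 branch above forwards it to S3Store unchanged, so the bucket (and region, where the endpoint needs one) has to be included in config:

```python
from virtualizarr.manifests.store import ObjectStoreOptions, default_object_store

# Placeholder endpoint and bucket for a local MinIO-style deployment.
opts = ObjectStoreOptions(
    config={
        "bucket": "my-bucket",
        "endpoint": "http://localhost:9000",
        "skip_signature": True,
        "virtual_hosted_style_request": False,
    },
    client_options={"allow_http": True},
)
store = default_object_store("s3://my-bucket/data.nc", storage_config=opts)
```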


29 changes: 17 additions & 12 deletions virtualizarr/readers/hdf/hdf.py
@@ -25,7 +25,11 @@
     ManifestStore,
 )
 from virtualizarr.manifests.manifest import validate_and_normalize_path_to_uri
-from virtualizarr.manifests.store import ObjectStoreRegistry, default_object_store
+from virtualizarr.manifests.store import (
+    ObjectStoreOptions,
+    ObjectStoreRegistry,
+    default_object_store,
+)
 from virtualizarr.manifests.utils import create_v3_array_metadata
 from virtualizarr.readers.api import VirtualBackend
 from virtualizarr.readers.hdf.filters import codecs_from_dataset
@@ -119,7 +123,7 @@ def _construct_manifest_array(
     @staticmethod
     def _construct_manifest_group(
         store: ObjectStore,
-        filepath: str,
+        uri: str,
         *,
         group: str | None = None,
         drop_variables: Optional[Iterable[str]] = None,
@@ -132,7 +136,7 @@ def _construct_manifest_group(
         if drop_variables is None:
            drop_variables = []

-        reader = ObstoreReader(store=store, path=filepath)
+        reader = ObstoreReader(store=store, path=uri)
         f = h5py.File(reader, mode="r")

         if group is not None and group != "":
@@ -159,7 +163,7 @@ def _construct_manifest_group(
             if key not in drop_variables:
                 if isinstance(g[key], h5py.Dataset):
                     variable = HDFVirtualBackend._construct_manifest_array(
-                        path=filepath,
+                        path=uri,
                         dataset=g[key],
                         group=group_name,
                     )
@@ -174,17 +178,19 @@ def _create_manifest_store(
         store: ObjectStore | None = None,
         group: str | None = None,
         drop_variables: Iterable[str] | None = None,
+        storage_config: ObjectStoreOptions | None = None,
     ) -> ManifestStore:
         # Create a group containing dataset level metadata and all the manifest arrays
+        uri = validate_and_normalize_path_to_uri(filepath, fs_root=Path.cwd().as_uri())
         if not store:
-            store = default_object_store(filepath)  # type: ignore
+            store = default_object_store(uri, storage_config=storage_config)  # type: ignore
         manifest_group = HDFVirtualBackend._construct_manifest_group(
             store=store,
-            filepath=filepath,
+            uri=uri,
             group=group,
             drop_variables=drop_variables,
         )
-        registry = ObjectStoreRegistry({filepath: store})
+        registry = ObjectStoreRegistry({uri: store})
         # Convert to a manifest store
         return ManifestStore(store_registry=registry, group=manifest_group)

@@ -197,7 +203,7 @@ def open_virtual_dataset(
         decode_times: bool | None = None,
         indexes: Mapping[str, xr.Index] | None = None,
         virtual_backend_kwargs: Optional[dict] = None,
-        reader_options: Optional[dict] = None,
+        reader_options: Optional[ObjectStoreOptions] = None,
     ) -> xr.Dataset:
         if h5py is None:
             raise ImportError("h5py is required for using the HDFVirtualBackend")
@@ -206,18 +212,17 @@
                 "HDF reader does not understand any virtual_backend_kwargs"
             )

-        filepath = validate_and_normalize_path_to_uri(
-            filepath, fs_root=Path.cwd().as_uri()
-        )
+        uri = validate_and_normalize_path_to_uri(filepath, fs_root=Path.cwd().as_uri())

         _drop_vars: Iterable[str] = (
             [] if drop_variables is None else list(drop_variables)
         )

         manifest_store = HDFVirtualBackend._create_manifest_store(
-            filepath=filepath,
+            filepath=uri,
             drop_variables=_drop_vars,
             group=group,
+            storage_config=reader_options,
         )
         ds = manifest_store.to_virtual_dataset(
             loadable_variables=loadable_variables,
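And the reader-side counterpart, a sketch that calls the HDF backend directly (as the tests below do) with reader_options carrying the same dict shape as ObjectStoreOptions; the bucket, endpoint, and path are placeholders:

```python
from virtualizarr.readers.hdf import HDFVirtualBackend

# reader_options is forwarded to default_object_store as storage_config.
vds = HDFVirtualBackend.open_virtual_dataset(
    "s3://my-bucket/data.nc",  # placeholder path
    reader_options={
        "config": {
            "bucket": "my-bucket",
            "endpoint": "http://localhost:9000",
            "skip_signature": True,
            "virtual_hosted_style_request": False,
        },
        "client_options": {"allow_http": True},
    },
)
```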
10 changes: 5 additions & 5 deletions virtualizarr/tests/test_readers/test_hdf/test_hdf.py
@@ -169,15 +169,15 @@ def test_variable_with_dimensions(self, chunked_dimensions_netcdf4_file):
         store = LocalStore()
         manifest_group = HDFVirtualBackend._construct_manifest_group(
             store=store,
-            filepath=chunked_dimensions_netcdf4_file,
+            uri=chunked_dimensions_netcdf4_file,
         )
         assert len(manifest_group.arrays) == 3

     def test_nested_groups_are_ignored(self, nested_group_hdf5_file):
         store = LocalStore()
         manifest_group = HDFVirtualBackend._construct_manifest_group(
             store=store,
-            filepath=nested_group_hdf5_file,
+            uri=nested_group_hdf5_file,
             group="group",
         )
         assert len(manifest_group.arrays) == 1
@@ -186,7 +186,7 @@ def test_drop_variables(self, multiple_datasets_hdf5_file):
         store = LocalStore()
         manifest_group = HDFVirtualBackend._construct_manifest_group(
             store=store,
-            filepath=multiple_datasets_hdf5_file,
+            uri=multiple_datasets_hdf5_file,
             drop_variables=["data2"],
         )
         assert "data2" not in manifest_group.arrays.keys()
@@ -195,7 +195,7 @@ def test_dataset_in_group(self, group_hdf5_file):
         store = LocalStore()
         manifest_group = HDFVirtualBackend._construct_manifest_group(
             store=store,
-            filepath=group_hdf5_file,
+            uri=group_hdf5_file,
             group="group",
         )
         assert len(manifest_group.arrays) == 1
@@ -205,7 +205,7 @@ def test_non_group_error(self, group_hdf5_file):
         with pytest.raises(ValueError):
             HDFVirtualBackend._construct_manifest_group(
                 store=store,
-                filepath=group_hdf5_file,
+                uri=group_hdf5_file,
                 group="group/data",
             )

@@ -72,6 +72,24 @@ def test_store(self, minio_bucket, chunked_roundtrip_hdf5_s3_file):
         assert vds.dims == {"phony_dim_0": 5}
         assert isinstance(vds["data"].data, ManifestArray)

+    @requires_minio
+    @requires_obstore
+    def test_store_options(self, minio_bucket, chunked_roundtrip_hdf5_s3_file):
+        config = {
+            "bucket": minio_bucket["bucket"],
+            "endpoint": minio_bucket["endpoint"],
+            "virtual_hosted_style_request": False,
+            "skip_signature": True,
+            "client_options": {"allow_http": True},
+        }
+        store = HDFVirtualBackend._create_manifest_store(
+            filepath=chunked_roundtrip_hdf5_s3_file,
+            storage_config=config,
+        )
+        vds = store.to_virtual_dataset()
+        assert vds.dims == {"phony_dim_0": 5}
+        assert isinstance(vds["data"].data, ManifestArray)
+
     @requires_obstore
     def test_default_store(self):
         store = HDFVirtualBackend._create_manifest_store(