diff --git a/dascore/__init__.py b/dascore/__init__.py
index 1675b73e..6f5a3aaf 100644
--- a/dascore/__init__.py
+++ b/dascore/__init__.py
@@ -12,6 +12,7 @@
 from dascore.units import get_quantity, get_unit
 from dascore.utils.patch import patch_function
 from dascore.utils.time import to_datetime64, to_timedelta64, to_float
+from dascore.utils.fs import UPath
 from dascore.version import __last_version__, __version__

 # flag for disabling progress bar when debugging
diff --git a/dascore/core/coordmanager.py b/dascore/core/coordmanager.py
index af163d0e..3ec3bfd2 100644
--- a/dascore/core/coordmanager.py
+++ b/dascore/core/coordmanager.py
@@ -1211,11 +1211,12 @@ def _maybe_coord_from_nested(name, coord, new_dims):
     return c_map, d_map, dims


-class CoordManagerSummary(CoordManager):
+class CoordManagerSummary(DascoreBaseModel):
     """A coordinate manager with summary coordinates."""

+    dims: tuple[str, ...]
     coord_map: Annotated[
-        FrozenDict[str, CoordSummary],
+        FrozenDict[str, BaseCoord],
         frozen_dict_validator,
         frozen_dict_serializer,
     ]
@@ -1226,11 +1227,14 @@ def to_coord_manager(self):

         This only works if the coordinates were evenly sampled/sorted.
         """
-        out = {}
+        coord_map = {}
+        dim_map = {}
         for name, coord in self.coord_map.items():
-            out[name] = coord.to_coord()
+            dim_map[name] = coord.dims
+            coord_map[name] = coord.to_coord()
+
         return CoordManager(
-            coord_map=out,
-            dim_map=self.dim_map,
+            coord_map=FrozenDict(coord_map),
+            dim_map=FrozenDict(dim_map),
             dims=self.dims,
         )
diff --git a/dascore/core/patch.py b/dascore/core/patch.py
index 721be2c2..db3b4dca 100644
--- a/dascore/core/patch.py
+++ b/dascore/core/patch.py
@@ -405,6 +405,17 @@ class PatchSummary(DascoreBaseModel):
     attrs: PatchAttrs
     coords: CoordManagerSummary

+    _attrs_to_patch_keys = (
+        "data_type",
+        "dtype",
+        "data_units",
+        "path",
+        "format_version",
+        "format_name",
+        "acquisition_id",
+        "tag",
+    )
+
     def to_summary(
         self,
         path=None,
@@ -417,3 +428,37 @@ def to_summary(
         This is here to be compatible with Patch.to_summary.
         """
         return self
+
+    def _attrs_to_patch_info(self, attr_info, patch_info, patch_id):
+        """Transfer some attrs to the patch info."""
+        out = []
+        for key in self._attrs_to_patch_keys:
+            if value := attr_info.pop(key, None):
+                patch_info[key] = value
+        # flatten remaining attrs
+        for item, value in attr_info.items():
+            out.append(dict(name=item, value=value, patch_id=patch_id))
+        return out
+
+    def _reshape_coords(self, patch_info, coord_info, patch_key):
+        """Move some coord info over to patch info."""
+        patch_info["dims"] = coord_info.pop("dims")
+        return list(coord_info["coord_map"].values())
+
+    def to_patch_coords_attrs_info(
+        self,
+        patch_key,
+    ) -> tuple[dict, list[dict], list[dict]]:
+        """
+        Convert the PatchSummary to flat info dicts.
+
+        Returns the patch info dict, a list of coord info dicts, and a
+        list of attr info dicts.
+        """
+        attrs = self.attrs.model_dump(exclude_unset=True)
+        coords = self.coords.model_dump(exclude_unset=True)
+        patch_info = self.data.model_dump(exclude_unset=True)
+
+        patch_info["patch_key"] = patch_key
+        attrs = self._attrs_to_patch_info(attrs, patch_info, patch_key)
+        coords = self._reshape_coords(patch_info, coords, patch_key)
+        return patch_info, coords, attrs
diff --git a/dascore/io/ap_sensing/utils.py b/dascore/io/ap_sensing/utils.py
index 072fb432..eb712adb 100644
--- a/dascore/io/ap_sensing/utils.py
+++ b/dascore/io/ap_sensing/utils.py
@@ -4,7 +4,7 @@

 import dascore as dc
 from dascore.core import get_coord, get_coord_manager
-from dascore.utils.fs import get_uri
+from dascore.utils.fs import get_path
 from dascore.utils.misc import _maybe_unpack, unbyte


@@ -84,7 +84,7 @@ def _get_attrs_dict(resource, format_name):
         instrumet_id=unbyte(_maybe_unpack(daq["SerialNumber"])),
         gauge_length=_maybe_unpack(pserver["GaugeLength"]),
         radians_to_nano_strain=_maybe_unpack(pserver["RadiansToNanoStrain"]),
-        path=get_uri(resource),
+        path=get_path(resource),
         format_name=format_name,
         format_version=version,
     )
diff --git a/dascore/io/asn/utils.py b/dascore/io/asn/utils.py
index f2ef5ea2..3394007f 100644
--- a/dascore/io/asn/utils.py
+++ b/dascore/io/asn/utils.py
@@ -5,7 +5,7 @@
 import dascore as dc
 import dascore.core
 from dascore.core.coords import get_coord
-from dascore.utils.fs import get_uri
+from dascore.utils.fs import get_path
 from dascore.utils.hdf5 import unpack_scalar_h5_dataset
 from dascore.utils.misc import unbyte

@@ -79,7 +79,7 @@ def _get_attr_dict(header, path, format_name, format_version):
 def _get_opto_das_coords_attrs(fi, format_name) -> tuple[dc.CoordManager, dict]:
     """Scan a OptoDAS file, return metadata."""
     cm = _get_coord_manager(fi)
-    path = get_uri(fi)
+    path = get_path(fi)
     version = _get_opto_das_version_str(fi)
     attrs = _get_attr_dict(fi["header"], path, format_name, version)
     return cm, attrs
diff --git a/dascore/io/core.py b/dascore/io/core.py
index 803d988d..de6beaf3 100644
--- a/dascore/io/core.py
+++ b/dascore/io/core.py
@@ -36,6 +36,7 @@
     MissingOptionalDependencyError,
     UnknownFiberFormatError,
 )
+from dascore.utils.fs import UPath, iter_path_contents
 from dascore.utils.io import IOResourceManager, get_handle_from_resource
 from dascore.utils.mapping import FrozenDict
 from dascore.utils.misc import cached_method, iterate, warn_or_raise
@@ -47,7 +48,6 @@
 )
 from dascore.utils.pd import _model_list_to_df
 from dascore.utils.progress import track
-from dascore.utils.fs import FSPath


 class PatchFileSummary(DascoreBaseModel):
@@ -691,8 +691,8 @@ def scan_to_df(
         return path.get_contents()
     info = scan(
         path=path,
-        file_format=file_format,
-        file_version=file_version,
+        resource_format=file_format,
+        resource_version=file_version,
         ext=ext,
         timestamp=timestamp,
         progress=progress,
@@ -705,7 +705,7 @@ def _iterate_scan_inputs(patch_source, ext, mtime, include_directories=True, **k
     """Yield scan candidates."""
     for el in iterate(patch_source):
         if isinstance(el, str | Path) and (path := Path(el)).exists():
-            generator = _iter_filesystem(
+            generator = iter_path_contents(
                 path, ext=ext, timestamp=mtime, include_directories=include_directories
             )
             yield from generator
@@ -781,9 +781,9 @@ def _handle_missing_optionals(outputs, optional_dep_dict):


 def scan(
-    path: Path | FSPath | str | PatchType | SpoolType | IOResourceManager,
-    file_format: str | None = None,
-    file_version: str | None = None,
+    path: UPath | Path | str | PatchType | SpoolType | IOResourceManager,
+    resource_format: str | None = None,
+    resource_version: str | None = None,
     ext: str | None = None,
     timestamp: float | None = None,
     progress: PROGRESS_LEVELS = "standard",
@@ -795,10 +795,10 @@
     ----------
     path
         A resource containing Fiber data.
-    file_format
-        Format of the file. If not provided DASCore will try to determine it.
+    resource_format
+        Format of the resource. If not provided DASCore will try to determine it.
         Only applicable for path-like inputs.
-    file_version
-        Version of the file. If not provided DASCore will try to determine it.
+    resource_version
+        Version of the resource. If not provided DASCore will try to determine it.
         Only applicable for path-like inputs.
     ext : str or None
@@ -823,9 +823,9 @@
     >>> # Replace with your file path.
     >>> file_path = fetch("prodml_2.1.h5")
     >>>
-    >>> attr_list = dc.scan(file_path)
+    >>> summary_list = dc.scan(file_path)

-    See also [`FSPath`](`dascore.utils.fs.FSPath`)
+    See also [`PatchSummary`](`dascore.core.patch.PatchSummary`)
     """
     out = []
     fiber_io_hint: dict[str, FiberIO] = {}
@@ -848,19 +848,19 @@
     for patch_source in tracker:
         # just pull attrs from patch
         if isinstance(patch_source, dc.Patch):
-            out.append(patch_source.attrs)
+            out.append(patch_source.to_summary())
             continue
         with IOResourceManager(patch_source) as man:
             try:
                 fiber_io, resource = _get_fiber_io_and_req_type(
                     man,
-                    file_format=file_format,
-                    file_version=file_version,
+                    file_format=resource_format,
+                    file_version=resource_version,
                     fiber_io_hint=fiber_io_hint,
                 )
             except UnknownFiberFormatError:  # skip bad entities
                 continue
-            # Cache this fiber io to given preferential treatment next iteration.
+            # Cache this fiber io to give preferential treatment next iteration.
             # This speeds up the common case of many files with the same format.
             fiber_io_hint[fiber_io.input_type] = fiber_io
             # Special handling of directory FiberIOs.
@@ -882,8 +882,8 @@
             except MissingOptionalDependencyError as ex:
                 missing_optional_deps[ex.msg.split(" ")[0]] += 1
                 continue
-        for attr in source:
-            out.append(dc.PatchAttrs.from_dict(attr))
+        for summary in source:
+            out.append(dc.PatchSummary.model_validate(summary))
     if missing_optional_deps:
         _handle_missing_optionals(out, missing_optional_deps)
     return out
diff --git a/dascore/io/dasdae/core.py b/dascore/io/dasdae/core.py
index 787ba7bc..92c2311e 100644
--- a/dascore/io/dasdae/core.py
+++ b/dascore/io/dasdae/core.py
@@ -5,6 +5,7 @@
 import dascore as dc
 from dascore.constants import SpoolType
 from dascore.io import FiberIO
+from dascore.utils.fs import get_path
 from dascore.utils.hdf5 import (
     H5Reader,
     H5Writer,
@@ -12,7 +13,6 @@
 from dascore.utils.misc import unbyte
 from dascore.utils.patch import get_patch_names

-from ...utils.fs import get_uri
 from .utils import (
     _get_summary_from_patch_groups,
     _read_patch,
@@ -89,7 +89,7 @@ def get_format(self, resource: H5Reader, **kwargs) -> tuple[str, str] | bool:
     def read(self, resource: H5Reader, **kwargs) -> SpoolType:
         """Read a DASDAE file."""
         patches = []
-        path = get_uri(resource)
+        path = get_path(resource)
         format_version = unbyte(resource.attrs["__DASDAE_version__"])
         format_name = self.name
         try:
diff --git a/dascore/io/dasdae/utils.py b/dascore/io/dasdae/utils.py
index e5fabc31..66c3ca64 100644
--- a/dascore/io/dasdae/utils.py
+++ b/dascore/io/dasdae/utils.py
@@ -8,7 +8,7 @@
 import dascore as dc
 from dascore.core.coordmanager import get_coord_manager
 from dascore.core.coords import get_coord
-from dascore.utils.fs import get_uri
+from dascore.utils.fs import get_path
 from dascore.utils.hdf5 import Empty
 from dascore.utils.misc import suppress_warnings, unbyte
 from dascore.utils.time import to_int
@@ -190,7 +190,7 @@ def _read_patch(patch_group, path, format_name, format_version, **kwargs):

 def _get_summary_from_patch_groups(h5, format_name="DASDAE"):
     """Get the contents from each patch group."""
-    path = get_uri(h5)
+    path = get_path(h5)
     format_version = h5.attrs["__DASDAE_version__"]
     out = []
     for name, group in h5[("/waveforms")].items():
diff --git a/dascore/io/dashdf5/core.py b/dascore/io/dashdf5/core.py
index 5f51bf87..c675201e 100644
--- a/dascore/io/dashdf5/core.py
+++ b/dascore/io/dashdf5/core.py
@@ -7,7 +7,7 @@
 from dascore.io import FiberIO
 from dascore.utils.hdf5 import H5Reader

-from ...utils.fs import get_uri
+from ...utils.fs import get_path
 from .utils import _get_cf_attrs, _get_cf_coords, _get_cf_version_str

@@ -21,7 +21,7 @@ class DASHDF5(FiberIO):
     def _get_attr(self, resource: H5Reader):
         """Get the attrs dict with path and such populated."""
         attrs = _get_cf_attrs(resource)
-        attrs["path"] = get_uri(resource)
+        attrs["path"] = get_path(resource)
         attrs["format_name"] = self.name
         attrs["format_version"] = self.version
         return dc.PatchAttrs.model_validate(attrs)
diff --git a/dascore/io/febus/core.py b/dascore/io/febus/core.py
index 7b41513d..4ae6a59a 100644
--- a/dascore/io/febus/core.py
+++ b/dascore/io/febus/core.py
@@ -12,7 +12,7 @@
 from dascore.utils.hdf5 import H5Reader
 from dascore.utils.models import UTF8Str

-from ...utils.fs import get_uri
+from ...utils.fs import get_path
 from .utils import (
     _get_febus_version_str,
     _read_febus,
@@ -71,7 +71,7 @@ def scan(self, resource: H5Reader, **kwargs) -> list[dc.PatchSummary]:
         """Scan a febus file, return summary information about the file's contents."""
         return _scan_febus(
             resource,
-            path=get_uri(resource),
+            path=get_path(resource),
             format_name=self.name,
             format_version=self.version,
             attr_cls=FebusPatchAttrs,
@@ -87,7 +87,7 @@ def read(
         """Read a febus spool of patches."""
         patches = _read_febus(
             resource,
-            path=get_uri(resource),
+            path=get_path(resource),
             format_name=self.name,
             format_version=self.version,
             time=time,
diff --git a/dascore/io/gdr/core.py b/dascore/io/gdr/core.py
index 5e2619e0..2ace79eb 100644
--- a/dascore/io/gdr/core.py
+++ b/dascore/io/gdr/core.py
@@ -16,7 +16,7 @@
     _get_version,
     _maybe_trim_data,
 )
-from dascore.utils.fs import get_uri
+from dascore.utils.fs import get_path
 from dascore.utils.hdf5 import H5Reader

@@ -40,7 +40,7 @@ class GDR_V1(FiberIO):  # noqa
     def _get_attr_coord_data(self, resource, snap=True):
         """Get the attributes, coordinates, and h5 dataset."""
         attr_dict, cm, data = _get_attrs_coords_and_data(resource, snap=snap)
-        attr_dict["path"] = get_uri(resource)
+        attr_dict["path"] = get_path(resource)
         attr_dict["format_name"] = self.name
         attr_dict["version"] = self.version
         attr = GDRPatchAttrs(**attr_dict)
diff --git a/dascore/io/neubrex/core.py b/dascore/io/neubrex/core.py
index 61ab87ab..21771ee0 100644
--- a/dascore/io/neubrex/core.py
+++ b/dascore/io/neubrex/core.py
@@ -11,7 +11,7 @@
 import dascore.io.neubrex.utils_rfs as rfs_utils
 from dascore.constants import SpoolType
 from dascore.io import FiberIO
-from dascore.utils.fs import get_uri
+from dascore.utils.fs import get_path
 from dascore.utils.hdf5 import H5Reader

@@ -53,7 +53,7 @@ class NeubrexRFSV1(FiberIO):
     def _get_attrs(self, resource) -> NeubrexRFSPatchAttrs:
         """Get the patch attributes."""
         attr = rfs_utils._get_attr_dict(resource)
-        attr["path"] = get_uri(resource)
+        attr["path"] = get_path(resource)
         attr["format_name"] = self.name
         attr["format_version"] = self.version
         return NeubrexRFSPatchAttrs(**attr)
@@ -106,7 +106,7 @@ class NeubrexDASV1(FiberIO):
     def _get_attr(self, resource) -> NeubrexDASPatchAttrs:
         """Get the attrs for from the file."""
         attr = das_utils._get_attr_dict(resource["Acoustic"])
-        attr["path"] = get_uri(resource)
+        attr["path"] = get_path(resource)
         attr["format_name"] = self.name
         attr["format_version"] = self.version
         return NeubrexDASPatchAttrs(**attr)
diff --git a/dascore/io/pickle/core.py b/dascore/io/pickle/core.py
index 3ac4edef..36c12bbf 100644
--- a/dascore/io/pickle/core.py
+++ b/dascore/io/pickle/core.py
@@ -6,7 +6,7 @@

 import dascore as dc
 from dascore.io import BinaryReader, BinaryWriter, FiberIO
-from dascore.utils.fs import get_uri
+from dascore.utils.fs import get_path


 class PickleIO(FiberIO):
@@ -53,7 +53,7 @@ def read(self, resource: BinaryReader, **kwargs):
         patch: dc.Patch = pickle.load(resource)
         # Add the relevant file info.
         out = patch.update_attrs(
-            path=get_uri(resource),
+            path=get_path(resource),
             format_name=self.name,
             format_version=self.version,
         )
diff --git a/dascore/io/prodml/utils.py b/dascore/io/prodml/utils.py
index dfad71d7..3fc19cae 100644
--- a/dascore/io/prodml/utils.py
+++ b/dascore/io/prodml/utils.py
@@ -6,7 +6,7 @@
 from dascore.constants import VALID_DATA_TYPES
 from dascore.core.coordmanager import get_coord_manager
 from dascore.core.coords import get_coord
-from dascore.utils.fs import get_uri
+from dascore.utils.fs import get_path
 from dascore.utils.misc import iterate, maybe_get_items, unbyte

 # --- Getting format/version
@@ -101,7 +101,7 @@ def _get_prodml_attrs(fi, format_name, format_version) -> list[dict]:
     acq = fi["Acquisition"]
     base_info = maybe_get_items(acq.attrs, _root_attrs)
     raw_nodes = _get_raw_node_dict(acq)
-    path = get_uri(fi)
+    path = get_path(fi)
     # Iterate each raw data node. I have only ever seen 1 in a file but since
     # it is indexed like Raw[0] there might be more.
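Every hunk in this series swaps get_uri (which returned "file://"-prefixed URI strings) for get_path (which returns plain paths). A minimal sketch of the intended behavior, based on the new dascore.utils.fs further below; the file names here are only illustrative:

    from pathlib import Path

    from dascore.utils.fs import get_path

    # Strings resolve to absolute local path strings.
    get_path("data/example.h5")        # -> "/current/dir/data/example.h5"

    # Path objects resolve to absolute Path objects.
    get_path(Path("data/example.h5"))  # -> Path("/current/dir/data/example.h5")

    # Open resources (file handles, h5py files, fsspec OpenFiles) expose
    # their location through attributes such as `name`, `filename`, or
    # `path`, which get_path returns directly.
    with open("data/example.h5", "rb") as fi:
        get_path(fi)                   # -> "data/example.h5" (from fi.name)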
diff --git a/dascore/io/segy/core.py b/dascore/io/segy/core.py
index 623eee6d..00541351 100644
--- a/dascore/io/segy/core.py
+++ b/dascore/io/segy/core.py
@@ -7,7 +7,7 @@
 from dascore.utils.io import BinaryReader
 from dascore.utils.misc import optional_import

-from ...utils.fs import get_uri
+from ...utils.fs import get_path
 from .utils import (
     _get_coords,
     _get_data_summary,
@@ -32,7 +32,7 @@ class SegyV1_0(FiberIO):  # noqa
     def _get_attrs(self, resource):
         """Get the basic attributes for a segy file."""
         info = dict(
-            path=get_uri(resource),
+            path=get_path(resource),
             format_name=self.name,
             format_version=self.version,
         )
diff --git a/dascore/io/sentek/core.py b/dascore/io/sentek/core.py
index f72edb61..cb0249bf 100644
--- a/dascore/io/sentek/core.py
+++ b/dascore/io/sentek/core.py
@@ -9,7 +9,7 @@
 from dascore.io.core import FiberIO
 from dascore.utils.models import ArraySummary

-from ...utils.fs import get_uri
+from ...utils.fs import get_path
 from .utils import _get_patch_attrs, _get_version

@@ -24,7 +24,7 @@ def _get_attrs_coords_offsets(self, resource):
         """Get attributes, coordinates, and data offsets from file."""
         attrs_dict, coords, offsets = _get_patch_attrs(
             resource,
-            path=get_uri(resource),
+            path=get_path(resource),
             format_name=self.name,
             format_version=self.version,
         )
diff --git a/dascore/io/sentek/utils.py b/dascore/io/sentek/utils.py
index d6ca661c..31a172e7 100644
--- a/dascore/io/sentek/utils.py
+++ b/dascore/io/sentek/utils.py
@@ -8,12 +8,12 @@

 import dascore as dc
 from dascore.core import get_coord, get_coord_manager
-from dascore.utils.fs import get_uri
+from dascore.utils.fs import get_path


 def _get_version(fid):
     """Determine if Sentek file."""
-    path = get_uri(fid)
+    path = get_path(fid)
     # Sentek files cannot change the extension, or file name.
     sw_data = path.endswith(".das")
     fid.seek(0)
diff --git a/dascore/io/silixah5/core.py b/dascore/io/silixah5/core.py
index 9fd298c4..6fd6ddee 100644
--- a/dascore/io/silixah5/core.py
+++ b/dascore/io/silixah5/core.py
@@ -10,7 +10,7 @@
 import dascore.io.silixah5.utils as util
 from dascore.constants import opt_timeable_types
 from dascore.io import FiberIO
-from dascore.utils.fs import get_uri
+from dascore.utils.fs import get_path
 from dascore.utils.hdf5 import H5Reader

@@ -33,7 +33,7 @@ class SilixaH5V1(FiberIO):
     def _get_attr_coords(self, resource):
         """Get attributes and coordinates of patch in file."""
         info, coords = util._get_attr_dict(resource)
-        info["path"] = get_uri(resource)
+        info["path"] = get_path(resource)
         info["format_name"] = self.name
         info["format_version"] = self.version
         return SilixaPatchAttrs(**info), coords
diff --git a/dascore/io/tdms/core.py b/dascore/io/tdms/core.py
index e648525c..40eb51e3 100644
--- a/dascore/io/tdms/core.py
+++ b/dascore/io/tdms/core.py
@@ -7,7 +7,7 @@
 from dascore.core import Patch
 from dascore.io import BinaryReader, FiberIO

-from ...utils.fs import get_uri
+from ...utils.fs import get_path
 from .utils import _get_attrs_coords, _get_data, _get_version_str

@@ -22,7 +22,7 @@ class TDMSFormatterV4713(FiberIO):
     def _get_attr_coords(self, resource):
         """Get a PatchAttrs for the file."""
         out, coords, _ = _get_attrs_coords(resource)
-        out["path"] = get_uri(resource)
+        out["path"] = get_path(resource)
         out["file_format"] = self.name
         out["file_version"] = self.version
         return dc.PatchAttrs(**out), coords
diff --git a/dascore/io/terra15/core.py b/dascore/io/terra15/core.py
index 03d08f1f..76501812 100644
--- a/dascore/io/terra15/core.py
+++ b/dascore/io/terra15/core.py
@@ -9,7 +9,7 @@
 from dascore.io import FiberIO
 from dascore.utils.hdf5 import H5Reader

-from ...utils.fs import get_uri
+from ...utils.fs import get_path
 from .utils import (
     _get_default_attrs,
     _get_distance_coord,
@@ -39,7 +39,7 @@ def _get_attrs_coords_data_node(self, resource):
         """Get attributes, coords, and datanode for this file."""
         version, data_node = _get_version_data_node(resource)
         attrs = _get_default_attrs(resource)
-        attrs["path"] = get_uri(resource)
+        attrs["path"] = get_path(resource)
         attrs["format_name"] = self.name
         attrs["format_version"] = version
         coords_dict = {
diff --git a/dascore/io/xml_binary/core.py b/dascore/io/xml_binary/core.py
index 1fd08a17..49de9be8 100644
--- a/dascore/io/xml_binary/core.py
+++ b/dascore/io/xml_binary/core.py
@@ -12,7 +12,7 @@
 from dascore.io import FiberIO
 from dascore.utils.models import UTF8Str

-from ...utils.fs import get_uri
+from ...utils.fs import get_path
 from .utils import _load_patches, _paths_to_attrs, _read_xml_metadata

@@ -39,7 +39,7 @@ class XMLBinaryV1(FiberIO):
     def scan(self, resource, timestamp=None, **kwargs) -> list[dc.PatchSummary]:
         """Scan the contents of the directory."""
-        path = get_uri(resource)
+        path = get_path(resource)
         metadata = _read_xml_metadata(path / self._metadata_name)
         data_files = list(path.glob(f"*{self._data_extension}"))
         extra_attrs = {
diff --git a/dascore/utils/display.py b/dascore/utils/display.py
index fb45065e..695e8071 100644
--- a/dascore/utils/display.py
+++ b/dascore/utils/display.py
@@ -132,9 +132,8 @@ def array_to_text(data, units=None) -> Text:

 def attrs_to_text(attrs) -> Text:
     """Convert pydantic model to text."""
-    attrs = dc.PatchAttrs(**attrs).model_dump(exclude_defaults=True)
+    attrs = dc.PatchAttrs.model_validate(attrs).model_dump(exclude_defaults=True)
-    # pop coords and dims since they show up in other places.
-    attrs.pop("coords", None), attrs.pop("dims", None)
     txt = Text("➤ ") + Text("Attributes", style=dascore_styles["dc_yellow"])
     txt += Text("\n")
     for name, attr in dict(attrs).items():
diff --git a/dascore/utils/fs.py b/dascore/utils/fs.py
index d2e5532a..9f7554ef 100644
--- a/dascore/utils/fs.py
+++ b/dascore/utils/fs.py
@@ -6,245 +6,135 @@

 from __future__ import annotations

-import os
-import re
-from collections.abc import Generator, Iterable
 from collections import deque
 from pathlib import Path
+from typing import Literal

 import fsspec
-from typing_extensions import Self, Literal
-
-from dascore.utils.misc import iterate
-
-# Detect if the string has an associated protocol.
-_PROTOCOL_DETECTION_REGEX = r"^([a-zA-Z][a-zA-Z0-9+.-]*):\/\/"
-
-
-def get_fspath(obj):
-    """ """
-    uri = get_uri(obj)
-    fs = fsspec.open(uri)
-    return fs
-
-
-class FSPath:
-    """
-    A pathlib-like abstraction for handling multiple filesystems.
-
-    This helps smooth out some of the edges of fsspec.
-
-    Parameters
-    ----------
-    obj
-        A
-
-    """
+from upath import UPath


+def get_ls_details(
+    upath: UPath, on_error: Literal["ignore", "raise"] | callable = "ignore", **kwargs
+):
+    """
+    Get the details of path contents.
+    """
+    # Default to an empty list so callers can iterate the result even
+    # when the error was swallowed.
+    listing = []
+    fs = upath.fs
+    try:
+        listing = fs.ls(upath.path, detail=True, **kwargs)
+    except (FileNotFoundError, OSError) as e:
+        if on_error == "raise":
+            raise
+        if callable(on_error):
+            on_error(e)
+    return listing
+
+
+def iter_path_contents(
+    upath: str | Path | UPath,
+    ext: str | None = None,
+    timestamp: float | None = None,
+    skip_hidden: bool = True,
+    include_directories: bool = False,
+    maxdepth: None | int = None,
+    on_error: Literal["ignore", "raise"] | callable = "ignore",
+    _dir_deque=None,
+    **kwargs,
+):
+    """
+    Iterate over the contents of the file system.
+
+    This implements a breadth-first search of the path's contents.
+
+    Parameters
+    ----------
+    ext
+        The extension of the files to include.
+    timestamp
+        Only include files whose modified time is at or after this value.
+    skip_hidden
+        Whether to skip hidden (starts with '.') files and directories.
+    include_directories
+        If True, also yield directory paths.
+    maxdepth
+        The maximum traversal depth.
+    on_error
+        The behavior when the contents of a directory-like object can't
+        be listed.
+    kwargs
+        Passed to the filesystem ls call.
+    """
+    upath = UPath(upath)
+    # A queue of directories to traverse through.
+    _dir_deque = _dir_deque if _dir_deque is not None else deque()
+    path_str = str(upath.path)
+    for info in get_ls_details(upath, on_error=on_error, **kwargs):
+        # Don't include self in the ls.
+        if info["name"] == path_str:
+            continue
+        pathname = info["name"].rstrip("/")
+        name = pathname.rsplit("/", 1)[-1]
+        is_dir = info["type"] == "directory"
+        mtime = info.get("mtime", 0)
+        good_ext = ext is None or name.endswith(ext)
+        good_mtime = timestamp is None or mtime >= timestamp
+        good_hidden = not skip_hidden or not name.startswith(".")
+        # Handle files.
+        if not is_dir and good_ext and good_hidden and good_mtime:
+            yield upath / info["name"]
+        elif is_dir and good_hidden:
+            dirpath = upath / info["name"]
+            # If we are to also yield directories
+            if include_directories:
+                signal = yield dirpath
+                # Here we bail out on this directory/contents.
+                if signal is not None and signal == "skip":
+                    continue
+            # Add the directory to the queue to be traversed.
+            _dir_deque.append(dirpath)
+    # Handle the directories that need to be traversed.
+    while _dir_deque:
+        next_upath_directory = _dir_deque.popleft()
+        if maxdepth is not None and maxdepth <= 1:
+            continue
+        new_iter = iter_path_contents(
+            next_upath_directory,
+            ext=ext,
+            timestamp=timestamp,
+            skip_hidden=skip_hidden,
+            include_directories=include_directories,
+            maxdepth=maxdepth - 1 if maxdepth is not None else None,
+            on_error=on_error,
+            _dir_deque=_dir_deque,
+        )
+        yield from new_iter
+

-    def __init__(self, obj):
-        """ """
-        if isinstance(obj, FSPath):
-            self.__dict__.update(obj.__dict__)
-            return
-        elif isinstance(obj, fsspec.core.OpenFile):
-            self._fs = obj.fs
-            self._path = Path(obj.path)
-        elif isinstance(obj, fsspec.spec.AbstractFileSystem):
-            self._fs = obj
-            self._path = Path("/")
-        else:
-            fs, path = fsspec.url_to_fs(obj)
-            self._fs = fs
-            self._path = Path(path)
-
-    @classmethod
-    def from_fs_path(cls, fs, path):
-        """Create new FSPath from file system and path."""
-        out = cls.__new__(cls)
-        out._fs = fs
-        out._path = path
-        return out
-
-    def from_path(self, path):
-        """Create a new FSPath from the same file system and a new path."""
-        out = self.__class__.__new__(self.__class__)
-        out._fs = self._fs
-        out._path = path
-        return out
-
-    @property
-    def path(self) -> Path:
-        """Get the pathlib object representing this item."""
-        return self._path
-
-    @property
-    def is_local(self):
-        return self._fs.protocol == ("file", "local")
-
-    @property
-    def parent(self) -> Path:
-        """Get the pathlib object representing this item."""
-        return self.from_fs_path(fs=self._fs, path=self._path.parent)
-
-    @property
-    def full_name(self):
-        """
-        Return the full name.
-
-        Ideally, this is a string that can be used to recreate the
-        filesystem and path.
-        """
-        name = self._fs.unstrip_protocol(self._path)
-        return name
-
-    def exists(self):
-        """Determine if the file exists."""
-        return self._fs.exists(self._path)
-
-    def glob(self, arg: str) -> Generator[Self, None, None]:
-        """
-        Glob search the contents of the file system/directory.
-        """
-        glob_str = str(self._path / arg)
-        for obj in self._fs.glob(glob_str):
-            yield self.from_path(obj)
-
-    def get_ls_details(
-        self,
-        path_str: str | None=None,
-        on_error: Literal["ignore", "raise"] | callable="ignore",
-        **kwargs
-    ):
-        """
-        Get the details of path contents.
-        """
-        path_str = str(self._path) if path_str is None else path_str
-        listing = None
-        try:
-            listing = self._fs.ls(path_str, detail=True, **kwargs)
-        except (FileNotFoundError, OSError) as e:
-            if on_error == "raise":
-                raise
-            if callable(on_error):
-                on_error(e)
-        return listing
-
-    def iter_contents(
-        self,
-        ext: str | None = None,
-        timestamp: float | None = None,
-        skip_hidden: bool = True,
-        include_directories: bool = False,
-        maxdepth: None | int = None,
-        on_error: Literal["ignore", "raise"] | callable = "omit",
-        _dir_deque=None,
-        **kwargs,
-    ):
-        """
-        Iterate over the contents of the file system.
-
-        This implements a breadth-first search of the path's contents.
-
-        Parameters
-        ----------
-        ext
-            The extension of the files to include.
-        timestamp
-            The modified time of the files to include.
-        skip_hidden
-            Whether to skip hidden (starts with '.') files and directories.
-        include_directories
-            If True, also yield directory paths.
-        maxdepth
-            The maximum traversal depth.
-        on_error
-            The behavior when contents of a directory like thing aren't
-            retrievable.
-        kwargs
-            Passed to filesystem ls call.
-        """
-        # A queue of directories to transverse through.
-        _dir_deque = _dir_deque if _dir_deque is not None else deque()
-        path_str = str(self._path)
-        listing = self.get_ls_details(path_str, on_error, **kwargs)
-        for info in iterate(listing):
-            # Don't include self in the ls.
-            if info['name'] == path_str:
-                continue
-            pathname = info["name"].rstrip("/")
-            name = pathname.rsplit("/", 1)[-1]
-            is_dir = info['type'] == "directory"
-            mtime = info['mtime']
-            good_ext = ext is None or name.endswith(ext)
-            good_mtime = timestamp is None or mtime >= timestamp
-            good_hidden = not skip_hidden or not name.startswith(".")
-            # Handle files.
-            if not is_dir and good_ext and good_hidden and good_mtime:
-                yield self.from_path(info["name"])
-            elif good_hidden:
-                dirpath = self.from_path(info["name"])
-                # If we are to also yield directories
-                if include_directories:
-                    signal = yield dirpath
-                    # Here we bail out on this directory/contents.
-                    if signal is not None and signal == "skip":
-                        continue
-                # Add the directory to the queue to be traversed.
-                _dir_deque.append(dirpath)
-        # Handle the directories that need to be traversed.
-        while _dir_deque:
-            next_fspath = _dir_deque.popleft()
-            if maxdepth is not None and maxdepth <= 1:
-                continue
-            new_iter = next_fspath.iter_contents(
-                ext=ext,
-                timestamp=timestamp,
-                skip_hidden=skip_hidden,
-                include_directories=include_directories,
-                maxdepth=maxdepth - 1 if maxdepth is not None else None,
-                on_error=on_error,
-                _dir_deque=_dir_deque,
-            )
-            yield from new_iter
-
-    def __truediv__(self, other: str) -> Self:
-        """Enables division to add to string to Path."""
-        return self.from_fs_path(fs=self._fs, path=self._path / other)
-
-    def __repr__(self) -> str:
-        return f"{self.__class__.__name__}({self.full_name})"
-
-    def __hash__(self) -> int:
-        return hash(self.full_name)
-
-    def __eq__(self, other: Self) -> bool:
-        return self.full_name == other.full_name
-
-
-def get_uri(obj) -> str:
-    """
-    Get the uri string of an object representing a file.
-
-    Parameters
-    ----------
-    obj
-        An object that represents a path to a resource.
-    """
-    if isinstance(obj, str):
-        # Assume the string rep a local file.
-        if not re.match(_PROTOCOL_DETECTION_REGEX, obj):
-            obj = f"file://{obj}"
-    elif hasattr(obj, "filename"):
-        obj = f"file://{obj.filename}"
-    elif isinstance(obj, Path):
-        obj = f"file://{obj.absolute()}"
-    elif hasattr(obj, "name"):
-        obj = f"file://{obj.name}"
-    elif isinstance(obj, fsspec.core.OpenFiles):
-        obj = get_fspath(obj)
-    if hasattr(obj, "full_name"):
-        obj = obj.full_name
-    return obj
+def get_fspath(obj):
+    """Get an fsspec OpenFile for an object representing a file."""
+    uri = get_path(obj)
+    return fsspec.open(uri)
+
+
+def get_path(obj) -> str | Path:
+    """
+    Get the path of an object representing a file.
+
+    Parameters
+    ----------
+    obj
+        An object that represents a path to a resource.
+    """
+    # Check concrete path types first; Path (and UPath) objects also have
+    # a `name` attribute, so the attribute scan below would otherwise
+    # return only the last path component for them.
+    if isinstance(obj, str):
+        return UPath(obj).absolute().path
+    if isinstance(obj, UPath):
+        return obj
+    if isinstance(obj, Path):
+        return obj.absolute()
+    attrs = ("filename", "name", "path", "full_name")
+    for attr in attrs:
+        if (out := getattr(obj, attr, None)) is not None:
+            return out
+    return obj
diff --git a/dascore/utils/models.py b/dascore/utils/models.py
index 2e6cbf14..1b322a0d 100644
--- a/dascore/utils/models.py
+++ b/dascore/utils/models.py
@@ -156,11 +156,15 @@ class ArraySummary(DascoreBaseModel):
     A class for summarizing arrays.
     """

-    dtype: str
+    dtype: Annotated[str, PlainValidator(lambda x: np.dtype(x).str)]
     shape: tuple[int, ...]
     ndim: int

     @classmethod
     def from_array(cls, array):
         """Init the summary from an array."""
-        return cls(dtype=array.dtype, shape=array.shape, ndim=array.ndim)
+        return cls(
+            dtype=array.dtype,
+            shape=array.shape,
+            ndim=array.ndim,
+        )
diff --git a/dascore/utils/patch.py b/dascore/utils/patch.py
index ce8068c8..ff1a6800 100644
--- a/dascore/utils/patch.py
+++ b/dascore/utils/patch.py
@@ -53,6 +53,9 @@

 _DimAxisValue = namedtuple("DimAxisValue", ["dim", "axis", "value"])

+_str_adapter = TypeAdapter(str)
+_str_tuple_adapter = TypeAdapter(tuple[str, ...])
+

 def _format_values(val):
     """String formatting for values for history string."""
@@ -519,10 +522,10 @@ def _get_filename(path_ser, strip_extension):

     # Validate inputs. Note we cannot use the validation decorator or
     # it introduces a circular import.
-    prefix = TypeAdapter(str).validate_python(prefix)
-    attrs = TypeAdapter(tuple[str, ...]).validate_python(attrs)
-    coords = TypeAdapter(tuple[str, ...]).validate_python(coords)
-    sep = TypeAdapter(str).validate_python(sep)
+    prefix = _str_adapter.validate_python(prefix)
+    sep = _str_adapter.validate_python(sep)
+    attrs = _str_tuple_adapter.validate_python(attrs)
+    coords = _str_tuple_adapter.validate_python(coords)

     # Ensure we are working with a dataframe.
     df = dc.scan_to_df(
diff --git a/pyproject.toml b/pyproject.toml
index 718b14ed..81b33b12 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -56,6 +56,7 @@ dependencies = [
     "typing_extensions",
     "pint",
     "universal_pathlib",
+    "fsspec>2024.2.0",
 ]

 [project.optional-dependencies]
diff --git a/tests/test_core/test_patch.py b/tests/test_core/test_patch.py
index c03d2b11..018c5ddd 100644
--- a/tests/test_core/test_patch.py
+++ b/tests/test_core/test_patch.py
@@ -720,3 +720,26 @@ def test_simple_get_name(self, random_patch):
         """Happy path test."""
         name = random_patch.get_patch_name()
         assert isinstance(name, str)
+
+
+class TestPatchSummary:
+    """Tests for Patch summaries."""
+
+    @pytest.fixture(scope="class")
+    def random_summary(self, random_patch):
+        """Convert a patch to a summary."""
+        patch = random_patch.update_attrs(data_units="strain/s")
+        return patch.to_summary()
+
+    def test_from_patch(self, random_patch):
+        """Ensure a patch summary can be created from a patch."""
+        out = random_patch.to_summary()
+        assert isinstance(out, dc.PatchSummary)
+
+    def test_to_patch_coord_attrs_info(self, random_summary):
+        """Test converting to patch, coord, and attr info."""
+        patch_info, coords, attrs = random_summary.to_patch_coords_attrs_info(0)
+        assert isinstance(patch_info, dict)
+        assert len(coords) == len(random_summary.coords.coord_map)
+        assert len(attrs)
diff --git a/tests/test_utils/test_fs.py b/tests/test_utils/test_fs.py
index 9dba57de..06df1ab0 100644
--- a/tests/test_utils/test_fs.py
+++ b/tests/test_utils/test_fs.py
@@ -2,19 +2,18 @@
 Tests for file system utilities.
""" -import inspect import os import time from pathlib import Path import fsspec -import h5py import pytest +from upath import UPath -from dascore.utils.fs import FSPath, get_uri +from dascore.utils.fs import get_ls_details, iter_path_contents from dascore.utils.misc import register_func -fs_paths = [] +UPATHS = [] @pytest.fixture(scope="class") @@ -47,138 +46,29 @@ def complex_folder(tmp_path_factory): @pytest.fixture(scope="class") -@register_func(fs_paths) -def fspath_local(complex_folder): - """Get an fspath object from a local tmpdir.""" - return FSPath(complex_folder) +@register_func(UPATHS) +def upath_local(complex_folder): + """Get a UPath object from a local tmpdir.""" + return UPath(complex_folder) @pytest.fixture(scope="class") -@register_func(fs_paths) -def fspath_github(request): - """Get a fspath object from DASCore's github test data.""" +@register_func(UPATHS) +def upath_github(request): + """Get a UPath object from DASCore's github test data.""" if not request.config.getoption("--network"): pytest.skip("Network tests not selected.") fs = fsspec.filesystem("github", repo="test_data", org="dasdae") - return FSPath(fs) + return UPath(fs) -@pytest.fixture(scope="class", params=fs_paths) +@pytest.fixture(scope="class", params=UPATHS) def fspath(request): """Meta fixture for fspaths of different types.""" name = request.param return request.getfixturevalue(name) -class TestGetUri: - """Tests for getting a path from various objects.""" - - def test_pathlib(self): - """Ensure a pathlib object works with uri generator.""" - my_path = Path(__file__) - path = get_uri(my_path) - assert isinstance(path, str) - assert path == f"file://{my_path!s}" - - def test_str(self): - """Ensure a string simply returns itself.""" - my_path = str(Path(__file__)) - path = get_uri(my_path) - assert isinstance(path, str) - assert path == f"file://{my_path!s}" - - def test_fs_spec(self, tmp_path): - """Ensure a fs spec object returns a path string.""" - fs = fsspec.open(Path(tmp_path)) - out = get_uri(fs) - assert out == f"file://{tmp_path}" - - def test_open_file(self, tmp_path): - """Ensure an open file can be used.""" - path = tmp_path / "file.txt" - with open(path, "wb") as f: - uri = get_uri(f) - assert uri == f"file://{path}" - - def test_h5(self, tmp_path): - """Ensure a h5 file returns a path.""" - path = tmp_path / "file.h5" - with h5py.File(path, "w") as f: - uri = get_uri(f) - assert uri == f"file://{path}" - - def test_idempotent(self, tmp_path): - """Ensure the protocol doesn't keep getting appended.""" - my_path = Path(__file__) - path = get_uri(my_path) - path2 = get_uri(path) - path3 = get_uri(path2) - assert path == path2 == path3 - - -class TestFSPath: - """Tests for the FS Path abstraction.""" - - def test_str_and_repr(self, fspath_local): - """Ensure a valid repr/str exist.""" - out_strs = [str(fspath_local), repr(fspath_local)] - for out in out_strs: - assert isinstance(out, str) - assert str(fspath_local.path) in out - - def test_slash(self, fspath_local): - """Ensure the slash operator works.""" - out = fspath_local / "text_1.txt" - assert out.full_name.endswith("text_1.txt") - - def test_is_local(self, fspath_local): - """Ensure local file path indicates it is local.""" - assert fspath_local.is_local - - def test_is_not_local(self, fspath_github): - """Github is not local.""" - assert not fspath_github.is_local - - def test_local_glob(self, fspath): - """Ensure globing works.""" - out = fspath.glob("*") - assert inspect.isgenerator(out) - for sub in out: - assert isinstance(sub, 
-
-
-class TestGetLsDetails:
-    """Tests for the get_ls details function."""
-
-    def test_get_ls_details_basic(self, fspath_local):
-        """Happy path for getting file info."""
-        ls_details = fspath_local.get_ls_details()
-        for detail_dict in ls_details:
-            assert detail_dict
-            assert isinstance(detail_dict, dict)
-
-    def test_raises(self, fspath_local):
-        """Ensure errors can be propagated."""
-        match = "No such file or directory"
-        with pytest.raises(OSError, match=match):
-            fspath_local.get_ls_details(
-                "_probably_not_a_file.txt", on_error="raise"
-            )
-
-    def test_callable(self, fspath_local):
-        """Ensure a callable can be used to handle details."""
-        my_counter = 0
-
-        def increment(*args, **kwargs):
-            nonlocal my_counter
-            my_counter += 1
-
-        fspath_local.get_ls_details(
-            "_probably_not_a_file.txt", on_error=increment
-        )
-        assert my_counter == 1
-
-
 class TestIterContents:
     """Tests for iterating over the contents of directories."""

@@ -207,7 +97,7 @@ def simple_dir(self, tmp_path_factory):
         """Return a simple directory for iterating."""
         path = Path(tmp_path_factory.mktemp("iterfiles"))
         self.setup_test_directory(self.file_paths, path)
-        return FSPath(path)
+        return UPath(path)

     @pytest.fixture(scope="class")
     def dir_with_hidden_dir(self, tmp_path_factory):
@@ -217,40 +107,38 @@
         # add hidden directory with files in it.
         struct[".Hidden"] = {"Another": {"hidden_by_parent": ".txt"}}
         self.setup_test_directory(struct, path)
-        return FSPath(path)
+        return UPath(path)

     def test_basic(self, simple_dir):
         """Test basic usage of iterfiles."""
         files = set(self.get_file_paths(self.file_paths, simple_dir))
-        out = {x for x in simple_dir.iter_contents()}
+        out = {x for x in iter_path_contents(simple_dir)}
         assert files == out

     def test_one_subdir(self, simple_dir):
         """Test with one sub directory."""
         subdir = simple_dir / "B" / "D"
-        fspath = FSPath(subdir)
-        out = set(fspath.iter_contents())
+        fspath = UPath(subdir)
+        out = set(iter_path_contents(fspath))
         assert len(out) == 1

     def test_multiple_subdirs(self, simple_dir):
         """Test with multiple sub directories."""
-        path1 = FSPath(simple_dir / "B" / "D")
-        path2 = FSPath(simple_dir / "B" / "G")
+        path1 = UPath(simple_dir / "B" / "D")
+        path2 = UPath(simple_dir / "B" / "G")

-        out = set(path1.iter_contents()) | set(path2.iter_contents())
+        out = set(iter_path_contents(path1)) | set(iter_path_contents(path2))
         files = self.get_file_paths(self.file_paths, simple_dir)
         expected = {
-            x
-            for x in files
-            if str(path1._path) in str(x) or str(path2._path) in str(x)
+            x for x in files if str(path1.path) in str(x) or str(path2.path) in str(x)
         }
         assert out == expected

     def test_extension(self, simple_dir):
         """Test filtering based on extention."""
-        out = set(simple_dir.iter_contents(ext=".txt"))
+        out = set(iter_path_contents(simple_dir, ext=".txt"))
         for val in out:
-            assert val.full_name.endswith(".txt")
+            assert val.name.endswith(".txt")

     def test_mtime(self, simple_dir):
         """Test filtering based on modified time."""
@@ -258,32 +146,32 @@
         # set the first file mtime in future
         now = time.time()
         first_file = files[0]
-        os.utime(first_file._path, (now + 10, now + 10))
+        os.utime(first_file, (now + 10, now + 10))
         # get output make sure it only returned first file
-        out = list(simple_dir.iter_contents(timestamp=now + 5))
+        out = list(iter_path_contents(simple_dir, timestamp=now + 5))
         assert len(out) == 1
-        assert Path(out[0]._path) == first_file._path
+        assert out[0] == first_file

     def test_skips_files_in_hidden_directory(self, dir_with_hidden_dir):
         """Hidden directory files should be skipped."""
-        out1 = list(dir_with_hidden_dir.iter_contents(skip_hidden=True))
+        out1 = list(iter_path_contents(dir_with_hidden_dir, skip_hidden=True))
         has_hidden_by_parent = ["hidden_by_parent" in str(x) for x in out1]
         assert not any(has_hidden_by_parent)
         # But if skip_hidden is False it should be there
-        out2 = list(dir_with_hidden_dir.iter_contents(skip_hidden=False))
+        out2 = list(iter_path_contents(dir_with_hidden_dir, skip_hidden=False))
         has_hidden_by_parent = ["hidden_by_parent" in str(x) for x in out2]
         assert sum(has_hidden_by_parent) == 1

     def test_no_directories(self, simple_dir):
         """Ensure no directories are included when include_directories=False."""
-        out = list(simple_dir.iter_contents(include_directories=False))
-        has_dirs = [Path(x._path).is_dir() for x in out]
+        out = list(iter_path_contents(simple_dir, include_directories=False))
+        has_dirs = [x.is_dir() for x in out]
         assert not any(has_dirs)

     def test_include_directories(self, simple_dir):
         """Ensure we can get directories back."""
-        out = list(simple_dir.iter_contents(include_directories=True))
-        returned_dirs = [Path(x._path) for x in out if Path(x._path).is_dir()]
+        out = list(iter_path_contents(simple_dir, include_directories=True))
+        returned_dirs = [x for x in out if x.is_dir()]
         assert len(returned_dirs)
         # Directory names
         dir_names = {x.name for x in returned_dirs}
@@ -293,24 +181,100 @@
     def test_skip_signal_directory(self, simple_dir):
         """Ensure a skip signal can be sent to stop parsing on directory."""
         out = []
-        iterator = simple_dir.iter_contents(include_directories=True)
-        for fspath in iterator:
-            if Path(fspath._path).name == "B":
+        iterator = iter_path_contents(simple_dir, include_directories=True)
+        for upath in iterator:
+            if upath.name == "B":
                 iterator.send("skip")
-            out.append(fspath._path)
-        names = {Path(x).name.split(".")[0] for x in out}
+            out.append(upath)
+        names = {x.stem for x in out}
         # Anything after B should have been skipped
         assert {"C", "D", "E", "F"}.isdisjoint(names)

     def test_max_depth_no_dirs(self, simple_dir):
         """Ensure maxdepth=1 only returns top level contents."""
-        out = {Path(x._path) for x in simple_dir.iter_contents(maxdepth=1)}
-        expected = {x for x in simple_dir._path.glob("*") if not x.is_dir()}
+        out = {x for x in iter_path_contents(simple_dir, maxdepth=1)}
+        expected = {x for x in simple_dir.glob("*") if not x.is_dir()}
         assert out == expected

     def test_max_depth_dirs(self, simple_dir):
         """Ensure directories can also be returned."""
-        myiter = simple_dir.iter_contents(maxdepth=1, include_directories=True)
-        out = {Path(x._path) for x in myiter}
-        expected = {x for x in simple_dir._path.glob("*")}
+        myiter = iter_path_contents(simple_dir, maxdepth=1, include_directories=True)
+        out = {x for x in myiter}
+        expected = {x for x in simple_dir.glob("*")}
         assert out == expected
+
+
+class TestGetLsDetails:
+    """Tests for the get_ls details function."""
+
+    def test_get_ls_details_basic(self, upath_local):
+        """Happy path for getting file info."""
+        ls_details = get_ls_details(upath_local)
+        for detail_dict in ls_details:
+            assert detail_dict
+            assert isinstance(detail_dict, dict)
+
+    def test_raises(self):
+        """Ensure errors can be propagated."""
+        match = "No such file or directory"
+        bad_path = UPath("_probably_not_a_file.txt")
+        with pytest.raises(OSError, match=match):
+            get_ls_details(bad_path, on_error="raise")
+
+    def test_callable(self):
+        """Ensure a callable can be used to handle details."""
+        my_counter = 0
+
+        def increment(*args, **kwargs):
+            nonlocal my_counter
+            my_counter += 1
+
+        bad_file = UPath("_probably_not_a_file.txt")
+        get_ls_details(bad_file, on_error=increment)
+        assert my_counter == 1
+
+
+class TestGetPath:
+    """Tests for getting a path from various objects (formerly get_uri)."""
+
+    def test_pathlib(self):
+        """Ensure a pathlib object returns its absolute path."""
+        my_path = Path(__file__)
+        path = get_path(my_path)
+        assert path == my_path.absolute()
+
+    def test_str(self):
+        """Ensure a string returns an absolute path string."""
+        my_path = str(Path(__file__))
+        path = get_path(my_path)
+        assert isinstance(path, str)
+        assert path == UPath(my_path).absolute().path
+
+    def test_open_file(self, tmp_path):
+        """Ensure an open file handle returns its path."""
+        path = tmp_path / "file.txt"
+        with open(path, "wb") as f:
+            out = get_path(f)
+        assert str(out) == str(path)
+
+    def test_idempotent(self):
+        """Ensure repeated calls return the same result."""
+        my_path = str(Path(__file__))
+        path = get_path(my_path)
+        assert get_path(path) == path
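For reference, a usage sketch of the new iter_path_contents generator introduced in this diff (the directory name is hypothetical; remote filesystems work the same way through UPath):

    import time

    from upath import UPath

    from dascore.utils.fs import iter_path_contents

    root = UPath("/data/das")  # hypothetical local directory

    # Breadth-first iteration over .h5 files modified in the last day.
    day_ago = time.time() - 86_400
    for path in iter_path_contents(root, ext=".h5", timestamp=day_ago):
        print(path)

    # Directories can be yielded too; sending "skip" prunes a subtree.
    # Note that send() also advances the generator, returning the next item.
    iterator = iter_path_contents(root, include_directories=True)
    for path in iterator:
        if path.is_dir() and path.name == "scratch":
            iterator.send("skip")  # don't descend into this directory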
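Similarly, a sketch of how PatchSummary.to_patch_coords_attrs_info flattens a summary into row-like dicts, mirroring the test above (get_example_patch is DASCore's standard example helper; the printed keys follow the method body in this diff):

    import dascore as dc

    patch = dc.get_example_patch()
    summary = patch.to_summary()

    # One dict describes the patch itself; coords and attrs become lists
    # of row dicts keyed back to the patch via patch_key/patch_id.
    patch_info, coord_rows, attr_rows = summary.to_patch_coords_attrs_info(0)
    print(patch_info["dims"])  # dims are moved over from the coord info
    for row in attr_rows:
        print(row["name"], row["value"], row["patch_id"])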