diff --git a/README.rst b/README.rst index 0482d25b..7852c29e 100644 --- a/README.rst +++ b/README.rst @@ -649,6 +649,114 @@ Attributes: institution: US National Weather Service - NCEP] +Alternatively, you can as well use the ``filter_heterogeneous`` args, *cfgrib* will try to load +all heterogeneous variables at once, and them merge them to a single ``xr.Dataset```. + +When coordinates values mismatches, name will be incremented, eg. ``pressureFromGroundLayer1``. + +Please note that variable name will be concatenation of ``{shortName}__{typeOfLevel}__{stepType}``, +cf. example below. + +.. code-block: python + +>>> import xarray as xr +>>> xr.open_dataset('nam.t00z.awp21100.tm00.grib2', engine='cfgrib', + backend_kwargs={'filter_heterogeneous': True}) + +Dimensions: (y: 65, x: 93, isobaricInhPa: 19, + pressureFromGroundLayer: 5, + isobaricInhPa1: 5, + pressureFromGroundLayer1: 2, + pressureFromGroundLayer2: 2, + heightAboveGroundLayer: 2) +Coordinates: + time datetime64[ns] ... + step timedelta64[ns] ... + meanSea float64 ... + latitude (y, x) float64 ... + longitude (y, x) float64 ... + valid_time datetime64[ns] ... + surface float64 ... + * isobaricInhPa (isobaricInhPa) float64 1e+03 950.0 ... 100.0 + cloudBase float64 ... + cloudTop float64 ... + maxWind float64 ... + isothermZero float64 ... + tropopause float64 ... + * pressureFromGroundLayer (pressureFromGroundLayer) float64 3e+03 ... 1.5... + * isobaricInhPa1 (isobaricInhPa1) float64 1e+03 850.0 ... 250.0 + heightAboveGround float64 ... + heightAboveGround1 float64 ... + heightAboveGround2 float64 ... + * pressureFromGroundLayer1 (pressureFromGroundLayer1) float64 9e+03 1.8e+04 + * pressureFromGroundLayer2 (pressureFromGroundLayer2) float64 9e+03 1.8e+04 + atmosphereSingleLayer float64 ... + * heightAboveGroundLayer (heightAboveGroundLayer) float64 1e+03 3e+03 + pressureFromGroundLayer3 float64 ... + pressureFromGroundLayer4 float64 ... +Dimensions without coordinates: y, x +Data variables: + prmsl__meanSea__instant (y, x) float32 ... + gust__surface__instant (y, x) float32 ... + gh__isobaricInhPa__instant (isobaricInhPa, y, x) float32 ... + gh__cloudBase__instant (y, x) float32 ... + gh__cloudTop__instant (y, x) float32 ... + gh__maxWind__instant (y, x) float32 ... + gh__isothermZero__instant (y, x) float32 ... + t__isobaricInhPa__instant (isobaricInhPa, y, x) float32 ... + t__cloudTop__instant (y, x) float32 ... + t__tropopause__instant (y, x) float32 ... + t__pressureFromGroundLayer__instant (pressureFromGroundLayer, y, x) float32 ... + r__isobaricInhPa__instant (isobaricInhPa, y, x) float32 ... + r__isothermZero__instant (y, x) float32 ... + r__pressureFromGroundLayer__instant (pressureFromGroundLayer, y, x) float32 ... + w__isobaricInhPa__instant (isobaricInhPa, y, x) float32 ... + u__isobaricInhPa__instant (isobaricInhPa, y, x) float32 ... + u__tropopause__instant (y, x) float32 ... + u__maxWind__instant (y, x) float32 ... + u__pressureFromGroundLayer__instant (pressureFromGroundLayer, y, x) float32 ... + v__isobaricInhPa__instant (isobaricInhPa, y, x) float32 ... + v__tropopause__instant (y, x) float32 ... + v__maxWind__instant (y, x) float32 ... + v__pressureFromGroundLayer__instant (pressureFromGroundLayer, y, x) float32 ... + absv__isobaricInhPa__instant (isobaricInhPa1, y, x) float32 ... + mslet__meanSea__instant (y, x) float32 ... + sp__surface__instant (y, x) float32 ... + orog__surface__instant (y, x) float32 ... + t2m__heightAboveGround__instant (y, x) float32 ... + r2__heightAboveGround__instant (y, x) float32 ... + u10__heightAboveGround__instant (y, x) float32 ... + v10__heightAboveGround__instant (y, x) float32 ... + tp__surface__accum (y, x) float32 ... + acpcp__surface__accum (y, x) float32 ... + csnow__surface__instant (y, x) float32 ... + cicep__surface__instant (y, x) float32 ... + cfrzr__surface__instant (y, x) float32 ... + crain__surface__instant (y, x) float32 ... + cape__surface__instant (y, x) float32 ... + cape__pressureFromGroundLayer__instant (pressureFromGroundLayer1, y, x) float32 ... + cin__surface__instant (y, x) float32 ... + cin__pressureFromGroundLayer__instant (pressureFromGroundLayer2, y, x) float32 ... + pwat__atmosphereSingleLayer__instant (y, x) float32 ... + pres__cloudBase__instant (y, x) float32 ... + pres__cloudTop__instant (y, x) float32 ... + pres__maxWind__instant (y, x) float32 ... + hlcy__heightAboveGroundLayer__instant (heightAboveGroundLayer, y, x) float32 ... + trpp__tropopause__instant (y, x) float32 ... + pli__pressureFromGroundLayer__instant (y, x) float32 ... + 4lftx__pressureFromGroundLayer__instant (y, x) float32 ... + unknown__surface__instant (y, x) float32 ... +Attributes: + GRIB_edition: 2 + GRIB_centre: kwbc + GRIB_centreDescription: US National Weather Service - NCEP + GRIB_subCentre: 0 + Conventions: CF-1.7 + institution: US National Weather Service - NCEP + history: 2023-12-04T16:56 GRIB to CDM+CF via cfgrib-0.9.1... + + + Advanced usage ============== diff --git a/cfgrib/dataset.py b/cfgrib/dataset.py index bf5eea9a..6c5edd08 100644 --- a/cfgrib/dataset.py +++ b/cfgrib/dataset.py @@ -22,6 +22,7 @@ import json import logging import os +import itertools import typing as T import attr @@ -299,6 +300,17 @@ def __eq__(self, other): equal = (self.dimensions, self.attributes) == (other.dimensions, other.attributes) return equal and np.array_equal(self.data, other.data) + def rename_coordinate(self, existing_coord_name, new_coord_name): + # type: (str, str) -> None + # Update data_var dimensions, eg. ('isobaricInhPa', 'y', 'x') + self.dimensions = tuple( + [x.replace(existing_coord_name, new_coord_name) for x in self.dimensions] + ) + if 'coordinates' in self.attributes: + # Update data_var attributes, eg. 'time step isobaricInhPa latitude longitude valid_time' + new_coordinates = self.attributes['coordinates'].replace(existing_coord_name, new_coord_name) + self.attributes['coordinates'] = new_coordinates + def expand_item(item, shape): # type: (T.Tuple[T.Any, ...], T.Sequence[int]) -> T.Tuple[T.List[int], ...] @@ -661,13 +673,31 @@ def build_dataset_components( time_dims: T.Sequence[str] = ("time", "step"), extra_coords: T.Dict[str, str] = {}, cache_geo_coords: bool = True, + filter_heterogeneous: bool = False, ) -> T.Tuple[T.Dict[str, int], T.Dict[str, Variable], T.Dict[str, T.Any], T.Dict[str, T.Any]]: dimensions = {} # type: T.Dict[str, int] variables = {} # type: T.Dict[str, Variable] filter_by_keys = index.filter_by_keys - for param_id in index.get("paramId", []): - var_index = index.subindex(paramId=param_id) + subindex_kwargs_list = [{"paramId": param_id} for param_id in index.get("paramId", [])] + if filter_heterogeneous: + # Generate all possible combinations of paramId/typeOfLevel/stepType + subindex_kwargs_list = [] + combinations = index.get('paramId', []), index.get('typeOfLevel', []), index.get('stepType', []) + for param_id, type_of_level, step_type in itertools.product(*combinations): + subindex_kwargs_list.append({ + 'paramId': param_id, + 'typeOfLevel': type_of_level, + 'stepType': step_type + }) + + for subindex_kwargs in subindex_kwargs_list: + var_index = index.subindex(**subindex_kwargs) + if not var_index.header_values: + # For some combinations, no match availables + log.debug(f"No match for {subindex_kwargs}") + continue + try: dims, data_var, coord_vars = build_variable_components( var_index, @@ -692,10 +722,40 @@ def build_dataset_components( fbks.append(fbk) error_message += "\n filter_by_keys=%r" % fbk raise DatasetBuildError(error_message, key, fbks) + param_id = subindex_kwargs['paramId'] short_name = data_var.attributes.get("GRIB_shortName", "paramId_%d" % param_id) var_name = data_var.attributes.get("GRIB_cfVarName", "unknown") if "parameter" in encode_cf and var_name not in ("undef", "unknown"): short_name = var_name + + if filter_heterogeneous: + # Unique variable name looking like "t__surface__instant" + short_name = "__".join([ + short_name, + data_var.attributes.get("GRIB_typeOfLevel", "unknown"), + data_var.attributes.get("GRIB_stepType", "unknown"), + ]) + + # Handle coordinates name/values collision + for coord_name in list(coord_vars.keys()): + if coord_name in variables: + current_coord_var = coord_vars[coord_name] + existing_coord_var = variables[coord_name] + if current_coord_var == existing_coord_var: + continue + + # Coordinates have same name, but values not equal -> we need to rename to avoid collision + # Renaming will follow something like isobaricInhPa, isobaricInhPa1, etc. + coord_name_count = len([x for x in variables.keys() if x.startswith(coord_name)]) + coord_name_unique = f"{coord_name}{coord_name_count}" + + # Update attributes + if coord_name in dims: + dims[coord_name_unique] = dims.pop(coord_name) + data_var.rename_coordinate(coord_name, coord_name_unique) + current_coord_var.rename_coordinate(coord_name, coord_name_unique) + coord_vars[coord_name_unique] = coord_vars.pop(coord_name) + try: dict_merge(variables, coord_vars) dict_merge(variables, {short_name: data_var}) diff --git a/cfgrib/xarray_plugin.py b/cfgrib/xarray_plugin.py index a9268208..a80b8dcd 100644 --- a/cfgrib/xarray_plugin.py +++ b/cfgrib/xarray_plugin.py @@ -98,6 +98,7 @@ def open_dataset( lock: T.Union[T.ContextManager[T.Any], None] = None, indexpath: str = messages.DEFAULT_INDEXPATH, filter_by_keys: T.Dict[str, T.Any] = {}, + filter_heterogeneous: bool = False, read_keys: T.Iterable[str] = (), encode_cf: T.Sequence[str] = ("parameter", "time", "geography", "vertical"), squeeze: bool = True, @@ -110,6 +111,7 @@ def open_dataset( filename_or_obj, indexpath=indexpath, filter_by_keys=filter_by_keys, + filter_heterogeneous=filter_heterogeneous, read_keys=read_keys, encode_cf=encode_cf, squeeze=squeeze, diff --git a/tests/test_40_xarray_store.py b/tests/test_40_xarray_store.py index 0f51613b..a4bc6544 100644 --- a/tests/test_40_xarray_store.py +++ b/tests/test_40_xarray_store.py @@ -126,6 +126,16 @@ def test_open_datasets() -> None: assert res[0].attrs["GRIB_centre"] == "ecmf" +def test_open_filter_heterogeneous() -> None: + res = xarray_store.open_dataset(TEST_DATA, backend_kwargs={ + 'filter_heterogeneous': True + }) + + assert isinstance(res, xr.Dataset) + assert "t__isobaricInhPa__instant" in res + assert res.attrs["GRIB_centre"] == "ecmf" + + def test_cached_geo_coords() -> None: ds1 = xarray_store.open_dataset(TEST_DATA_MULTIPLE_FIELDS) ds2 = xarray_store.open_dataset( diff --git a/tests/test_50_sample_data.py b/tests/test_50_sample_data.py index c4384dd7..3a16ac77 100644 --- a/tests/test_50_sample_data.py +++ b/tests/test_50_sample_data.py @@ -64,6 +64,37 @@ def test_open_datasets(grib_name: str) -> None: assert len(res) > 1 +@pytest.mark.parametrize( + "grib_name", + [ + "era5-levels-members", + "fields_with_missing_values", + "lambert_grid", + "reduced_gg", + "regular_gg_sfc", + "regular_gg_pl", + "regular_gg_ml", + "regular_gg_ml_g2", + "regular_ll_sfc", + "regular_ll_msl", + "scanning_mode_64", + "single_gridpoint", + "spherical_harmonics", + "t_analysis_and_fc_0", + "hpa_and_pa", + "uv_on_different_levels", + ] +) +def test_open_dataset_heterogeneous(grib_name: str) -> None: + grib_path = os.path.join(SAMPLE_DATA_FOLDER, grib_name + ".grib") + + res = xarray_store.open_dataset(grib_path, backend_kwargs={ + 'filter_heterogeneous': True + }) + + assert isinstance(res, xr.Dataset) + + @pytest.mark.parametrize( "grib_name", [