Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feature/open heterogeneous #362

Open
wants to merge 16 commits into
base: master
Choose a base branch
from
Open
108 changes: 108 additions & 0 deletions README.rst
Original file line number Diff line number Diff line change
Expand Up @@ -649,6 +649,114 @@ Attributes:
institution: US National Weather Service - NCEP]


Alternatively, you can as well use the ``filter_heterogeneous`` args, *cfgrib* will try to load
all heterogeneous variables at once, and them merge them to a single ``xr.Dataset```.

When coordinates values mismatches, name will be incremented, eg. ``pressureFromGroundLayer1``.

Please note that variable name will be concatenation of ``{shortName}__{typeOfLevel}__{stepType}``,
cf. example below.

.. code-block: python

>>> import xarray as xr
>>> xr.open_dataset('nam.t00z.awp21100.tm00.grib2', engine='cfgrib',
backend_kwargs={'filter_heterogeneous': True})
<xarray.Dataset>
Dimensions: (y: 65, x: 93, isobaricInhPa: 19,
pressureFromGroundLayer: 5,
isobaricInhPa1: 5,
pressureFromGroundLayer1: 2,
pressureFromGroundLayer2: 2,
heightAboveGroundLayer: 2)
Coordinates:
time datetime64[ns] ...
step timedelta64[ns] ...
meanSea float64 ...
latitude (y, x) float64 ...
longitude (y, x) float64 ...
valid_time datetime64[ns] ...
surface float64 ...
* isobaricInhPa (isobaricInhPa) float64 1e+03 950.0 ... 100.0
cloudBase float64 ...
cloudTop float64 ...
maxWind float64 ...
isothermZero float64 ...
tropopause float64 ...
* pressureFromGroundLayer (pressureFromGroundLayer) float64 3e+03 ... 1.5...
* isobaricInhPa1 (isobaricInhPa1) float64 1e+03 850.0 ... 250.0
heightAboveGround float64 ...
heightAboveGround1 float64 ...
heightAboveGround2 float64 ...
* pressureFromGroundLayer1 (pressureFromGroundLayer1) float64 9e+03 1.8e+04
* pressureFromGroundLayer2 (pressureFromGroundLayer2) float64 9e+03 1.8e+04
atmosphereSingleLayer float64 ...
* heightAboveGroundLayer (heightAboveGroundLayer) float64 1e+03 3e+03
pressureFromGroundLayer3 float64 ...
pressureFromGroundLayer4 float64 ...
Dimensions without coordinates: y, x
Data variables:
prmsl__meanSea__instant (y, x) float32 ...
gust__surface__instant (y, x) float32 ...
gh__isobaricInhPa__instant (isobaricInhPa, y, x) float32 ...
gh__cloudBase__instant (y, x) float32 ...
gh__cloudTop__instant (y, x) float32 ...
gh__maxWind__instant (y, x) float32 ...
gh__isothermZero__instant (y, x) float32 ...
t__isobaricInhPa__instant (isobaricInhPa, y, x) float32 ...
t__cloudTop__instant (y, x) float32 ...
t__tropopause__instant (y, x) float32 ...
t__pressureFromGroundLayer__instant (pressureFromGroundLayer, y, x) float32 ...
r__isobaricInhPa__instant (isobaricInhPa, y, x) float32 ...
r__isothermZero__instant (y, x) float32 ...
r__pressureFromGroundLayer__instant (pressureFromGroundLayer, y, x) float32 ...
w__isobaricInhPa__instant (isobaricInhPa, y, x) float32 ...
u__isobaricInhPa__instant (isobaricInhPa, y, x) float32 ...
u__tropopause__instant (y, x) float32 ...
u__maxWind__instant (y, x) float32 ...
u__pressureFromGroundLayer__instant (pressureFromGroundLayer, y, x) float32 ...
v__isobaricInhPa__instant (isobaricInhPa, y, x) float32 ...
v__tropopause__instant (y, x) float32 ...
v__maxWind__instant (y, x) float32 ...
v__pressureFromGroundLayer__instant (pressureFromGroundLayer, y, x) float32 ...
absv__isobaricInhPa__instant (isobaricInhPa1, y, x) float32 ...
mslet__meanSea__instant (y, x) float32 ...
sp__surface__instant (y, x) float32 ...
orog__surface__instant (y, x) float32 ...
t2m__heightAboveGround__instant (y, x) float32 ...
r2__heightAboveGround__instant (y, x) float32 ...
u10__heightAboveGround__instant (y, x) float32 ...
v10__heightAboveGround__instant (y, x) float32 ...
tp__surface__accum (y, x) float32 ...
acpcp__surface__accum (y, x) float32 ...
csnow__surface__instant (y, x) float32 ...
cicep__surface__instant (y, x) float32 ...
cfrzr__surface__instant (y, x) float32 ...
crain__surface__instant (y, x) float32 ...
cape__surface__instant (y, x) float32 ...
cape__pressureFromGroundLayer__instant (pressureFromGroundLayer1, y, x) float32 ...
cin__surface__instant (y, x) float32 ...
cin__pressureFromGroundLayer__instant (pressureFromGroundLayer2, y, x) float32 ...
pwat__atmosphereSingleLayer__instant (y, x) float32 ...
pres__cloudBase__instant (y, x) float32 ...
pres__cloudTop__instant (y, x) float32 ...
pres__maxWind__instant (y, x) float32 ...
hlcy__heightAboveGroundLayer__instant (heightAboveGroundLayer, y, x) float32 ...
trpp__tropopause__instant (y, x) float32 ...
pli__pressureFromGroundLayer__instant (y, x) float32 ...
4lftx__pressureFromGroundLayer__instant (y, x) float32 ...
unknown__surface__instant (y, x) float32 ...
Attributes:
GRIB_edition: 2
GRIB_centre: kwbc
GRIB_centreDescription: US National Weather Service - NCEP
GRIB_subCentre: 0
Conventions: CF-1.7
institution: US National Weather Service - NCEP
history: 2023-12-04T16:56 GRIB to CDM+CF via cfgrib-0.9.1...



Advanced usage
==============

Expand Down
64 changes: 62 additions & 2 deletions cfgrib/dataset.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import json
import logging
import os
import itertools
import typing as T

import attr
Expand Down Expand Up @@ -299,6 +300,17 @@ def __eq__(self, other):
equal = (self.dimensions, self.attributes) == (other.dimensions, other.attributes)
return equal and np.array_equal(self.data, other.data)

def rename_coordinate(self, existing_coord_name, new_coord_name):
# type: (str, str) -> None
# Update data_var dimensions, eg. ('isobaricInhPa', 'y', 'x')
self.dimensions = tuple(
[x.replace(existing_coord_name, new_coord_name) for x in self.dimensions]
)
if 'coordinates' in self.attributes:
# Update data_var attributes, eg. 'time step isobaricInhPa latitude longitude valid_time'
new_coordinates = self.attributes['coordinates'].replace(existing_coord_name, new_coord_name)
self.attributes['coordinates'] = new_coordinates


def expand_item(item, shape):
# type: (T.Tuple[T.Any, ...], T.Sequence[int]) -> T.Tuple[T.List[int], ...]
Expand Down Expand Up @@ -661,13 +673,31 @@ def build_dataset_components(
time_dims: T.Sequence[str] = ("time", "step"),
extra_coords: T.Dict[str, str] = {},
cache_geo_coords: bool = True,
filter_heterogeneous: bool = False,
) -> T.Tuple[T.Dict[str, int], T.Dict[str, Variable], T.Dict[str, T.Any], T.Dict[str, T.Any]]:
dimensions = {} # type: T.Dict[str, int]
variables = {} # type: T.Dict[str, Variable]
filter_by_keys = index.filter_by_keys

for param_id in index.get("paramId", []):
var_index = index.subindex(paramId=param_id)
subindex_kwargs_list = [{"paramId": param_id} for param_id in index.get("paramId", [])]
if filter_heterogeneous:
# Generate all possible combinations of paramId/typeOfLevel/stepType
subindex_kwargs_list = []
combinations = index.get('paramId', []), index.get('typeOfLevel', []), index.get('stepType', [])
for param_id, type_of_level, step_type in itertools.product(*combinations):
subindex_kwargs_list.append({
'paramId': param_id,
'typeOfLevel': type_of_level,
'stepType': step_type
})

for subindex_kwargs in subindex_kwargs_list:
var_index = index.subindex(**subindex_kwargs)
if not var_index.header_values:
# For some combinations, no match availables
log.debug(f"No match for {subindex_kwargs}")
continue

try:
dims, data_var, coord_vars = build_variable_components(
var_index,
Expand All @@ -692,10 +722,40 @@ def build_dataset_components(
fbks.append(fbk)
error_message += "\n filter_by_keys=%r" % fbk
raise DatasetBuildError(error_message, key, fbks)
param_id = subindex_kwargs['paramId']
short_name = data_var.attributes.get("GRIB_shortName", "paramId_%d" % param_id)
var_name = data_var.attributes.get("GRIB_cfVarName", "unknown")
if "parameter" in encode_cf and var_name not in ("undef", "unknown"):
short_name = var_name

if filter_heterogeneous:
# Unique variable name looking like "t__surface__instant"
short_name = "__".join([
short_name,
data_var.attributes.get("GRIB_typeOfLevel", "unknown"),
data_var.attributes.get("GRIB_stepType", "unknown"),
])

# Handle coordinates name/values collision
for coord_name in list(coord_vars.keys()):
if coord_name in variables:
current_coord_var = coord_vars[coord_name]
existing_coord_var = variables[coord_name]
if current_coord_var == existing_coord_var:
continue

# Coordinates have same name, but values not equal -> we need to rename to avoid collision
# Renaming will follow something like isobaricInhPa, isobaricInhPa1, etc.
coord_name_count = len([x for x in variables.keys() if x.startswith(coord_name)])
coord_name_unique = f"{coord_name}{coord_name_count}"

# Update attributes
if coord_name in dims:
dims[coord_name_unique] = dims.pop(coord_name)
data_var.rename_coordinate(coord_name, coord_name_unique)
current_coord_var.rename_coordinate(coord_name, coord_name_unique)
coord_vars[coord_name_unique] = coord_vars.pop(coord_name)

try:
dict_merge(variables, coord_vars)
dict_merge(variables, {short_name: data_var})
Expand Down
2 changes: 2 additions & 0 deletions cfgrib/xarray_plugin.py
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,7 @@ def open_dataset(
lock: T.Union[T.ContextManager[T.Any], None] = None,
indexpath: str = messages.DEFAULT_INDEXPATH,
filter_by_keys: T.Dict[str, T.Any] = {},
filter_heterogeneous: bool = False,
read_keys: T.Iterable[str] = (),
encode_cf: T.Sequence[str] = ("parameter", "time", "geography", "vertical"),
squeeze: bool = True,
Expand All @@ -110,6 +111,7 @@ def open_dataset(
filename_or_obj,
indexpath=indexpath,
filter_by_keys=filter_by_keys,
filter_heterogeneous=filter_heterogeneous,
read_keys=read_keys,
encode_cf=encode_cf,
squeeze=squeeze,
Expand Down
10 changes: 10 additions & 0 deletions tests/test_40_xarray_store.py
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,16 @@ def test_open_datasets() -> None:
assert res[0].attrs["GRIB_centre"] == "ecmf"


def test_open_filter_heterogeneous() -> None:
res = xarray_store.open_dataset(TEST_DATA, backend_kwargs={
'filter_heterogeneous': True
})

assert isinstance(res, xr.Dataset)
assert "t__isobaricInhPa__instant" in res
assert res.attrs["GRIB_centre"] == "ecmf"


def test_cached_geo_coords() -> None:
ds1 = xarray_store.open_dataset(TEST_DATA_MULTIPLE_FIELDS)
ds2 = xarray_store.open_dataset(
Expand Down
31 changes: 31 additions & 0 deletions tests/test_50_sample_data.py
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,37 @@ def test_open_datasets(grib_name: str) -> None:
assert len(res) > 1


@pytest.mark.parametrize(
"grib_name",
[
"era5-levels-members",
"fields_with_missing_values",
"lambert_grid",
"reduced_gg",
"regular_gg_sfc",
"regular_gg_pl",
"regular_gg_ml",
"regular_gg_ml_g2",
"regular_ll_sfc",
"regular_ll_msl",
"scanning_mode_64",
"single_gridpoint",
"spherical_harmonics",
"t_analysis_and_fc_0",
"hpa_and_pa",
"uv_on_different_levels",
]
)
def test_open_dataset_heterogeneous(grib_name: str) -> None:
grib_path = os.path.join(SAMPLE_DATA_FOLDER, grib_name + ".grib")

res = xarray_store.open_dataset(grib_path, backend_kwargs={
'filter_heterogeneous': True
})

assert isinstance(res, xr.Dataset)


@pytest.mark.parametrize(
"grib_name",
[
Expand Down
Loading