Skip to content

Commit

Permalink
Add dask-ms partitioning attributes
Browse files Browse the repository at this point in the history
  • Loading branch information
sjperkins committed Mar 27, 2024
1 parent e094bc7 commit b7a1006
Showing 1 changed file with 34 additions and 5 deletions.
39 changes: 34 additions & 5 deletions daskms/experimental/katdal/msv2_facade.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import numba
import xarray

from daskms.constants import DASKMS_PARTITION_KEY
from daskms.experimental.katdal.corr_products import corrprod_index
from daskms.experimental.katdal.transpose import transpose
from daskms.experimental.katdal.uvw import uvw_coords
Expand All @@ -20,6 +21,18 @@
}


# Partitioning columns
GROUP_COLS = ["FIELD_ID", "DATA_DESC_ID", "SCAN_NUMBER"]

# No partitioning, applies to many subtables
EMPTY_PARTITION_SCHEMA = {DASKMS_PARTITION_KEY: ()}

# katdal datasets only have one spectral window
# and one polarisation. Thus, there
# is only one DATA_DESC_ID and it is zero
DATA_DESC_ID = 0


def to_mjds(timestamp: Timestamp):
"""Converts a katpoint Timestamp to Modified Julian Date Seconds"""
return timestamp.to_mjd() * 24 * 60 * 60
Expand Down Expand Up @@ -110,10 +123,7 @@ def _main_xarray_factory(self, field_id, state_id, scan_index, scan_state, targe
"ANTENNA2": (primary_dims, ant2),
"FEED1": (primary_dims, da.zeros_like(ant1)),
"FEED2": (primary_dims, da.zeros_like(ant1)),
# katdal datasets only have one spectral window
# and one polarisation. Thus, there
# is only one DATA_DESC_ID and it is zero
"DATA_DESC_ID": (primary_dims, da.zeros_like(ant1)),
"DATA_DESC_ID": (primary_dims, da.full_like(ant1, DATA_DESC_ID)),
"FIELD_ID": (primary_dims, da.full_like(ant1, field_id)),
"STATE_ID": (primary_dims, da.full_like(ant1, state_id)),
"ARRAY_ID": (primary_dims, da.zeros_like(ant1)),
Expand Down Expand Up @@ -142,7 +152,18 @@ def _main_xarray_factory(self, field_id, state_id, scan_index, scan_state, targe
),
}

return xarray.Dataset(data_vars)
attrs = {
DASKMS_PARTITION_KEY: tuple(
(c, data_vars[c][-1].dtype.name) for c in GROUP_COLS
),
"FIELD_ID": field_id,
"DATA_DESC_ID": DATA_DESC_ID,
"SCAN_NUMBER": scan_index,
}

assert (set(GROUP_COLS) & set(attrs)) == set(GROUP_COLS)

return xarray.Dataset(data_vars, attrs=attrs)

def _antenna_xarray_factory(self):
antennas = self._dataset.ants
Expand Down Expand Up @@ -432,4 +453,12 @@ def xarray_datasets(self):
"STATE": self._state_xarray_factory(state_modes),
}

# Assign empty partition schemas to subtables
subtables = {
n: dss.assign_attrs(EMPTY_PARTITION_SCHEMA)
if isinstance(dss, xarray.Dataset)
else [ds.assign_attrs(EMPTY_PARTITION_SCHEMA) for ds in dss]
for n, dss in subtables.items()
}

return main_xds, subtables

0 comments on commit b7a1006

Please sign in to comment.