Skip to content

Commit

Permalink
Export data from katdal to zarr (#315)
Browse files Browse the repository at this point in the history
  • Loading branch information
sjperkins authored Mar 28, 2024
1 parent 5eec1d9 commit 32e866b
Show file tree
Hide file tree
Showing 16 changed files with 1,170 additions and 4 deletions.
1 change: 1 addition & 0 deletions HISTORY.rst
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ History

X.Y.Z (YYYY-MM-DD)
------------------
* Add a ``dask-ms katdal import`` application for exporting SARAO archive data directly to zarr (:pr:`315`)
* Define dask-ms command line applications with click (:pr:`317`)
* Make poetry dev and docs groups optional (:pr:`316`)
* Only test Github Action Push events on master (:pr:`313`)
Expand Down
4 changes: 3 additions & 1 deletion daskms/apps/entrypoint.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,10 @@
import click

from daskms.apps.convert import convert
from daskms.apps.katdal_import import katdal


@click.group()
@click.group(name="dask-ms")
@click.pass_context
@click.option("--debug/--no-debug", default=False)
def main(ctx, debug):
Expand All @@ -15,3 +16,4 @@ def main(ctx, debug):


main.add_command(convert)
main.add_command(katdal)
67 changes: 67 additions & 0 deletions daskms/apps/katdal_import.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
import click


@click.group()
@click.pass_context
def katdal(ctx):
    """Subgroup for katdal commands."""
    # Group only exists to namespace subcommands (e.g. "dask-ms katdal import").
    pass


class PolarisationListType(click.ParamType):
    """Click parameter type for a comma-separated list of polarisation products.

    Converts a string such as ``"HH,VV"`` into ``["HH", "VV"]`` and
    validates each entry against the recognised products.
    """

    name = "polarisation list"
    VALID = {"HH", "HV", "VH", "VV"}

    def convert(self, value, param, ctx):
        """Convert and validate ``value``.

        Parameters
        ----------
        value : str
            Comma-separated polarisation products, e.g. ``"HH,HV"``.
        param, ctx
            Standard click conversion context arguments.

        Returns
        -------
        list of str
            The individual polarisation products.
        """
        # click may call convert() more than once (e.g. for defaults);
        # pass already-converted values straight through.
        if isinstance(value, list):
            return value

        if not isinstance(value, str):
            # self.fail() renders a proper usage error via click instead
            # of surfacing a raw traceback to the user.
            self.fail(
                f"{value} should be a comma separated string of polarisations",
                param,
                ctx,
            )

        pols = [p.strip() for p in value.split(",")]

        if not set(pols).issubset(self.VALID):
            self.fail(f"{set(pols)} is not a subset of {self.VALID}", param, ctx)

        return pols


@katdal.command(name="import")
@click.pass_context
@click.argument("rdb_url", required=True)
@click.option(
    "-a",
    "--no-auto",
    flag_value=True,
    default=False,
    help="Exclude auto-correlation data",
)
@click.option(
    "-o",
    "--output-store",
    help="Output store name. Will be derived from the rdb url if not provided.",
    default=None,
)
@click.option(
    "-p",
    "--pols-to-use",
    default="HH,HV,VH,VV",
    help="Select polarisation products to include in MS as "
    "a comma-separated list, containing values from [HH, HV, VH, VV].",
    type=PolarisationListType(),
)
@click.option(
    "--applycal",
    default="",
    help="List of calibration solutions to apply to data as "
    "a string of comma-separated names, e.g. 'l1' or "
    "'K,B,G'. Use 'default' for L1 + L2 and 'all' for "
    "all available products.",
)
def _import(ctx, rdb_url, no_auto, pols_to_use, applycal, output_store):
    """Export an observation in the SARAO archive to zarr format.

    RDB_URL is the SARAO archive link
    """
    # Imported lazily so the CLI still loads when the optional katdal
    # dependency is not installed.
    from daskms.experimental.katdal import katdal_import

    # NOTE(review): pols_to_use is parsed and validated above but is not
    # forwarded to katdal_import -- confirm whether polarisation selection
    # should be plumbed through to the export.
    katdal_import(rdb_url, output_store, no_auto, applycal)
1 change: 1 addition & 0 deletions daskms/experimental/katdal/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
from daskms.experimental.katdal.katdal_import import katdal_import
68 changes: 68 additions & 0 deletions daskms/experimental/katdal/corr_products.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
# Creation of the correlation product index is derived from
# https://github.com/ska-sa/katdal/blob/v0.22/scripts/mvftoms.py
# under the following license
#
# ################################################################################
# Copyright (c) 2011-2023, National Research Foundation (SARAO)
#
# Licensed under the BSD 3-Clause License (the "License"); you may not use
# this file except in compliance with the License. You may obtain a copy
# of the License at
#
# https://opensource.org/licenses/BSD-3-Clause
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
################################################################################


from collections import namedtuple

import numpy as np

CPInfo = namedtuple("CPInfo", "ant1_index ant2_index ant1 ant2 cp_index")


def corrprod_index(dataset, pols_to_use, include_auto_corrs=False):
    """The correlator product index (with -1 representing missing indices)."""
    # Map each correlator product (a pair of antenna+feed labels) to its
    # position within dataset.corr_products.
    product_map = {tuple(cp): i for i, cp in enumerate(dataset.corr_products)}

    def lookup(a1, a2, pol):
        """Correlator product index for an antenna pair and polarisation."""
        key = (a1.name + pol[0].lower(), a2.name + pol[1].lower())
        return product_map.get(key, -1)

    # Upper-triangular baseline indices; the diagonal (auto-correlations)
    # is included only when requested.
    diag_offset = 0 if include_auto_corrs else 1
    idx1, idx2 = np.triu_indices(len(dataset.ants), diag_offset)
    idx1 = idx1.astype(np.int32)
    idx2 = idx2.astype(np.int32)

    # Order baselines as similarly to the input as possible, which gives
    # better performance in permute_baselines.
    pairs = sorted(
        zip(idx1, idx2),
        key=lambda bl: lookup(
            dataset.ants[bl[0]], dataset.ants[bl[1]], pols_to_use[0]
        ),
    )
    idx1[:] = [bl[0] for bl in pairs]
    idx2[:] = [bl[1] for bl in pairs]

    ant1 = [dataset.ants[i] for i in idx1]
    ant2 = [dataset.ants[i] for i in idx2]

    # One row per baseline, one column per requested polarisation.
    flat_index = [
        lookup(a1, a2, pol) for a1, a2 in zip(ant1, ant2) for pol in pols_to_use
    ]
    cp_index = np.array(flat_index, dtype=np.int32)
    cp_index = cp_index.reshape(-1, len(pols_to_use))

    return CPInfo(idx1, idx2, ant1, ant2, cp_index)
53 changes: 53 additions & 0 deletions daskms/experimental/katdal/katdal_import.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
import os
import urllib

import dask

from daskms.utils import requires

try:
import katdal
from katdal.dataset import DataSet

from daskms.experimental.katdal.msv2_facade import XarrayMSV2Facade
from daskms.experimental.zarr import xds_to_zarr
except ImportError as e:
import_error = e
else:
import_error = None


def default_output_name(url):
    """Derive a zarr output store name from an RDB URL.

    Parameters
    ----------
    url : str
        RDB URL or file path referring to the dataset.

    Returns
    -------
    str
        Basename of the URL path with the RDB extension replaced by
        ``.zarr``. Leading directories are stripped, so the store is
        created in the current working directory.
    """
    # Import the submodule explicitly: a bare ``import urllib`` does not
    # guarantee ``urllib.parse`` has been loaded.
    from urllib.parse import urlparse

    url_parts = urlparse(url, scheme="file")
    # Create zarr dataset in current working directory (strip off directories)
    dataset_filename = os.path.basename(url_parts.path)
    # Get rid of the ".full" bit on RDB files (it's the same dataset)
    full_rdb_ext = ".full.rdb"
    if dataset_filename.endswith(full_rdb_ext):
        dataset_basename = dataset_filename[: -len(full_rdb_ext)]
    else:
        dataset_basename = os.path.splitext(dataset_filename)[0]
    return f"{dataset_basename}.zarr"


@requires("pip install dask-ms[katdal]", import_error)
def katdal_import(url: str, out_store: str, no_auto: bool, applycal: str):
    """Export a katdal observation to a zarr store.

    Parameters
    ----------
    url : str or katdal.dataset.DataSet
        SARAO archive RDB URL, or an already-opened katdal DataSet.
    out_store : str
        Output zarr store name. Derived from ``url`` when falsy.
    no_auto : bool
        Exclude auto-correlation data when True.
    applycal : str
        Comma-separated calibration solutions to apply when opening.

    Raises
    ------
    TypeError
        If ``url`` is neither a string nor a katdal DataSet.
    """
    if isinstance(url, str):
        # BUG FIX: the keyword was previously misspelled "appycal", so the
        # requested calibration solutions were silently ignored (or
        # rejected) by katdal.open.
        dataset = katdal.open(url, applycal=applycal)
    elif isinstance(url, DataSet):
        dataset = url
    else:
        raise TypeError(f"{url} must be a string or a katdal DataSet")

    facade = XarrayMSV2Facade(dataset, no_auto=no_auto)
    main_xds, subtable_xds = facade.xarray_datasets()

    if not out_store:
        out_store = default_output_name(url)

    # Write the main table and each subtable, then execute all zarr
    # writes in a single dask compute.
    writes = [
        xds_to_zarr(main_xds, out_store),
        *(xds_to_zarr(ds, f"{out_store}::{k}") for k, ds in subtable_xds.items()),
    ]

    dask.compute(writes)
70 changes: 70 additions & 0 deletions daskms/experimental/katdal/meerkat_antennas.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
# MeerKAT antenna description strings. Each record is comma-separated;
# the fields appear to be antenna name, latitude, longitude, altitude (m),
# dish diameter (m) and East-North-Up offsets (m) -- TODO confirm against
# the katpoint Antenna description format.
MEERKAT_ANTENNA_DESCRIPTIONS = [
    "m000, -30:42:39.8, 21:26:38.0, 1086.6, 13.5, -8.264 -207.290 8.597",
    "m001, -30:42:39.8, 21:26:38.0, 1086.6, 13.5, 1.121 -171.762 8.471",
    "m002, -30:42:39.8, 21:26:38.0, 1086.6, 13.5, -32.113 -224.236 8.645",
    "m003, -30:42:39.8, 21:26:38.0, 1086.6, 13.5, -66.518 -202.276 8.285",
    "m004, -30:42:39.8, 21:26:38.0, 1086.6, 13.5, -123.624 -252.946 8.513",
    "m005, -30:42:39.8, 21:26:38.0, 1086.6, 13.5, -102.088 -283.120 8.875",
    "m006, -30:42:39.8, 21:26:38.0, 1086.6, 13.5, -18.232 -295.428 9.188",
    "m007, -30:42:39.8, 21:26:38.0, 1086.6, 13.5, -89.592 -402.732 9.769",
    "m008, -30:42:39.8, 21:26:38.0, 1086.6, 13.5, -93.527 -535.026 10.445",
    "m009, -30:42:39.8, 21:26:38.0, 1086.6, 13.5, 32.357 -371.056 10.140",
    "m010, -30:42:39.8, 21:26:38.0, 1086.6, 13.5, 88.095 -511.872 11.186",
    "m011, -30:42:39.8, 21:26:38.0, 1086.6, 13.5, 84.012 -352.078 10.151",
    "m012, -30:42:39.8, 21:26:38.0, 1086.6, 13.5, 140.019 -368.267 10.449",
    "m013, -30:42:39.8, 21:26:38.0, 1086.6, 13.5, 236.792 -393.460 11.124",
    "m014, -30:42:39.8, 21:26:38.0, 1086.6, 13.5, 280.669 -285.792 10.547",
    "m015, -30:42:39.8, 21:26:38.0, 1086.6, 13.5, 210.644 -219.142 9.738",
    "m016, -30:42:39.8, 21:26:38.0, 1086.6, 13.5, 288.159 -185.873 9.795",
    "m017, -30:42:39.8, 21:26:38.0, 1086.6, 13.5, 199.624 -112.263 8.955",
    "m018, -30:42:39.8, 21:26:38.0, 1086.6, 13.5, 105.727 -245.870 9.529",
    "m019, -30:42:39.8, 21:26:38.0, 1086.6, 13.5, 170.787 -285.223 10.071",
    "m020, -30:42:39.8, 21:26:38.0, 1086.6, 13.5, 97.016 -299.638 9.877",
    "m021, -30:42:39.8, 21:26:38.0, 1086.6, 13.5, -295.966 -327.241 8.117",
    "m022, -30:42:39.8, 21:26:38.0, 1086.6, 13.5, -373.002 0.544 5.649",
    "m023, -30:42:39.8, 21:26:38.0, 1086.6, 13.5, -322.306 -142.185 6.825",
    "m024, -30:42:39.8, 21:26:38.0, 1086.6, 13.5, -351.046 150.088 4.845",
    "m025, -30:42:39.8, 21:26:38.0, 1086.6, 13.5, -181.978 225.617 5.068",
    "m026, -30:42:39.8, 21:26:38.0, 1086.6, 13.5, -99.004 17.045 6.811",
    "m027, -30:42:39.8, 21:26:38.0, 1086.6, 13.5, 40.475 -23.112 7.694",
    "m028, -30:42:39.8, 21:26:38.0, 1086.6, 13.5, -51.179 -87.170 7.636",
    "m029, -30:42:39.8, 21:26:38.0, 1086.6, 13.5, -88.762 -124.111 7.700",
    "m030, -30:42:39.8, 21:26:38.0, 1086.6, 13.5, 171.281 113.949 7.278",
    "m031, -30:42:39.8, 21:26:38.0, 1086.6, 13.5, 246.567 93.756 7.469",
    "m032, -30:42:39.8, 21:26:38.0, 1086.6, 13.5, 461.275 175.505 7.367",
    "m033, -30:42:39.8, 21:26:38.0, 1086.6, 13.5, 580.678 863.959 3.600",
    "m034, -30:42:39.8, 21:26:38.0, 1086.6, 13.5, 357.811 -28.308 8.972",
    "m035, -30:42:39.8, 21:26:38.0, 1086.6, 13.5, 386.152 -180.894 10.290",
    "m036, -30:42:39.8, 21:26:38.0, 1086.6, 13.5, 388.257 -290.759 10.812",
    "m037, -30:42:39.8, 21:26:38.0, 1086.6, 13.5, 380.286 -459.309 12.172",
    "m038, -30:42:39.8, 21:26:38.0, 1086.6, 13.5, 213.308 -569.080 11.946",
    "m039, -30:42:39.8, 21:26:38.0, 1086.6, 13.5, 253.748 -592.147 12.441",
    "m040, -30:42:39.8, 21:26:38.0, 1086.6, 13.5, -26.858 -712.219 11.833",
    "m041, -30:42:39.8, 21:26:38.0, 1086.6, 13.5, -287.545 -661.678 9.949",
    "m042, -30:42:39.8, 21:26:38.0, 1086.6, 13.5, -361.714 -460.318 8.497",
    "m043, -30:42:39.8, 21:26:38.0, 1086.6, 13.5, -629.853 -128.326 5.264",
    "m044, -30:42:39.8, 21:26:38.0, 1086.6, 13.5, -896.164 600.497 -0.640",
    "m045, -30:42:39.8, 21:26:38.0, 1086.6, 13.5, -1832.860 266.750 0.108",
    "m046, -30:42:39.8, 21:26:38.0, 1086.6, 13.5, -1467.341 1751.923 -7.078",
    "m047, -30:42:39.8, 21:26:38.0, 1086.6, 13.5, -578.296 -517.297 7.615",
    "m048, -30:42:39.8, 21:26:38.0, 1086.6, 13.5, -2805.653 2686.863 -9.755",
    "m049, -30:42:39.8, 21:26:38.0, 1086.6, 13.5, -3605.957 436.462 2.696",
    "m050, -30:42:39.8, 21:26:38.0, 1086.6, 13.5, -2052.336 -843.715 5.338",
    "m051, -30:42:39.8, 21:26:38.0, 1086.6, 13.5, -850.255 -769.359 7.614",
    "m052, -30:42:39.8, 21:26:38.0, 1086.6, 13.5, -593.192 -1148.652 10.550",
    "m053, -30:42:39.8, 21:26:38.0, 1086.6, 13.5, 9.365 -1304.462 15.032",
    "m054, -30:42:39.8, 21:26:38.0, 1086.6, 13.5, 871.980 -499.812 13.364",
    "m055, -30:42:39.8, 21:26:38.0, 1086.6, 13.5, 1201.780 96.492 10.023",
    "m056, -30:42:39.8, 21:26:38.0, 1086.6, 13.5, 1598.403 466.668 6.990",
    "m057, -30:42:39.8, 21:26:38.0, 1086.6, 13.5, 294.645 3259.915 -10.637",
    "m058, -30:42:39.8, 21:26:38.0, 1086.6, 13.5, 2805.764 2686.873 -3.660",
    "m059, -30:42:39.8, 21:26:38.0, 1086.6, 13.5, 3686.427 758.895 11.822",
    "m060, -30:42:39.8, 21:26:38.0, 1086.6, 13.5, 3419.683 -1840.478 23.697",
    "m061, -30:42:39.8, 21:26:38.0, 1086.6, 13.5, -16.409 -2323.779 21.304",
    "m062, -30:42:39.8, 21:26:38.0, 1086.6, 13.5, -1440.632 -2503.773 21.683",
    "m063, -30:42:39.8, 21:26:38.0, 1086.6, 13.5, -3419.585 -1840.480 16.383",
]

# Sanity check: entry i must describe antenna "m{i:03d}" so that list
# position can double as the antenna index. An explicit raise is used
# instead of ``assert`` so the check also runs under ``python -O``.
if not all(
    int(a.split(",")[0][1:]) == i for i, a in enumerate(MEERKAT_ANTENNA_DESCRIPTIONS)
):
    raise ValueError("MEERKAT_ANTENNA_DESCRIPTIONS entries are out of order")
88 changes: 88 additions & 0 deletions daskms/experimental/katdal/mock_dataset.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
from katdal.lazy_indexer import DaskLazyIndexer
from katdal.chunkstore_npy import NpyFileChunkStore
from katdal.dataset import Subarray
from katdal.spectral_window import SpectralWindow
from katdal.vis_flags_weights import ChunkStoreVisFlagsWeights
from katdal.test.test_vis_flags_weights import put_fake_dataset
from katdal.test.test_dataset import MinimalDataSet
from katpoint import Antenna


from daskms.experimental.katdal.meerkat_antennas import MEERKAT_ANTENNA_DESCRIPTIONS

# Default mock spectral window: 16 channels centred at 1284 MHz with a
# total bandwidth of 856 MHz (values matching MeerKAT L-band).
# NOTE(review): channel_width=0 presumably lets katdal derive the width
# from bandwidth / num_chans -- confirm against the SpectralWindow API.
SPW = SpectralWindow(
    centre_freq=1284e6, channel_width=0, num_chans=16, sideband=1, bandwidth=856e6
)


class MockDataset(MinimalDataSet):
    """In-memory katdal dataset backed by a fake npy chunk store.

    Combines katdal's MinimalDataSet test scaffolding with an
    NpyFileChunkStore of generated visibilities, flags and weights,
    allowing the katdal export path to be exercised without access to a
    real archive.
    """

    def __init__(
        self,
        path,
        targets,
        timestamps,
        antennas=MEERKAT_ANTENNA_DESCRIPTIONS,
        spw=SPW,
    ):
        """Create the fake chunk store and initialise the base dataset.

        Parameters
        ----------
        path
            Directory in which the npy chunk store is created.
        targets, timestamps
            Forwarded to MinimalDataSet.
        antennas
            Antenna description strings (katpoint format).
        spw
            Spectral window describing the mock data.
        """
        antennas = list(map(Antenna, antennas))
        # All correlation products for every (unordered) antenna pair,
        # autos included, with both h and v feeds on each side.
        corr_products = [
            (a1.name + c1, a2.name + c2)
            for i, a1 in enumerate(antennas)
            for a2 in antennas[i:]
            for c1 in ("h", "v")
            for c2 in ("h", "v")
        ]

        subarray = Subarray(antennas, corr_products)
        assert len(subarray.ants) > 0

        store = NpyFileChunkStore(str(path))
        shape = (len(timestamps), spw.num_chans, len(corr_products))
        # One chunk per timestep for each data product.
        self._test_data, chunk_info = put_fake_dataset(
            store,
            "cb1",
            shape,
            chunk_overrides={
                "correlator_data": (1, spw.num_chans, len(corr_products)),
                "flags": (1, spw.num_chans, len(corr_products)),
                "weights": (1, spw.num_chans, len(corr_products)),
            },
        )
        self._vfw = ChunkStoreVisFlagsWeights(store, chunk_info)
        # Populated by _set_keep (presumably invoked during the base
        # class __init__ below -- confirm against MinimalDataSet).
        self._vis = None
        self._weights = None
        self._flags = None
        super().__init__(targets, timestamps, subarray, spw)

    def _set_keep(
        self,
        time_keep=None,
        freq_keep=None,
        corrprod_keep=None,
        weights_keep=None,
        flags_keep=None,
    ):
        """Apply a selection and rebuild the lazy vis/weights/flags indexers."""
        super()._set_keep(time_keep, freq_keep, corrprod_keep, weights_keep, flags_keep)
        # First-stage selection over (time, frequency, correlation product).
        stage1 = (time_keep, freq_keep, corrprod_keep)
        self._vis = DaskLazyIndexer(self._vfw.vis, stage1)
        self._weights = DaskLazyIndexer(self._vfw.weights, stage1)
        self._flags = DaskLazyIndexer(self._vfw.flags, stage1)

    @property
    def vis(self):
        # Guard: indexers only exist once a selection has been applied.
        if self._vis is None:
            raise ValueError("Selection has not yet been performed")
        return self._vis

    @property
    def flags(self):
        # Guard: indexers only exist once a selection has been applied.
        if self._flags is None:
            raise ValueError("Selection has not yet been performed")
        return self._flags

    @property
    def weights(self):
        # Guard: indexers only exist once a selection has been applied.
        if self._weights is None:
            raise ValueError("Selection has not yet been performed")

        return self._weights
Loading

0 comments on commit 32e866b

Please sign in to comment.