Add dask-ms katdal import click application
sjperkins committed Mar 28, 2024
1 parent e0ecb9c commit de4afcc
Showing 5 changed files with 96 additions and 10 deletions.
4 changes: 3 additions & 1 deletion daskms/apps/entrypoint.py
@@ -3,9 +3,10 @@
import click

from daskms.apps.convert import convert
from daskms.apps.katdal_import import katdal


@click.group()
@click.group(name="dask-ms")
@click.pass_context
@click.option("--debug/--no-debug", default=False)
def main(ctx, debug):
@@ -15,3 +16,4 @@ def main(ctx, debug):


main.add_command(convert)
main.add_command(katdal)
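
The new katdal group is registered on the renamed dask-ms top-level group alongside convert. A minimal sketch of exercising that wiring with click's test runner, assuming a dask-ms checkout containing this commit (and its dependencies) is importable:

from click.testing import CliRunner
from daskms.apps.entrypoint import main

# "katdal" should now appear as a subcommand of the dask-ms group,
# and its own help should list the "import" command
result = CliRunner().invoke(main, ["katdal", "--help"])
print(result.output)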
67 changes: 67 additions & 0 deletions daskms/apps/katdal_import.py
@@ -0,0 +1,67 @@
import click


@click.group()
@click.pass_context
def katdal(ctx):
"""subgroup for katdal commands"""
pass


class PolarisationListType(click.ParamType):
name = "polarisation list"
VALID = {"HH", "HV", "VH", "VV"}

def convert(self, value, param, ctx):
if isinstance(value, str):
value = [p.strip() for p in value.split(",")]
else:
raise TypeError(
f"{value} should be a comma separated string of polarisations"
)

if not set(value).issubset(self.VALID):
raise ValueError(f"{set(value)} is not a subset of {self.VALID}")

return value


@katdal.command(name="import")
@click.pass_context
@click.argument("rdb_url", required=True)
@click.option(
"-a",
"--no-auto",
flag_value=True,
default=False,
help="Exclude auto-correlation data",
)
@click.option(
"-o",
"--output-store",
help="Output store name. Will be derived from the rdb url if not provided.",
default=None,
)
@click.option(
"-p",
"--pols-to-use",
default="HH,HV,VH,VV",
help="Select polarisation products to include in MS as "
"a comma-separated list, containing values from [HH, HV, VH, VV].",
type=PolarisationListType(),
)
@click.option(
"--applycal",
default="",
help="List of calibration solutions to apply to data as "
"a string of comma-separated names, e.g. 'l1' or "
"'K,B,G'. Use 'default' for L1 + L2 and 'all' for "
"all available products.",
)
def _import(ctx, rdb_url, no_auto, pols_to_use, applycal, output_store):
"""Export an observation in the SARAO archive to zarr formation
RDB_URL is the SARAO archive link"""
from daskms.experimental.katdal import katdal_import

katdal_import(rdb_url, output_store, no_auto, applycal)
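
A quick sketch of how the option parsing behaves, again assuming the package is importable: PolarisationListType splits a comma-separated string and rejects anything outside HH/HV/VH/VV.

from daskms.apps.katdal_import import PolarisationListType

pol_type = PolarisationListType()
print(pol_type.convert("HH,VV", None, None))  # ['HH', 'VV']
# pol_type.convert("HH,XY", None, None) raises ValueError,
# since {'HH', 'XY'} is not a subset of {'HH', 'HV', 'VH', 'VV'}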
25 changes: 22 additions & 3 deletions daskms/experimental/katdal/katdal_import.py
@@ -1,3 +1,6 @@
import os
import urllib

import dask
import katdal
from katdal.dataset import DataSet
@@ -6,17 +9,33 @@
from daskms.experimental.zarr import xds_to_zarr


def katdal_import(url: str, out_store: str, auto_corrs: bool = True):
def default_output_name(url):
url_parts = urllib.parse.urlparse(url, scheme="file")
# Create zarr dataset in current working directory (strip off directories)
dataset_filename = os.path.basename(url_parts.path)
# Get rid of the ".full" bit on RDB files (it's the same dataset)
full_rdb_ext = ".full.rdb"
if dataset_filename.endswith(full_rdb_ext):
dataset_basename = dataset_filename[: -len(full_rdb_ext)]
else:
dataset_basename = os.path.splitext(dataset_filename)[0]
return f"{dataset_basename}.zarr"


def katdal_import(url: str, out_store: str, no_auto: bool, applycal: str):
if isinstance(url, str):
dataset = katdal.open(url)
        dataset = katdal.open(url, applycal=applycal)
elif isinstance(url, DataSet):
dataset = url
else:
raise TypeError(f"{url} must be a string or a katdal DataSet")

facade = XarrayMSV2Facade(dataset, auto_corrs=auto_corrs)
facade = XarrayMSV2Facade(dataset, no_auto=no_auto)
main_xds, subtable_xds = facade.xarray_datasets()

if not out_store:
out_store = default_output_name(url)

writes = [
xds_to_zarr(main_xds, out_store),
*(xds_to_zarr(ds, f"{out_store}::{k}") for k, ds in subtable_xds.items()),
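
A rough illustration of the derived store name; the URL below is a made-up example, not a real archive link, and katdal plus dask-ms are assumed to be installed so the module imports cleanly.

from daskms.experimental.katdal.katdal_import import default_output_name

url = "https://archive.example/1234567890/1234567890_sdp_l0.full.rdb?token=abc"
print(default_output_name(url))  # 1234567890_sdp_l0.zarr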
8 changes: 3 additions & 5 deletions daskms/experimental/katdal/msv2_facade.py
@@ -61,16 +61,14 @@ def to_mjds(timestamp: Timestamp):
class XarrayMSV2Facade:
"""Provides a simplified xarray Dataset view over a katdal dataset"""

def __init__(
self, dataset: DataSet, auto_corrs: bool = True, row_view: bool = True
):
def __init__(self, dataset: DataSet, no_auto: bool = True, row_view: bool = True):
self._dataset = dataset
self._auto_corrs = auto_corrs
self._no_auto = no_auto
self._row_view = row_view
self._pols_to_use = ["HH", "HV", "VH", "VV"]
# Reset the dataset selection
self._dataset.select(reset="")
self._cp_info = corrprod_index(dataset, self._pols_to_use, auto_corrs)
self._cp_info = corrprod_index(dataset, self._pols_to_use, not no_auto)

@property
def cp_info(self):
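
Note the sign flip for callers: the facade's new no_auto flag is the negation of the old auto_corrs flag. A trivial sketch of converting an old-style call site:

auto_corrs = True  # old-style intent: keep auto-correlations
facade_kwargs = {"no_auto": not auto_corrs}
assert facade_kwargs == {"no_auto": False}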
2 changes: 1 addition & 1 deletion daskms/experimental/katdal/tests/test_chunkstore.py
@@ -18,7 +18,7 @@
@pytest.mark.parametrize("row_dim", [True, False])
@pytest.mark.parametrize("out_store", ["output.zarr"])
def test_chunkstore(tmp_path_factory, dataset, auto_corrs, row_dim, out_store):
facade = XarrayMSV2Facade(dataset, auto_corrs, row_dim)
facade = XarrayMSV2Facade(dataset, not auto_corrs, row_dim)
xds, sub_xds = facade.xarray_datasets()

    # Reintroduce the shutil.rmtree and remove the tmp_path_factory
