Commit d2dc1f2

Merge pull request #63 from ratt-ru/b040

R0.4.0

o-smirnov authored Jun 19, 2020
2 parents 12233f5 + 4a6149b commit d2dc1f2
Showing 11 changed files with 1,144 additions and 494 deletions.
138 changes: 138 additions & 0 deletions .gitignore
@@ -0,0 +1,138 @@
# Byte-compiled / optimized / DLL files
__pycache__/
*.py[cod]
*$py.class

# C extensions
*.so

# Distribution / packaging
.Python
build/
develop-eggs/
dist/
downloads/
eggs/
.eggs/
lib/
lib64/
parts/
sdist/
var/
wheels/
share/python-wheels/
*.egg-info/
.installed.cfg
*.egg
MANIFEST

# PyInstaller
# Usually these files are written by a python script from a template
# before PyInstaller builds the exe, so as to inject date/other infos into it.
*.manifest
*.spec

# Installer logs
pip-log.txt
pip-delete-this-directory.txt

# Unit test / coverage reports
htmlcov/
.tox/
.nox/
.coverage
.coverage.*
.cache
nosetests.xml
coverage.xml
*.cover
*.py,cover
.hypothesis/
.pytest_cache/
cover/

# Translations
*.mo
*.pot

# Django stuff:
*.log
local_settings.py
db.sqlite3
db.sqlite3-journal

# Flask stuff:
instance/
.webassets-cache

# Scrapy stuff:
.scrapy

# Sphinx documentation
docs/_build/

# PyBuilder
.pybuilder/
target/

# Jupyter Notebook
.ipynb_checkpoints

# IPython
profile_default/
ipython_config.py

# pyenv
# For a library or package, you might want to ignore these files since the code is
# intended to run in multiple environments; otherwise, check them in:
# .python-version

# pipenv
# According to pypa/pipenv#598, it is recommended to include Pipfile.lock in version control.
# However, in case of collaboration, if having platform-specific dependencies or dependencies
# having no cross-platform support, pipenv may install dependencies that don't work, or not
# install all needed dependencies.
#Pipfile.lock

# PEP 582; used by e.g. github.com/David-OConnor/pyflow
__pypackages__/

# Celery stuff
celerybeat-schedule
celerybeat.pid

# SageMath parsed files
*.sage.py

# Environments
.env
.venv
env/
venv/
ENV/
env.bak/
venv.bak/

# Spyder project settings
.spyderproject
.spyproject

# Rope project settings
.ropeproject

# mkdocs documentation
/site

# mypy
.mypy_cache/
.dmypy.json
dmypy.json

# Pyre type checker
.pyre/

# pytype static type analyzer
.pytype/

# Cython debug symbols
cython_debug/
5 changes: 3 additions & 2 deletions setup.py
@@ -6,9 +6,10 @@
requirements = [
"dask-ms[xarray]",
"dask[complete]",
"datashader",
"datashader @ git+https://github.com/o-smirnov/datashader.git",
"holoviews",
"matplotlib>2.2.3; python_version >= '3.5'",
"cmasher",
"future-fstrings",
"requests",
"MSUtils"
@@ -17,7 +18,7 @@
extras_require = {'testing': ['pytest', 'pytest-flake8']}

PACKAGE_NAME = 'shadems'
__version__ = '0.3.0'
__version__ = '0.4.0'

setup(name = PACKAGE_NAME,
version = __version__,
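The setup.py change points the datashader requirement at a fork via a direct git URL and bumps the package version to 0.4.0. As a purely illustrative sanity check (not part of this commit), an installed environment can be inspected to confirm which versions were resolved:

    # Hypothetical post-install check; distribution names taken from setup.py above.
    from importlib.metadata import version

    print(version("shadems"))      # expect 0.4.0 after this commit
    print(version("datashader"))   # version reported by the o-smirnov fork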
175 changes: 136 additions & 39 deletions shade_ms/dask_utils.py
@@ -19,17 +19,15 @@ def start_ends(chunks):
s = e


def _create_dataframe(arrays, start, end):
def _create_dataframe(arrays, start, end, columns):
index = None if start is None else np.arange(start, end)

return pd.DataFrame({'x': arrays[0].ravel(),
'y': arrays[1].ravel()},
return pd.DataFrame({k: a.ravel() for k, a in zip(columns, arrays)},
index=index)


def dataframe_factory(out_ind, x, x_ind, y, y_ind, columns=None):
def dataframe_factory(out_ind, *arginds, columns=None):
"""
Creates a dask Dataframe by broadcasting the ``x`` and ``y`` arrays
Creates a dask Dataframe by broadcasting *arginds
against each other and then ravelling them.
.. code-block:: python
@@ -43,57 +41,157 @@ def dataframe_factory(out_ind, x, x_ind, y, y_ind, columns=None):
out_ind : sequence
Output dimensions.
e.g. :code:`(row, chan)`
x : :class:`dask.array.Array`
x data
x_ind : sequence
x dimensions. e.g. :code:`(row,)`
y : :class:`dask.array.Array`
y data
y_ind : sequence
y dimensions. e.g. :code:(row,)`
*arginds : Sequence of (:class:`dask.array.Array`, index)
Alternating array and dimension-index pairs, e.g. :code:`x, ("row",), y, ("chan",)`
columns : sequence, optional
Dataframe column names.
Defaults to :code:`[x, y]`
"""
if not len(arginds) % 2 == 0:
raise ValueError("Must supply an index for each argument")

args = arginds[::2]
inds = arginds[1::2]

if columns is None:
columns = ['x', 'y']
columns = ['x', 'y'] + ["c%d" % i for i in range(len(args) - 2)]
else:
if not isinstance(columns, (tuple, list)) and len(columns) != 2:
raise ValueError("Columns must be a tuple/list "
"of two column names")
if (not isinstance(columns, (tuple, list)) and
len(columns) != len(args)):

if not all(i in out_ind for i in x_ind):
raise ValueError("x_ind dimensions not in out_ind")
raise ValueError("Columns must be a tuple/list of columns "
"matching the number of arrays")

if not all(i in out_ind for i in y_ind):
raise ValueError("y_ind dimensions not in out_ind")
have_nan_chunks = False

if not len(x_ind) == x.ndim:
raise ValueError("len(x_ind) != x.ndim")
new_args = []

if not len(y_ind) == y.ndim:
raise ValueError("len(y_ind) != y.ndim")
for a, (arg, ind) in enumerate(zip(args, inds)):
if not all(i in out_ind for i in ind):
raise ValueError("Argument %d dimensions not in out_ind" % a)

have_nan_chunks = (any(np.isnan(c) for dc in x.chunks for c in dc) or
any(np.isnan(c) for dc in y.chunks for c in dc))
if not len(ind) == arg.ndim:
raise ValueError("Argument %d len(ind) != arg.ndim" % a)

# Generate slicing tuples that will expand x and y up to the full
# resolution
expand_x = tuple(slice(None) if i in x_ind else None for i in out_ind)
expand_y = tuple(slice(None) if i in y_ind else None for i in out_ind)
have_nan_chunks = (any(np.isnan(c) for dc in arg.chunks for c in dc) or
have_nan_chunks)

bx = x[expand_x]
by = y[expand_y]
# Generate slicing tuple that will expand arg up to full resolution
expand = tuple(slice(None) if i in ind else None for i in out_ind)
new_args.append(arg[expand])

# Create meta data so that blockwise doesn't call
# np.broadcast_arrays and fall over on the tuple
# of arrays that it returns
dtype = np.result_type(x, y)
dtype = np.result_type(*args)
meta = np.empty((0,) * len(out_ind), dtype=dtype)

blockargs = (v for pair in ((a, out_ind) for a in new_args) for v in pair)

bcast = da.blockwise(np.broadcast_arrays, out_ind,
bx, out_ind,
by, out_ind,
*blockargs,
subok=True,
align_arrays=not have_nan_chunks,
meta=meta,
dtype=dtype)

# Now create a dataframe from the broadcasted arrays
# with lower-level dask graph API

# Flattened list of broadcast array keys
# We'll use this to generate a 1D (ravelled) dataframe
keys = product((bcast.name,), *(range(b) for b in bcast.numblocks))
name = "dataframe-" + tokenize(bcast)

# dictionary defining the graph for this part of the operation
layers = {}

if have_nan_chunks:
# We can't create proper indices if we don't know our chunk sizes
divisions = [None]

for i, key in enumerate(keys):
layers[(name, i)] = (_create_dataframe, key, None, None, columns)
divisions.append(None)
else:
# We do know all our chunk sizes, create reasonable dataframe indices
start_idx = 0
divisions = [0]

expr = ((e - s for s, e in start_ends(dim_chunks))
for dim_chunks in bcast.chunks)
chunk_sizes = (reduce(mul, shape, 1) for shape in product(*expr))
chunk_ranges = start_ends(chunk_sizes)

for i, (key, (start, end)) in enumerate(zip(keys, chunk_ranges)):
layers[(name, i)] = (_create_dataframe, key, start, end, columns)
start_idx += end - start
divisions.append(start_idx)

assert len(layers) == bcast.npartitions
assert len(divisions) == bcast.npartitions + 1

# Create the HighLevelGraph
graph = HighLevelGraph.from_collections(name, layers, [bcast])
# Metadata representing the broadcasted and ravelled data

meta = pd.DataFrame(data={k: np.empty((0,), dtype=a.dtype)
for k, a in zip(columns, args)},
columns=columns)

# Create the actual Dataframe
return dd.DataFrame(graph, name, meta=meta, divisions=divisions)
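For context, a minimal usage sketch of the generalised dataframe_factory (hypothetical arrays; the call pattern follows the docstring and argument handling above): arrays are passed as alternating (array, dims) pairs, broadcast against out_ind, and ravelled into DataFrame columns.

    # Minimal sketch, assuming the shade_ms package layout shown in this diff.
    import dask.array as da
    from shade_ms.dask_utils import dataframe_factory

    x = da.random.random(10000, chunks=1000)   # dims ("row",)
    y = da.random.random(16, chunks=4)         # dims ("chan",)

    # Broadcast x and y against ("row", "chan") and ravel into two columns.
    df = dataframe_factory(("row", "chan"),
                           x, ("row",),
                           y, ("chan",),
                           columns=["x", "y"])
    # df is a dask DataFrame with 10000 * 16 rows, one partition per chunk pair.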


def multicol_dataframe_factory(out_ind, arrays, array_dims):
"""
Creates a dask Dataframe by broadcasting the arrays (given by the ``arrays`` dict-like object)
against each other and then ravelling them. The ``array_dims`` mapping specifies the
dimensions of each array.
.. code-block:: python
df = multicol_dataframe_factory(("row", "chan"), {'x': x, 'y': y}, {'x': ("row",), 'y': ("chan",)})
Parameters
----------
out_ind : sequence
Output dimensions.
e.g. :code:`(row, chan)`
"""
columns = list(arrays.keys())

have_nan_chunks = None
expand = {}
barr = {}
# build up list of arguments for blockwise call below
blockwise_args = [np.broadcast_arrays, out_ind]

for col, arr in arrays.items():
if col not in array_dims:
raise ValueError(f"{col} dimensions not specified")
arr_ind = array_dims[col]
if not all(i in out_ind for i in arr_ind):
raise ValueError(f"{col} dimensions not in out_ind")
if not len(arr_ind) == arr.ndim:
raise ValueError(f"len({col}_ind) != {col}.ndim")
have_nan_chunks = have_nan_chunks or any(np.isnan(c) for dc in arr.chunks for c in dc)

# Generate slicing tuples that will expand arr up to the full
# resolution
expand[col] = tuple(slice(None) if i in arr_ind else None for i in out_ind)
# broadcast version of array
barr[col] = arr[expand[col]]

blockwise_args += [barr[col], out_ind]

# Create meta data so that blockwise doesn't call
# np.broadcast_arrays and fall over on the tuple
# of arrays that it returns
dtype = np.result_type(*arrays.values())
meta = np.empty((0,) * len(out_ind), dtype=dtype)

bcast = da.blockwise(*blockwise_args,
align_arrays=not have_nan_chunks,
meta=meta,
dtype=dtype)
@@ -137,8 +235,7 @@ def dataframe_factory(out_ind, x, x_ind, y, y_ind, columns=None):
# Create the HighLevelGraph
graph = HighLevelGraph.from_collections(name, layers, [bcast])
# Metadata representing the broadcasted and ravelled data
meta = pd.DataFrame(data={'x': np.empty((0,), dtype=x.dtype),
'y': np.empty((0,), dtype=y.dtype)},
meta = pd.DataFrame(data={col: np.empty((0,), dtype=arr.dtype) for col, arr in arrays.items()},
columns=columns)

# Create the actual Dataframe
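The dict-based multicol_dataframe_factory added above follows the same pattern with named columns; a hedged sketch (hypothetical arrays, with the dimension mapping keyed by column name as in the function body):

    # Illustrative only; mirrors the arrays/array_dims handling in multicol_dataframe_factory.
    import dask.array as da
    from shade_ms.dask_utils import multicol_dataframe_factory

    x = da.random.random(10000, chunks=1000)              # ("row",)
    y = da.random.random(16, chunks=4)                     # ("chan",)
    f = da.random.random((10000, 16), chunks=(1000, 4))    # ("row", "chan")

    df = multicol_dataframe_factory(
        ("row", "chan"),
        arrays={"x": x, "y": y, "flag": f},
        array_dims={"x": ("row",), "y": ("chan",), "flag": ("row", "chan")})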