work on improving aggregates
d-chambers committed Mar 26, 2024
1 parent 2da1e43 commit f3c902c
Showing 4 changed files with 250 additions and 29 deletions.
11 changes: 10 additions & 1 deletion dascore/core/patch.py
@@ -222,12 +222,21 @@ def iselect(self, *args, **kwargs):
median_filter = dascore.proc.median_filter
savgol_filter = dascore.proc.savgol_filter
gaussian_filter = dascore.proc.gaussian_filter
aggregate = dascore.proc.aggregate
abs = dascore.proc.abs
real = dascore.proc.real
imag = dascore.proc.imag
angle = dascore.proc.angle
resample = dascore.proc.resample
# Add aggregations
aggregate = dascore.proc.agg.aggregate
min = dascore.proc.agg.min
max = dascore.proc.agg.max
mean = dascore.proc.agg.mean
median = dascore.proc.agg.median
std = dascore.proc.agg.std
first = dascore.proc.agg.first
last = dascore.proc.agg.last
sum = dascore.proc.agg.sum

def iresample(self, *args, **kwargs):
"""Deprecated method."""
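With these bindings each aggregation becomes a Patch method. A minimal usage sketch (mirroring the example patch used in the new aggregate docstring below; coordinate names are assumed to match that example):

import dascore as dc

patch = dc.get_example_patch()
# Mean over time, keeping a length-1 "time" dimension.
time_mean = patch.mean("time", keep_dims=True)
# Median over distance, dropping the aggregated dimension.
dist_median = patch.median("distance", keep_dims=False)
# The generic entry point is also bound directly.
std_patch = patch.aggregate(dim="time", method="std")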
3 changes: 2 additions & 1 deletion dascore/proc/__init__.py
@@ -2,7 +2,8 @@
Module containing patch processing routines.
"""
from __future__ import annotations
from .aggregate import aggregate

import dascore.proc.aggregate as agg
from .basic import * # noqa
from .coords import * # noqa
from .correlate import correlate
238 changes: 212 additions & 26 deletions dascore/proc/aggregate.py
@@ -1,14 +1,18 @@
"""Module for applying aggregations along a specified axis."""
from __future__ import annotations

from collections.abc import Callable
from functools import partial
from typing import Literal

import numpy as np

import dascore.core
from dascore.constants import PatchType
from dascore.utils.docs import compose_docstring
from dascore.utils.misc import iterate
from dascore.utils.patch import patch_function
from dascore.utils.time import to_datetime64
from dascore.utils.time import is_datetime64, to_datetime64, to_float


def _take_first(data, axis):
@@ -17,45 +21,227 @@ def _take_first(data, axis):


_AGG_FUNCS = {
"mean": np.mean,
"median": np.median,
"min": np.min,
"max": np.max,
"sum": np.sum,
"mean": np.nanmean,
"median": np.nanmedian,
"min": np.nanmin,
"max": np.nanmax,
"sum": np.nansum,
"std": np.nanstd,
"first": partial(np.take, indices=0),
"last": partial(np.take, indices=-1),
}
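# A small illustration (plain NumPy, independent of dascore) of what these
# entries do along an axis; the array values here are made up:
#   arr = np.array([[1.0, np.nan, 3.0], [4.0, 5.0, 6.0]])
#   _AGG_FUNCS["mean"](arr, axis=1)   -> array([2., 5.])  (NaN values are ignored)
#   _AGG_FUNCS["first"](arr, axis=1)  -> array([1., 4.])  (np.take with indices=0)
#   _AGG_FUNCS["last"](arr, axis=1)   -> array([3., 6.])  (np.take with indices=-1)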

AGG_DOC_STR = """
patch
The input Patch.
dim
The dimension along which the aggregation is applied. If None, the
aggregation is applied over all dimensions of the patch.
keep_dims
If True, keep the dimension(s) specified by the dim argument as length-1
dimensions. Otherwise, the aggregated dimension(s) are removed.
"""


@patch_function()
@compose_docstring(params=AGG_DOC_STR, options=list(_AGG_FUNCS))
def aggregate(
patch: PatchType,
dim: str,
method: Literal["mean", "median", "min", "max", "first", "last"] = "mean",
dim: str | None = None,
keep_dims: bool = False,
method: Literal["mean", "median", "min", "max", "sum", "std", "first", "last"]
| Callable = "mean",
) -> PatchType:
"""
Aggregate values along a specified dimension.
Parameters
----------
dim
The dimension along which aggregations are to be performed.
{params}
method
The aggregation to apply along dimension. Options are:
mean, min, max, median, first, last
"""
axis = patch.dims.index(dim)
func = _AGG_FUNCS[method]
new_data = np.expand_dims(func(patch.data, axis=axis), axis)
# update coords with new dimension (use mean)
coords = patch.coords
if dim == "time": # need to convert time to ints
ns = patch.coords.get_array(dim).astype(np.int64) / 1_000_000_000
new_coord_val = to_datetime64(np.mean(ns))
{options}
Examples
--------
>>> import dascore as dc
>>> patch = dc.get_example_patch()
>>> # Calculate mean along the time axis, keeping the same number of dimensions.
>>> patch_time = patch.mean("time", keep_dims=True)
>>> # Calculate median along distance, dropping the aggregated dimension.
>>> patch_dist = patch.median("distance", keep_dims=False)
"""
func = _AGG_FUNCS.get(method, method)
if keep_dims:
out = _aggregate_keep_dims(patch, dim, func)
else:
new_coord_val = np.mean(patch.coords.get_array(dim))
new_coords = {
name: coords.get_array(name) if name != dim else np.array([new_coord_val])
for name in patch.dims
}
return patch.new(data=new_data, coords=new_coords)
out = _aggregate_reduce_dims(patch, dim, func)
return out


def _aggregate_keep_dims(patch, dims, func):
"""Aggregate while keeping dimensions."""
data, coords = patch.data, patch.coords
for dim in iterate(patch.dims if dims is None else dims):
axis = patch.dims.index(dim)
data = np.expand_dims(func(data, axis=axis), axis)
coord_array = coords.get_array(dim)
# Need to account for taking mean of datetime arrays.
if is_datetime64(coord_array):
ns = to_float(patch.coords.get_array(dim))
coord = dascore.core.get_coord(values=[to_datetime64(np.mean(ns))])
else:
coord = dascore.core.get_coord(values=[np.mean(coord_array)])
coords = coords.update(**{dim: coord})
return patch.new(data=data, coords=coords)


def _aggregate_reduce_dims(patch, dims, func):
"""Remove dimensions from the patch while aggregating."""
for dim in iterate(patch.dims if dims is None else dims):
axis = patch.dims.index(dim)
data = func(patch.data, axis=axis)
# In this case we have reduced all the dimensions. Just return scalar.
if not isinstance(data, np.ndarray):
return data
coords = patch.coords.update(**{dim: None})
patch = patch.new(data=data, coords=coords)
return patch
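# Shape summary for the two paths above (dimension names follow the example
# patch used elsewhere in this module; the sizes are illustrative only):
#   patch shape ("distance", "time") == (300, 2000)
#   keep_dims=True,  dim="time"  -> shape (300, 1); "time" becomes a length-1 coord
#   keep_dims=False, dim="time"  -> shape (300,);   the "time" dimension is removed
#   keep_dims=False, dim=None    -> every dimension reduced; a NumPy scalar is returned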


@patch_function()
@compose_docstring(params=AGG_DOC_STR)
def min(
patch: PatchType,
dim: str | None = None,
keep_dims: bool = False,
) -> PatchType:
"""
Calculate the minimum along one or more dimensions.
Parameters
----------
{params}
"""
return aggregate.func(patch, dim=dim, keep_dims=keep_dims, method=np.nanmin)


@patch_function()
@compose_docstring(params=AGG_DOC_STR)
def max(
patch: PatchType,
dim: str | None = None,
keep_dims: bool = False,
) -> PatchType:
"""
Calculate the maximum along one or more dimensions.
Parameters
----------
{params}
"""
return aggregate.func(patch, dim=dim, keep_dims=keep_dims, method=np.nanmax)


@patch_function()
@compose_docstring(params=AGG_DOC_STR)
def mean(
patch: PatchType,
dim: str | None = None,
keep_dims: bool = False,
) -> PatchType:
"""
Calculate the mean along one or more dimensions.
Parameters
----------
{params}
"""
return aggregate.func(patch, dim=dim, keep_dims=keep_dims, method=np.nanmean)


@patch_function()
@compose_docstring(params=AGG_DOC_STR)
def median(
patch: PatchType,
dim: str | None = None,
keep_dims: bool = False,
) -> PatchType:
"""
Calculate the median along one or more dimensions.
Parameters
----------
{params}
"""
return aggregate.func(patch, dim=dim, keep_dims=keep_dims, method=np.nanmedian)


@patch_function()
@compose_docstring(params=AGG_DOC_STR)
def std(
patch: PatchType,
dim: str | None = None,
keep_dims: bool = False,
) -> PatchType:
"""
Calculate the standard deviation along one or more dimensions.
Parameters
----------
{params}
"""
return aggregate.func(patch, dim=dim, keep_dims=keep_dims, method=np.nanstd)


@patch_function()
@compose_docstring(params=AGG_DOC_STR)
def first(
patch: PatchType,
dim: str | None = None,
keep_dims: bool = False,
) -> PatchType:
"""
Get the first value along one or more dimensions.
Parameters
----------
{params}
"""
func = _AGG_FUNCS["first"]
return aggregate.func(patch, dim=dim, keep_dims=keep_dims, method=func)


@patch_function()
@compose_docstring(params=AGG_DOC_STR)
def last(
patch: PatchType,
dim: str | None = None,
keep_dims: bool = False,
) -> PatchType:
"""
Get the last value along one or more dimensions.
Parameters
----------
{params}
"""
func = _AGG_FUNCS["last"]
return aggregate.func(patch, dim=dim, keep_dims=keep_dims, method=func)


@patch_function()
@compose_docstring(params=AGG_DOC_STR)
def sum(
patch: PatchType,
dim: str | None = None,
keep_dims: bool = False,
) -> PatchType:
"""
Sum the values along one or more dimensions.
Parameters
----------
{params}
"""
func = _AGG_FUNCS["sum"]
return aggregate.func(patch, dim=dim, keep_dims=keep_dims, method=func)
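The named wrappers above all follow one pattern: a thin patch_function that forwards to aggregate.func with a NaN-aware NumPy reducer. As a sketch of that pattern, a hypothetical variance wrapper (not part of this commit, written as if it lived in this same module) could be added the same way:

@patch_function()
@compose_docstring(params=AGG_DOC_STR)
def var(
    patch: PatchType,
    dim: str | None = None,
    keep_dims: bool = False,
) -> PatchType:
    """
    Calculate the variance along one or more dimensions.

    Parameters
    ----------
    {params}
    """
    return aggregate.func(patch, dim=dim, keep_dims=keep_dims, method=np.nanvar)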
27 changes: 26 additions & 1 deletion tests/test_proc/test_aggregate.py
@@ -4,6 +4,7 @@
import numpy as np
import pytest

import dascore as dc
from dascore.proc.aggregate import _AGG_FUNCS, aggregate


@@ -14,7 +15,7 @@ class TestBasicAggregations:
def distance_aggregated_patch(self, request, random_patch):
"""Apply all supported aggregations along distance axis."""
agg = request.param
return aggregate(random_patch, dim="distance", method=agg)
return aggregate(random_patch, dim="distance", method=agg, keep_dims=True)

def test_dimension_collapsed(self, distance_aggregated_patch):
"""Ensure the aggregate dimension was collapsed to len 1."""
@@ -43,3 +44,27 @@ def test_last(self, random_patch):
axis = random_patch.dims.index("distance")
assert out.data.shape[axis] == 1
assert np.allclose(random_patch.data[-1, :], out.data[0, :])

def test_no_dim(self, random_patch):
"""Ensure no dimension argument behaves like numpy."""
out = random_patch.aggregate(method="mean", keep_dims=True)
assert np.all(np.mean(random_patch.data) == out.data)
# now test without keeping dims
out = random_patch.aggregate(method="mean", keep_dims=False)
assert out == np.mean(random_patch.data)

@pytest.mark.parametrize("method", list(_AGG_FUNCS))
def test_named_aggregations(self, random_patch, method):
"""Simply run the named aggregations."""
patch1 = getattr(random_patch, method)(dim="time", keep_dims=True)
patch2 = getattr(random_patch, method)(dim="distance", keep_dims=False)
assert isinstance(patch1, dc.Patch)
assert isinstance(patch2, dc.Patch)


class TestApplyOperators:
"""Ensure aggregated patches can be used as operators for arithmetic."""

def test_complete_reduction(self, random_patch):
"""Ensure a patch with complete reduction works."""
assert False
