Low-freq proc #470

Merged
merged 20 commits into from Dec 31, 2024

Changes from 7 commits
92 changes: 92 additions & 0 deletions dascore/examples.py
@@ -415,6 +415,98 @@ def _ricker(time, delay):
return dc.Patch(data=data, coords=coords, dims=dims)


@register_func(EXAMPLE_PATCHES, key="delta_patch")
def delta_patch(
dim="time",
shape=(10, 200),
time_min="2020-01-01",
time_step=1 / 250,
distance_min=0,
distance_step=1,
patch=None,
):
"""
Create a delta function patch (zeros everywhere except for
a unit value at the center) along the specified dimension.

Parameters
----------
dim : {"time", "distance"}
The dimension along which to place the delta line.
shape : tuple of int
The shape of the data as (distance, time). Defaults to (10, 200).
time_min : str or datetime64
The start time of the patch.
time_step : float
The time step in seconds between samples.
distance_min : float
The minimum distance coordinate.
distance_step : float
The distance step in meters between samples.
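patch : dc.Patch, optional
    If provided, the shape and coordinate arguments are ignored; a single
    trace is selected from this patch, its data zeroed, and a unit value
    placed at the center sample.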
"""
if dim not in ["time", "distance"]:
raise ValueError(
"The delta patch is a 2D patch with 'time' and 'distance' dimensions."
)

if patch is None:
dims = ("distance", "time")
dist_len, time_len = shape

# Create a zeroed data array
data = np.zeros((dist_len, time_len))

# Compute the center indices
dist_mid = dist_len // 2
time_mid = time_len // 2

# Create coordinates
time_step_td = to_timedelta64(time_step)
t0 = np.datetime64(time_min)
time_coord = dascore.core.get_coord(
data=t0 + np.arange(time_len) * time_step_td, step=time_step_td, units="s"
)
dist_coord = dascore.core.get_coord(
data=distance_min + np.arange(dist_len) * distance_step,
step=distance_step,
units="m",
)

coords = {"distance": dist_coord, "time": time_coord}
attrs = dict(
time_min=t0,
time_step=time_step_td,
distance_min=distance_min,
distance_step=distance_step,
category="DAS",
network="",
station="",
tag="delta",
time_units="s",
distance_units="m",
)

# Depending on the selected dimension, place a line of ones at the midpoint
if dim == "time":
# unit value at center time index
data[:, time_mid] = 1
delta_patch = dc.Patch(data=data, coords=coords, dims=dims, attrs=attrs)
return delta_patch.select(distance=0, samples=True)
else:
# unit value at center distance index
data[dist_mid, :] = 1
delta_patch = dc.Patch(data=data, coords=coords, dims=dims, attrs=attrs)
return delta_patch.select(time=0, samples=True)
else:
if dim == "time":
patch = patch.select(distance=0, samples=True)
else:
patch = patch.select(time=0, samples=True)
data = np.zeros(patch.data.size)
data[len(data) // 2] = 1
return patch.update(data=data.reshape(patch.shape))


@register_func(EXAMPLE_PATCHES, key="dispersion_event")
def dispersion_event():
"""
183 changes: 183 additions & 0 deletions docs/recipes/low_freq_proc.qmd
@@ -0,0 +1,183 @@
---
title: "Low-Frequency Processing"
execute:
eval: false
---

This recipe demonstrates how DASCore can be used to apply low-frequency (LF) processing to a spool of DAS data.


## Get a spool and define parameters
```{python}
## Load libraries and get a spool to work on
import numpy as np

import dascore as dc


# Define path for saving results
output_data_dir = '/path/to/desired/output/directory/'

# Get a spool to work on
sp = dc.get_example_spool().update()

# Sort the spool
sp = sp.sort("time")
# You may also want to sub-select the spool for the desired distance or time samples before proceeding.

# Get the first patch
pa = sp[0]

# Define the target sampling interval (in seconds)
dt = 10
# With a 10-second sampling interval, the Nyquist frequency is 1 / (2 * 10) = 0.05 Hz
cutoff_freq = 1 / (2 * dt)

# Safety factor applied to the low-pass cutoff to avoid aliasing
filter_safety_factor = 0.9

# Memory size to dedicate to processing (in MB)
memory_limit_MB = 10_000

# Define a tolerance for determining edge effects (used in next step)
tolerance = 1e-3
```

## Calculate chunk size and determine edge effects

To chunk the spool, we first need to determine the chunk size based on the machine's available memory, so that patches can be loaded and processed without memory issues. Longer chunks (longer patches) increase computational efficiency.

Notes:

1. The `processing_factor` is required because certain processing routines involve making copies of the data during the processing steps. It should be determined by performing memory profiling on an example dataset for the specific processing routine. For instance, the combination of low-pass filtering and interpolation, discussed in the next section, requires a processing factor of approximately 5.
2. The `memory_safety_factor` is optional and helps prevent getting too close to the memory limit.

```{python}
# Get the patch's number of bytes per second (based on the patch's data type)
pa_bytes_per_second = pa.data.nbytes / pa.seconds
# Define processing factor and safety factor
processing_factor = 5
memory_safety_factor = 1.2

# Calculate memory size required for each second of data to get processed
memory_size_per_second = pa_bytes_per_second * processing_factor * memory_safety_factor
memory_size_per_second_MB = memory_size_per_second / 1e6

# Calculate chunk size that can be loaded (in seconds)
chunk_size = memory_limit_MB / memory_size_per_second_MB
```
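As a rough sanity check on these formulas, consider a hypothetical float32 patch with 300 channels sampled at 1000 Hz (illustrative numbers, not properties of the example spool):

```{python}
# Hypothetical numbers, for illustration only
bytes_per_second = 300 * 4 * 1000                  # 300 channels * 4 bytes * 1000 samples/s = 1.2e6 B/s
per_second_MB = bytes_per_second * 5 * 1.2 / 1e6   # processing factor 5, memory safety factor 1.2 -> 7.2 MB/s
chunk_size_s = 10_000 / per_second_MB              # ~1389 s of data per chunk for a 10,000 MB limit
```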

Next, we need to determine the extent of artifacts introduced by low-pass filtering at the edges of each patch. To achieve this, we apply LF processing to a delta function patch, which contains a unit value at the center and zeros elsewhere. The distorted edges are then identified based on a defined threshold.

```{python}
# Retrieve a patch of appropriate size for LF processing that fits into memory
pa_chunked_sp = sp.chunk(time=chunk_size)[0]
# Create a delta patch based on the new patch size
delta_pa = dc.get_example_patch("delta_patch", dim="time", patch=pa_chunked_sp)

# Apply the low-pass filter on the delta patch
delta_pa_low_passed = delta_pa.pass_filter(time=(None, cutoff_freq * filter_safety_factor))
# Build a coarser time axis and interpolate the low-pass filtered patch onto it
new_time_ax = np.arange(delta_pa.attrs["time_min"], delta_pa.attrs["time_max"], np.timedelta64(dt, "s"))
Review thread on the line above:

Contributor: Can you just use patch.resample here?

ahmadtourei (Collaborator, Author) commented on Dec 24, 2024: Apparently, they do not provide the same results:

    import numpy as np
    import dascore as dc

    pa = dc.get_example_patch()
    dt = 5

    resampled_pa = pa.resample(time=np.timedelta64(dt, "s"))  # has one time sample, at zero

    new_time_ax = np.arange(pa.attrs["time_min"], pa.attrs["time_max"], np.timedelta64(dt, "s"))
    interpolated_pa = pa.interpolate(time=new_time_ax)  # has two time samples, at 0 and 5 seconds, as it should because the patch has 8 seconds

Contributor: Ah, that is odd. I think #10 might be related.

ahmadtourei: I haven't worked with patch.resample() much; do you think this behavior is expected?

Contributor: In both cases they are just calling underlying scipy functions, so I don't think there is much we can do about it anyway.

delta_pa_lfp = delta_pa_low_passed.interpolate(time=new_time_ax)

# Identify the indices where the absolute value of the data exceeds the threshold
data_abs = np.abs(delta_pa_lfp.data)
threshold = np.max(data_abs) * tolerance
ind = data_abs > threshold
ind_1 = np.where(ind)[1][0]
ind_2 = np.where(ind)[1][-1]

# Get the total duration of the processed delta function patch in seconds
delta_pa_lfp_seconds = (delta_pa_lfp.attrs["time_max"] - delta_pa_lfp.attrs["time_min"]) / np.timedelta64(1, "s")
# Convert the new time axis to absolute seconds, relative to the first timestamp
time_ax_abs = (new_time_ax - new_time_ax[0]) / np.timedelta64(1, "s")
# Center the time axis
time_ax_centered = time_ax_abs - delta_pa_lfp_seconds // 2

# Calculate the maximum extent (in seconds) on either side of the center where artifacts are present
edge = max(np.abs(time_ax_centered[ind_1]), np.abs(time_ax_centered[ind_2]))

# Validate the `edge` value to ensure sufficient processing patch size
if np.ceil(edge) >= chunk_size / 2:
raise ValueError(
f"The calculated `edge` value ({edge:.2f} seconds) is greater than half of the processing patch size "
f"({chunk_size:.2f} seconds). To resolve this and increase efficiency, consider one of the following:\n"
"- Increase `memory_size` to allow for a larger processing window.\n"
"- Increase `tolerance` to reduce the sensitivity of artifact detection."
)
```
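To visually confirm where the edge artifacts fall, a minimal matplotlib sketch (assuming matplotlib is installed; it is not otherwise required by this recipe) can plot the processed delta response against the threshold:

```{python}
import matplotlib.pyplot as plt

# Plot the absolute LF-processed delta response on the centered time axis
plt.semilogy(time_ax_centered, data_abs.ravel(), label="|processed delta|")
plt.axhline(threshold, color="red", linestyle="--", label="tolerance threshold")
plt.axvline(-edge, color="black", linestyle=":")
plt.axvline(edge, color="black", linestyle=":")
plt.xlabel("Time relative to patch center (s)")
plt.legend()
plt.show()
```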


## Perform low-frequency processing and save results on disk
```{python}
# Helper functions to generate file names for LF-processed patches
def _format_time_as_string(timestamp: np.datetime64) -> str:
"""
Converts a datetime64 object to a formatted string suitable for file naming.

Args:
timestamp (np.datetime64): The timestamp to format.

Returns:
str: A formatted string in 'YYYY-MM-DDTHHMMSS.mmm' format,
compatible with Windows file naming.
"""
formatted_time = str(timestamp.astype("datetime64[ms]"))[:21]
return formatted_time.replace(":", "") # Remove colons for Windows compatibility

def generate_file_name(start_time: np.datetime64, end_time: np.datetime64) -> str:
"""
Generates a standardized file name for low-frequency DAS data.

Args:
start_time (np.datetime64): The start time of the data range.
end_time (np.datetime64): The end time of the data range.

Returns:
str: A file name in the format 'LFDAS_<start_time>_<end_time>.h5'.
"""
start_time_str = _format_time_as_string(start_time)
end_time_str = _format_time_as_string(end_time)
return f"LFDAS_{start_time_str}_{end_time_str}.h5"


# First, chunk the spool based on the `chunk_size` and `edge` calculated before.
# Overlapping consecutive chunks by 2*edge ensures that, after trimming `edge`
# seconds from both ends of each processed patch, the results still tile the
# time axis without gaps.
sp_chunked_overlapped = sp.chunk(time=chunk_size, overlap=2 * edge)

# Process each patch in the spool and save the resulting patch
for patch in sp_chunked_overlapped:
    # Apply any pre-processing you may need (such as velocity to strain rate
    # transformation, detrending, etc.)
    # ...

    # Apply the low-pass filter to the patch
    pa_low_passed = patch.pass_filter(time=(None, cutoff_freq * filter_safety_factor))
    # Build a coarser time axis and interpolate the low-pass filtered patch onto it
    new_time_ax = np.arange(
        pa_low_passed.attrs["time_min"],
        pa_low_passed.attrs["time_max"],
        np.timedelta64(dt, "s"),
    )
    pa_lfp = pa_low_passed.interpolate(time=new_time_ax)
    # Update the processed patch's sampling interval
    pa_lfp = pa_lfp.update_attrs(time_step=np.timedelta64(dt, "s"))

    # Remove distorted edges from both ends using the calculated `edge` value
    pa_lfp_edgeless = pa_lfp.select(time=(edge, -edge), relative=True)

    # Save the processed patch
    pa_lf_name = generate_file_name(
        pa_lfp_edgeless.attrs["time_min"], pa_lfp_edgeless.attrs["time_max"]
    )
    pa_lf_path = output_data_dir + pa_lf_name
    pa_lfp_edgeless.io.write(pa_lf_path, "dasdae")
```
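Because each processed patch is trimmed by `edge` seconds on both ends, a quick check that the saved patches still tile the time axis can be run on the output spool (a minimal sketch, assuming the spool's `get_contents()` dataframe exposes `time_min`/`time_max` columns):

```{python}
# Check for time gaps between consecutive processed patches
contents = dc.spool(output_data_dir).get_contents().sort_values("time_min")
gaps = contents["time_min"].values[1:] - contents["time_max"].values[:-1]
print("Largest gap between consecutive patches:", gaps.max() if len(gaps) else "n/a")
```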

## Visualize the results
```{python}
# Create a spool of LF processed results
sp_lf = dc.spool(output_data_dir)

# Merge the spool into a single patch. You may need to sub-select before merging to avoid exceeding the memory limit.
sp_lf_merged = sp_lf.chunk(time=None)
pa_lf_merged = sp_lf_merged[0]

# Visualize the results. Try different scale values for better visualization.
pa_lf_merged.viz.waterfall(scale=0.1)
```

#### For any questions, please contact Ahmad Tourei: [GitHub Profile](https://github.com/ahmadtourei).
11 changes: 6 additions & 5 deletions docs/recipes/real_time_proc.qmd
@@ -5,27 +5,28 @@ execute:
---


This recipe serves as an example to showcase the real-time processing capability of DASCore. Here, we demonstrate how to use DASCore to perform rolling mean processing on a spool in near real time for edge computing purposes.
This recipe serves as an example to showcase the real-time processing capability of DASCore. Here, we demonstrate how to use DASCore to perform rolling mean processing on a spool in "near" real time for edge computing purposes.


## Load libraries and get a spool

```{python}
# Import libraries
import numpy as np
import os
import time

import dascore as dc
import numpy as np

from dascore.units import s


# Define path for saving results
output_data_dir = '/path/to/desired/output/directory'

# Get a spool to work on
sp = dc.get_example_spool().update().sort("time")
sp = dc.get_example_spool().update()

# Sort the spool
sp = sp.sort("time")
```

## Set real-time processing parameters (if needed)
71 changes: 71 additions & 0 deletions tests/test_examples.py
@@ -68,3 +68,74 @@ def test_moveout(self):
distances = patch.get_coord("distance").values
expected_moveout = distances / velocity
assert np.allclose(moveout, expected_moveout)


class TestDeltaPatch:
"""Tests for the delta_patch example."""

@pytest.mark.parametrize("invalid_dim", ["invalid_dimension", "", None, 123])
def test_delta_patch_invalid_dim(self, invalid_dim):
"""
Test that passing an invalid dimension value raises a ValueError.
"""
with pytest.raises(ValueError, match="The delta patch is a 2D patch"):
dc.get_example_patch("delta_patch", dim=invalid_dim)

@pytest.mark.parametrize("dim", ["time", "distance"])
def test_delta_patch_structure(self, dim):
"""Test that the delta_patch returns a Patch with correct structure."""
patch = dc.get_example_patch("delta_patch", dim=dim)
assert isinstance(patch, dc.Patch), "delta_patch should return a Patch instance"

dims = patch.dims
assert (
"time" in dims and "distance" in dims
), "Patch must have 'time' and 'distance' dimensions"
assert len(patch.shape) == 2, "Data should be 2D"

@pytest.mark.parametrize("dim", ["time", "distance"])
def test_delta_patch_delta_location(self, dim):
"""
Ensures the delta is at the center of the chosen dimension and zeros elsewhere.
"""
# The default shape from the function signature: shape=(10, 200)
# If dim="time", we end up with a single (distance=0) trace => shape (200,)
# If dim="distance", we end up with a single (time=0) trace => shape (10,)
patch = dc.get_example_patch("delta_patch", dim=dim)
data = patch.squeeze().data

# Compute the expected midpoint and verify the single delta at the center
mid_idx = len(data) // 2

assert data[mid_idx] == 1.0, "Expected a unit delta at the center"
# Check all other samples are zero
# Replace the center value with zero and ensure all zeros remain
test_data = np.copy(data)
test_data[mid_idx] = 0
assert np.allclose(
test_data, 0
), "All other samples should be zero except the center"

@pytest.mark.parametrize("dim", ["time", "distance"])
def test_delta_patch_with_patch(self, dim):
"""Test passing an existing patch to delta_patch and ensure delta is applied."""
# Create a base patch
base_patch = dc.get_example_patch("random_das", shape=(5, 50))
# Apply the delta_patch function with the existing patch
delta_applied_patch = dc.get_example_patch(
"delta_patch", dim=dim, patch=base_patch
)

assert isinstance(delta_applied_patch, dc.Patch), "Should return a Patch"
data = delta_applied_patch.squeeze().data

# The patch keeps both dimensions (one of length 1), so the patch itself stays 2D
assert len(delta_applied_patch.shape) == 2, "Data should be 2D"
mid_idx = len(data) // 2
# Check that only the center value is one and others are zero
assert data[mid_idx] == 1.0, "Center sample should be 1.0"
test_data = np.copy(data)
test_data[mid_idx] = 0
assert np.allclose(
test_data, 0
), "All other samples should be zero except the center"