Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adding scripts for using the NCEDC data stored on s3 #283

Merged
merged 10 commits into from
Feb 12, 2024
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -171,6 +171,7 @@ src/noisepy/seis/_version.py
# Temp locations for tutorials
tutorials/get_started_data/
tutorials/scedc_data/
tutorials/ncedc_data/
tutorials/cli/tmpdata
tutorials/pnw_data

Expand Down
1 change: 1 addition & 0 deletions src/noisepy/seis/channelcatalog.py
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ def __init__(self, xmlpath: str, path_format: str = "{network}_{name}.xml", stor
super().__init__()
self.xmlpath = xmlpath
self.path_format = path_format

self.fs = get_filesystem(xmlpath, storage_options=storage_options)
if not self.fs.exists(self.xmlpath):
raise Exception(f"The XML Station file directory '{xmlpath}' doesn't exist")
Expand Down
1 change: 1 addition & 0 deletions src/noisepy/seis/noise_module.py
Original file line number Diff line number Diff line change
Expand Up @@ -220,6 +220,7 @@ def preprocess_raw(
st[ii].data = np.float32(st[ii].data)
st[ii].data = scipy.signal.detrend(st[ii].data, type="constant")
st[ii].data = scipy.signal.detrend(st[ii].data, type="linear")
st[ii] = st[ii].taper(max_percentage=0.05)

# merge, taper and filter the data
if len(st) > 1:
Expand Down
141 changes: 116 additions & 25 deletions src/noisepy/seis/scedc_s3store.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import logging
import os
import re
from abc import abstractmethod
from collections import defaultdict
from concurrent.futures import ThreadPoolExecutor
from datetime import datetime, timedelta, timezone
Expand Down Expand Up @@ -34,23 +35,19 @@
return filter


class SCEDCS3DataStore(RawDataStore):
class MiniSeedS3DataStore(RawDataStore):
"""
A data store implementation to read from a directory of miniSEED (.ms) files from the SCEDC S3 bucket.
A data store implementation to read from a directory of miniSEED (.ms) files from an S3 bucket.
Every directory is a a day and each .ms file contains the data for a channel.
"""

# TODO: Support reading directly from the S3 bucket

# for checking the filename has the form: CIGMR__LHN___2022002.ms
file_re = re.compile(r".*[0-9]{7}\.ms$", re.IGNORECASE)

def __init__(
self,
path: str,
chan_catalog: ChannelCatalog,
ch_filter: Callable[[Channel], bool] = lambda s: True, # noqa: E731
date_range: DateTimeRange = None,
file_name_regex: str = None,
storage_options: dict = {},
):
"""
Expand All @@ -61,6 +58,7 @@
if None, all channels are used
"""
super().__init__()
self.file_re = re.compile(file_name_regex, re.IGNORECASE)
self.fs = get_filesystem(path, storage_options=storage_options)
self.channel_catalog = chan_catalog
self.path = path
Expand All @@ -80,12 +78,12 @@

def _load_channels(self, full_path: str, ch_filter: Callable[[Channel], bool]):
tlog = TimeLogger(logger=logger, level=logging.INFO)
msfiles = [f for f in self.fs.glob(fs_join(full_path, "*.ms")) if self.file_re.match(f) is not None]
msfiles = [f for f in self.fs.glob(fs_join(full_path, "*")) if self.file_re.match(f) is not None]
tlog.log(f"Loading {len(msfiles)} files from {full_path}")
for f in msfiles:
timespan = SCEDCS3DataStore._parse_timespan(f)
timespan = self._parse_timespan(f)
self.paths[timespan.start_datetime] = full_path
channel = SCEDCS3DataStore._parse_channel(os.path.basename(f))
channel = self._parse_channel(os.path.basename(f))
if not ch_filter(channel):
continue
key = str(timespan) # DataTimeFrame is not hashable
Expand All @@ -100,7 +98,7 @@
date = date_range.start_datetime + timedelta(days=d)
if self.date_range is None or date not in self.date_range:
continue
date_path = str(date.year) + "/" + str(date.year) + "_" + str(date.timetuple().tm_yday).zfill(3) + "/"
date_path = self._get_datepath(date)
full_path = fs_join(self.path, date_path)
self._load_channels(full_path, self.ch_filter)

Expand Down Expand Up @@ -128,13 +126,7 @@
def read_data(self, timespan: DateTimeRange, chan: Channel) -> ChannelData:
self._ensure_channels_loaded(timespan)
# reconstruct the file name from the channel parameters
chan_str = (
f"{chan.station.network}{chan.station.name.ljust(5, '_')}{chan.type.name}"
f"{chan.station.location.ljust(3, '_')}"
)
filename = fs_join(
self.paths[timespan.start_datetime], f"{chan_str}{timespan.start_datetime.strftime('%Y%j')}.ms"
)
filename = self._get_filename(timespan, chan)
if not self.fs.exists(filename):
logger.warning(f"Could not find file {filename}")
return ChannelData.empty()
Expand All @@ -147,14 +139,43 @@
def get_inventory(self, timespan: DateTimeRange, station: Station) -> obspy.Inventory:
return self.channel_catalog.get_inventory(timespan, station)

def _parse_timespan(filename: str) -> DateTimeRange:
# The SCEDC S3 bucket stores files in the form: CIGMR__LHN___2022002.ms
year = int(filename[-10:-6])
day = int(filename[-6:-3])
jan1 = datetime(year, 1, 1, tzinfo=timezone.utc)
return DateTimeRange(jan1 + timedelta(days=day - 1), jan1 + timedelta(days=day))
@abstractmethod
def _get_datepath(self, timespan: datetime) -> str:
pass

Check warning on line 144 in src/noisepy/seis/scedc_s3store.py

View check run for this annotation

Codecov / codecov/patch

src/noisepy/seis/scedc_s3store.py#L144

Added line #L144 was not covered by tests

@abstractmethod
def _get_filename(self, timespan: DateTimeRange, channel: Channel) -> str:
pass

Check warning on line 148 in src/noisepy/seis/scedc_s3store.py

View check run for this annotation

Codecov / codecov/patch

src/noisepy/seis/scedc_s3store.py#L148

Added line #L148 was not covered by tests

@abstractmethod
def _parse_channel(self, filename: str) -> Channel:
pass

Check warning on line 152 in src/noisepy/seis/scedc_s3store.py

View check run for this annotation

Codecov / codecov/patch

src/noisepy/seis/scedc_s3store.py#L152

Added line #L152 was not covered by tests

@abstractmethod
def _parse_timespan(self, filename: str) -> DateTimeRange:
pass

Check warning on line 156 in src/noisepy/seis/scedc_s3store.py

View check run for this annotation

Codecov / codecov/patch

src/noisepy/seis/scedc_s3store.py#L156

Added line #L156 was not covered by tests


def _parse_channel(filename: str) -> Channel:
class SCEDCS3DataStore(MiniSeedS3DataStore):
def __init__(
self,
path: str,
chan_catalog: ChannelCatalog,
ch_filter: Callable[[Channel], bool] = lambda s: True, # noqa: E731
date_range: DateTimeRange = None,
storage_options: dict = {},
):
super().__init__(
path,
chan_catalog,
ch_filter=ch_filter,
date_range=date_range,
# for checking the filename has the form: CIGMR__LHN___2022002.ms
file_name_regex=r".*[0-9]{7}\.ms$",
storage_options=storage_options,
)

def _parse_channel(self, filename: str) -> Channel:
# e.g.
# CIGMR__LHN___2022002
# CE13884HNZ10_2022002
Expand All @@ -167,3 +188,73 @@
# lat/lon/elev will be populated later
Station(network, station, location=location),
)

def _parse_timespan(self, filename: str) -> DateTimeRange:
# The SCEDC S3 bucket stores files in the form: CIGMR__LHN___2022002.ms
year = int(filename[-10:-6])
day = int(filename[-6:-3])
jan1 = datetime(year, 1, 1, tzinfo=timezone.utc)
return DateTimeRange(jan1 + timedelta(days=day - 1), jan1 + timedelta(days=day))

def _get_filename(self, timespan: DateTimeRange, channel: Channel) -> str:
chan_str = (
f"{channel.station.network}{channel.station.name.ljust(5, '_')}{channel.type.name}"
f"{channel.station.location.ljust(3, '_')}"
)
filename = fs_join(
self.paths[timespan.start_datetime], f"{chan_str}{timespan.start_datetime.strftime('%Y%j')}.ms"
)
return filename

def _get_datepath(self, date: datetime) -> str:
return str(date.year) + "/" + str(date.year) + "_" + str(date.timetuple().tm_yday).zfill(3) + "/"


class NCEDCS3DataStore(MiniSeedS3DataStore):
def __init__(
self,
path: str,
chan_catalog: ChannelCatalog,
ch_filter: Callable[[Channel], bool] = lambda s: True, # noqa: E731
date_range: DateTimeRange = None,
storage_options: dict = {},
):
super().__init__(

Check warning on line 222 in src/noisepy/seis/scedc_s3store.py

View check run for this annotation

Codecov / codecov/patch

src/noisepy/seis/scedc_s3store.py#L222

Added line #L222 was not covered by tests
path,
chan_catalog,
ch_filter=ch_filter,
date_range=date_range,
# for checking the filename has the form: AAS.NC.EHZ..D.2020.002
file_name_regex=r".*[0-9]{4}.*[0-9]{3}$",
storage_options=storage_options,
)

def _parse_channel(self, filename: str) -> Channel:
# e.g.
# AAS.NC.EHZ..D.2020.002
split_fn = filename.split(".")
network = split_fn[1]
station = split_fn[0]
channel = split_fn[2]
location = split_fn[3]
if len(channel) > 3:
channel = channel[:3]

Check warning on line 241 in src/noisepy/seis/scedc_s3store.py

View check run for this annotation

Codecov / codecov/patch

src/noisepy/seis/scedc_s3store.py#L241

Added line #L241 was not covered by tests
return Channel(
ChannelType(channel, location),
# lat/lon/elev will be populated later
Station(network, station, location=location),
)

def _parse_timespan(self, filename: str) -> DateTimeRange:
# The NCEDC S3 bucket stores files in the form: AAS.NC.EHZ..D.2020.002
year = int(filename[-8:-4])
day = int(filename[-3:])
jan1 = datetime(year, 1, 1, tzinfo=timezone.utc)
return DateTimeRange(jan1 + timedelta(days=day - 1), jan1 + timedelta(days=day))

def _get_datepath(self, date: datetime) -> str:
return str(date.year) + "/" + str(date.year) + "." + str(date.timetuple().tm_yday).zfill(3) + "/"

Check warning on line 256 in src/noisepy/seis/scedc_s3store.py

View check run for this annotation

Codecov / codecov/patch

src/noisepy/seis/scedc_s3store.py#L256

Added line #L256 was not covered by tests

def _get_filename(self, timespan: DateTimeRange, chan: Channel) -> str:
chan_str = f"{chan.station.name}.{chan.station.network}.{chan.type.name}." f"{chan.station.location}.D"
return fs_join(self.paths[timespan.start_datetime], f"{chan_str}.{timespan.start_datetime.strftime('%Y.%j')}")

Check warning on line 260 in src/noisepy/seis/scedc_s3store.py

View check run for this annotation

Codecov / codecov/patch

src/noisepy/seis/scedc_s3store.py#L259-L260

Added lines #L259 - L260 were not covered by tests
35 changes: 35 additions & 0 deletions tests/test_ncedc_s3store.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
import os
from datetime import datetime, timezone

import pytest
from datetimerange import DateTimeRange

from noisepy.seis.scedc_s3store import NCEDCS3DataStore

timespan1 = DateTimeRange(datetime(2022, 1, 2, tzinfo=timezone.utc), datetime(2022, 1, 3, tzinfo=timezone.utc))
timespan2 = DateTimeRange(datetime(2021, 2, 3, tzinfo=timezone.utc), datetime(2021, 2, 4, tzinfo=timezone.utc))
timespan3 = DateTimeRange(datetime(2023, 6, 1, tzinfo=timezone.utc), datetime(2023, 6, 2, tzinfo=timezone.utc))
files_dates = [
("AFD.NC.HHZ..D.2022.002", timespan1),
("KCPB.NC.HHN..D.2021.034", timespan2),
("LMC.NC.HHN..D.2023.152", timespan3),
]


@pytest.mark.parametrize("file,expected", files_dates)
def test_parsefilename2(file: str, expected: DateTimeRange):
assert expected == NCEDCS3DataStore._parse_timespan(None, file)


data_paths = [
(os.path.join(os.path.dirname(__file__), "./data/s3ncedc"), None),
("s3://ncedc-pds/continuous_waveforms/NC/2022/2022.002/", None),
("s3://ncedc-pds/continuous_waveforms/", timespan1),
]


read_channels = [
(NCEDCS3DataStore._parse_channel(None, "AFD.NC.HHZ..D.2022.002")),
(NCEDCS3DataStore._parse_channel(None, "NSP.NC.EHZ..D.2022.002")),
(NCEDCS3DataStore._parse_channel(None, "PSN.NC.EHZ..D.2022.002")),
]
8 changes: 4 additions & 4 deletions tests/test_scedc_s3store.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@

@pytest.mark.parametrize("file,expected", files_dates)
def test_parsefilename(file: str, expected: DateTimeRange):
assert expected == SCEDCS3DataStore._parse_timespan(file)
assert expected == SCEDCS3DataStore._parse_timespan(None, file)


data_paths = [
Expand All @@ -31,9 +31,9 @@ def test_parsefilename(file: str, expected: DateTimeRange):


read_channels = [
(SCEDCS3DataStore._parse_channel("BKTHIS_LHZ00_2022002.ms")),
(SCEDCS3DataStore._parse_channel("CIFOX2_LHZ___2022002.ms")),
(SCEDCS3DataStore._parse_channel("CINCH__LHZ___2022002.ms")),
(SCEDCS3DataStore._parse_channel(None, "BKTHIS_LHZ00_2022002.ms")),
(SCEDCS3DataStore._parse_channel(None, "CIFOX2_LHZ___2022002.ms")),
(SCEDCS3DataStore._parse_channel(None, "CINCH__LHZ___2022002.ms")),
]


Expand Down
Loading
Loading