Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adding scripts for using the NCEDC data stored on s3 #283

Merged
merged 10 commits into from
Feb 12, 2024
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -171,6 +171,7 @@ src/noisepy/seis/_version.py
# Temp locations for tutorials
tutorials/get_started_data/
tutorials/scedc_data/
tutorials/ncedc_data/
tutorials/cli/tmpdata
tutorials/pnw_data

Expand Down
1 change: 1 addition & 0 deletions src/noisepy/seis/channelcatalog.py
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ def __init__(self, xmlpath: str, path_format: str = "{network}_{name}.xml", stor
super().__init__()
self.xmlpath = xmlpath
self.path_format = path_format

self.fs = get_filesystem(xmlpath, storage_options=storage_options)
if not self.fs.exists(self.xmlpath):
raise Exception(f"The XML Station file directory '{xmlpath}' doesn't exist")
Expand Down
1 change: 1 addition & 0 deletions src/noisepy/seis/noise_module.py
Original file line number Diff line number Diff line change
Expand Up @@ -220,6 +220,7 @@ def preprocess_raw(
st[ii].data = np.float32(st[ii].data)
st[ii].data = scipy.signal.detrend(st[ii].data, type="constant")
st[ii].data = scipy.signal.detrend(st[ii].data, type="linear")
st[ii] = st[ii].taper(max_percentage=0.05)

# merge, taper and filter the data
if len(st) > 1:
Expand Down
141 changes: 116 additions & 25 deletions src/noisepy/seis/scedc_s3store.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import logging
import os
import re
from abc import abstractmethod
from collections import defaultdict
from concurrent.futures import ThreadPoolExecutor
from datetime import datetime, timedelta, timezone
Expand Down Expand Up @@ -34,23 +35,19 @@ def filter(ch: Channel) -> bool:
return filter


class SCEDCS3DataStore(RawDataStore):
class MiniSeedS3DataStore(RawDataStore):
"""
A data store implementation to read from a directory of miniSEED (.ms) files from the SCEDC S3 bucket.
A data store implementation to read from a directory of miniSEED (.ms) files from an S3 bucket.
Every directory is a a day and each .ms file contains the data for a channel.
"""

# TODO: Support reading directly from the S3 bucket

# for checking the filename has the form: CIGMR__LHN___2022002.ms
file_re = re.compile(r".*[0-9]{7}\.ms$", re.IGNORECASE)

def __init__(
self,
path: str,
chan_catalog: ChannelCatalog,
ch_filter: Callable[[Channel], bool] = lambda s: True, # noqa: E731
date_range: DateTimeRange = None,
file_name_regex: str = None,
storage_options: dict = {},
):
"""
Expand All @@ -61,6 +58,7 @@ def __init__(
if None, all channels are used
"""
super().__init__()
self.file_re = re.compile(file_name_regex, re.IGNORECASE)
self.fs = get_filesystem(path, storage_options=storage_options)
self.channel_catalog = chan_catalog
self.path = path
Expand All @@ -80,12 +78,12 @@ def __init__(

def _load_channels(self, full_path: str, ch_filter: Callable[[Channel], bool]):
tlog = TimeLogger(logger=logger, level=logging.INFO)
msfiles = [f for f in self.fs.glob(fs_join(full_path, "*.ms")) if self.file_re.match(f) is not None]
msfiles = [f for f in self.fs.glob(fs_join(full_path, "*")) if self.file_re.match(f) is not None]
tlog.log(f"Loading {len(msfiles)} files from {full_path}")
for f in msfiles:
timespan = SCEDCS3DataStore._parse_timespan(f)
timespan = self._parse_timespan(f)
self.paths[timespan.start_datetime] = full_path
channel = SCEDCS3DataStore._parse_channel(os.path.basename(f))
channel = self._parse_channel(os.path.basename(f))
if not ch_filter(channel):
continue
key = str(timespan) # DataTimeFrame is not hashable
Expand All @@ -100,7 +98,7 @@ def _ensure_channels_loaded(self, date_range: DateTimeRange):
date = date_range.start_datetime + timedelta(days=d)
if self.date_range is None or date not in self.date_range:
continue
date_path = str(date.year) + "/" + str(date.year) + "_" + str(date.timetuple().tm_yday).zfill(3) + "/"
date_path = self._get_datepath(date)
full_path = fs_join(self.path, date_path)
self._load_channels(full_path, self.ch_filter)

Expand Down Expand Up @@ -128,13 +126,7 @@ def get_timespans(self) -> List[DateTimeRange]:
def read_data(self, timespan: DateTimeRange, chan: Channel) -> ChannelData:
self._ensure_channels_loaded(timespan)
# reconstruct the file name from the channel parameters
chan_str = (
f"{chan.station.network}{chan.station.name.ljust(5, '_')}{chan.type.name}"
f"{chan.station.location.ljust(3, '_')}"
)
filename = fs_join(
self.paths[timespan.start_datetime], f"{chan_str}{timespan.start_datetime.strftime('%Y%j')}.ms"
)
filename = self._get_filename(timespan, chan)
if not self.fs.exists(filename):
logger.warning(f"Could not find file {filename}")
return ChannelData.empty()
Expand All @@ -147,14 +139,43 @@ def read_data(self, timespan: DateTimeRange, chan: Channel) -> ChannelData:
def get_inventory(self, timespan: DateTimeRange, station: Station) -> obspy.Inventory:
return self.channel_catalog.get_inventory(timespan, station)

def _parse_timespan(filename: str) -> DateTimeRange:
# The SCEDC S3 bucket stores files in the form: CIGMR__LHN___2022002.ms
year = int(filename[-10:-6])
day = int(filename[-6:-3])
jan1 = datetime(year, 1, 1, tzinfo=timezone.utc)
return DateTimeRange(jan1 + timedelta(days=day - 1), jan1 + timedelta(days=day))
@abstractmethod
def _get_datepath(self, timespan: datetime) -> str:
pass

@abstractmethod
def _get_filename(self, timespan: DateTimeRange, channel: Channel) -> str:
pass

@abstractmethod
def _parse_channel(self, filename: str) -> Channel:
pass

@abstractmethod
def _parse_timespan(self, filename: str) -> DateTimeRange:
pass


def _parse_channel(filename: str) -> Channel:
class SCEDCS3DataStore(MiniSeedS3DataStore):
def __init__(
self,
path: str,
chan_catalog: ChannelCatalog,
ch_filter: Callable[[Channel], bool] = lambda s: True, # noqa: E731
date_range: DateTimeRange = None,
storage_options: dict = {},
):
super().__init__(
path,
chan_catalog,
ch_filter=ch_filter,
date_range=date_range,
# for checking the filename has the form: CIGMR__LHN___2022002.ms
file_name_regex=r".*[0-9]{7}\.ms$",
storage_options=storage_options,
)

def _parse_channel(self, filename: str) -> Channel:
# e.g.
# CIGMR__LHN___2022002
# CE13884HNZ10_2022002
Expand All @@ -167,3 +188,73 @@ def _parse_channel(filename: str) -> Channel:
# lat/lon/elev will be populated later
Station(network, station, location=location),
)

def _parse_timespan(self, filename: str) -> DateTimeRange:
# The SCEDC S3 bucket stores files in the form: CIGMR__LHN___2022002.ms
year = int(filename[-10:-6])
day = int(filename[-6:-3])
jan1 = datetime(year, 1, 1, tzinfo=timezone.utc)
return DateTimeRange(jan1 + timedelta(days=day - 1), jan1 + timedelta(days=day))

def _get_filename(self, timespan: DateTimeRange, channel: Channel) -> str:
chan_str = (
f"{channel.station.network}{channel.station.name.ljust(5, '_')}{channel.type.name}"
f"{channel.station.location.ljust(3, '_')}"
)
filename = fs_join(
self.paths[timespan.start_datetime], f"{chan_str}{timespan.start_datetime.strftime('%Y%j')}.ms"
)
return filename

def _get_datepath(self, date: datetime) -> str:
return str(date.year) + "/" + str(date.year) + "_" + str(date.timetuple().tm_yday).zfill(3) + "/"


class NCEDCS3DataStore(MiniSeedS3DataStore):
def __init__(
self,
path: str,
chan_catalog: ChannelCatalog,
ch_filter: Callable[[Channel], bool] = lambda s: True, # noqa: E731
date_range: DateTimeRange = None,
storage_options: dict = {},
):
super().__init__(
path,
chan_catalog,
ch_filter=ch_filter,
date_range=date_range,
# for checking the filename has the form: AAS.NC.EHZ..D.2020.002
file_name_regex=r".*[0-9]{4}.*[0-9]{3}$",
storage_options=storage_options,
)

def _parse_channel(self, filename: str) -> Channel:
# e.g.
# AAS.NC.EHZ..D.2020.002
split_fn = filename.split(".")
network = split_fn[1]
station = split_fn[0]
channel = split_fn[2]
location = split_fn[3]
if len(channel) > 3:
channel = channel[:3]
return Channel(
ChannelType(channel, location),
# lat/lon/elev will be populated later
Station(network, station, location=location),
)

def _parse_timespan(self, filename: str) -> DateTimeRange:
# The NCEDC S3 bucket stores files in the form: AAS.NC.EHZ..D.2020.002
year = int(filename[-8:-4])
day = int(filename[-3:])
jan1 = datetime(year, 1, 1, tzinfo=timezone.utc)
return DateTimeRange(jan1 + timedelta(days=day - 1), jan1 + timedelta(days=day))

def _get_datepath(self, date: datetime) -> str:
return str(date.year) + "/" + str(date.year) + "." + str(date.timetuple().tm_yday).zfill(3) + "/"

def _get_filename(self, timespan: DateTimeRange, chan: Channel) -> str:
chan_str = f"{chan.station.name}.{chan.station.network}.{chan.type.name}." f"{chan.station.location}.D"
return fs_join(self.paths[timespan.start_datetime], f"{chan_str}.{timespan.start_datetime.strftime('%Y.%j')}")
Binary file added tests/data/s3ncedc/AFD.NC.HHZ..D.2022.002
Binary file not shown.
Binary file added tests/data/s3ncedc/NSP.NC.EHZ..D.2022.002
Binary file not shown.
Binary file added tests/data/s3ncedc/PSN.NC.EHZ..D.2022.002
Binary file not shown.
80 changes: 80 additions & 0 deletions tests/test_ncedc_s3store.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
import os
from datetime import datetime, timezone

import pytest
from datetimerange import DateTimeRange
from test_channelcatalog import MockCatalog

from noisepy.seis.datatypes import Channel, ChannelType, Station
from noisepy.seis.scedc_s3store import NCEDCS3DataStore, channel_filter

timespan1 = DateTimeRange(datetime(2022, 1, 2, tzinfo=timezone.utc), datetime(2022, 1, 3, tzinfo=timezone.utc))
timespan2 = DateTimeRange(datetime(2021, 2, 3, tzinfo=timezone.utc), datetime(2021, 2, 4, tzinfo=timezone.utc))
timespan3 = DateTimeRange(datetime(2023, 6, 1, tzinfo=timezone.utc), datetime(2023, 6, 2, tzinfo=timezone.utc))
files_dates = [
("AFD.NC.HHZ..D.2022.002", timespan1),
("KCPB.NC.HHN..D.2021.034", timespan2),
("LMC.NC.HHN..D.2023.152", timespan3),
]


@pytest.mark.parametrize("file,expected", files_dates)
def test_parsefilename2(file: str, expected: DateTimeRange):
assert expected == NCEDCS3DataStore._parse_timespan(None, file)


data_paths = [
(os.path.join(os.path.dirname(__file__), "./data/s3ncedc"), None),
("s3://ncedc-pds/continuous_waveforms/NC/2022/2022.002/", None),
("s3://ncedc-pds/continuous_waveforms/", timespan1),
]


read_channels = [
(NCEDCS3DataStore._parse_channel(None, "AFD.NC.HHZ..D.2022.002")),
(NCEDCS3DataStore._parse_channel(None, "NSP.NC.EHZ..D.2022.002")),
(NCEDCS3DataStore._parse_channel(None, "PSN.NC.EHZ..D.2022.002")),
]


@pytest.fixture(params=data_paths)
def store(request):
storage_options = {"s3": {"anon": True}}
(path, timespan) = request.param
return NCEDCS3DataStore(path, MockCatalog(), lambda ch: ch in read_channels, timespan, storage_options)


@pytest.mark.parametrize("channel", read_channels)
def test_read(store: NCEDCS3DataStore, channel: str):
chdata = store.read_data(timespan1, channel)
assert chdata.sampling_rate == 1.0
assert chdata.start_timestamp >= timespan1.start_datetime.timestamp()
assert chdata.start_timestamp < timespan1.end_datetime.timestamp()
assert chdata.data.size > 0


def test_timespan_channels(store: NCEDCS3DataStore):
timespans = store.get_timespans()
assert len(timespans) == 1
assert timespans[0] == timespan1
channels = store.get_channels(timespan1)
assert len(channels) == len(read_channels)
channels = store.get_channels(timespan2)
assert len(channels) == 0


def test_filter():
# filter for station 'staX' or 'staY' and channel type starts with 'B'
f = channel_filter(["staX", "staY"], "B")
staX = Station("NC", "staX")
staZ = Station("NC", "staZ")

def check(sta, ch_name):
ch = Channel(ChannelType((ch_name)), sta)
return f(ch)

assert check(staX, "BHE") is True
assert check(staX, "BBB") is True
assert check(staX, "CHE") is False # invalid channel name
assert check(staZ, "BHE") is False # invalid station
assert check(staZ, "CHE") is False # invalid station and channel name
8 changes: 4 additions & 4 deletions tests/test_scedc_s3store.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@

@pytest.mark.parametrize("file,expected", files_dates)
def test_parsefilename(file: str, expected: DateTimeRange):
assert expected == SCEDCS3DataStore._parse_timespan(file)
assert expected == SCEDCS3DataStore._parse_timespan(None, file)


data_paths = [
Expand All @@ -31,9 +31,9 @@ def test_parsefilename(file: str, expected: DateTimeRange):


read_channels = [
(SCEDCS3DataStore._parse_channel("BKTHIS_LHZ00_2022002.ms")),
(SCEDCS3DataStore._parse_channel("CIFOX2_LHZ___2022002.ms")),
(SCEDCS3DataStore._parse_channel("CINCH__LHZ___2022002.ms")),
(SCEDCS3DataStore._parse_channel(None, "BKTHIS_LHZ00_2022002.ms")),
(SCEDCS3DataStore._parse_channel(None, "CIFOX2_LHZ___2022002.ms")),
(SCEDCS3DataStore._parse_channel(None, "CINCH__LHZ___2022002.ms")),
]


Expand Down
Loading
Loading