Skip to content

Commit

Permalink
start segyio write
Browse files Browse the repository at this point in the history
  • Loading branch information
d-chambers committed Dec 8, 2024
1 parent 8dbbe29 commit be624f5
Show file tree
Hide file tree
Showing 6 changed files with 196 additions and 58 deletions.
2 changes: 1 addition & 1 deletion dascore/io/segy/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,4 +24,4 @@
segy_patch = dc.spool(path)[0]
"""

from .core import SegyV2
from .core import SegyV1_0, SegyV2_0, SegyV2_1
82 changes: 68 additions & 14 deletions dascore/io/segy/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,32 +2,33 @@

from __future__ import annotations

import segyio

import dascore as dc
from dascore.io.core import FiberIO
from dascore.utils.io import BinaryReader
from dascore.utils.misc import optional_import

from .utils import _get_attrs, _get_coords, _get_filtered_data_and_coords, _is_segy
from .utils import (
_get_attrs,
_get_coords,
_get_filtered_data_and_coords,
_get_segy_compatible_patch,
_get_segy_version,
_make_time_header_dict,
)


class SegyV2(FiberIO):
"""An IO class supporting version 2 of the SEGY format."""
class SegyV1_0(FiberIO): # noqa
"""An IO class supporting version 1.0 of the SEGY format."""

name = "segy"
preferred_extensions = ("segy", "sgy")
# also specify a version so when version 2 is released you can
# just make another class in the same module named JingleV2.
version = "2"
version = "1.0"

def get_format(self, path, **kwargs) -> tuple[str, str] | bool:
def get_format(self, fp: BinaryReader, **kwargs) -> tuple[str, str] | bool:
"""Make sure input is segy."""
with open(path, "rb") as fp:
return _is_segy(fp)
# try:
# with segyio.open(path, ignore_geometry=True):
# return self.name, self.version
# except Exception:
# return False
return _get_segy_version(fp)

def read(self, path, time=None, channel=None, **kwargs):
"""
Expand All @@ -37,6 +38,7 @@ def read(self, path, time=None, channel=None, **kwargs):
accept kwargs. If the format supports partial reads, these should
be implemented as well.
"""
segyio = optional_import("segyio")
with segyio.open(path, ignore_geometry=True) as fi:
coords = _get_coords(fi)
attrs = _get_attrs(fi, coords, path, self)
Expand All @@ -57,7 +59,59 @@ def scan(self, path, **kwargs) -> list[dc.PatchAttrs]:
from the [dascore.core.attrs](`dascore.core.attrs`) module, or a
format-specific subclass.
"""
segyio = optional_import("segyio")
with segyio.open(path, ignore_geometry=True) as fi:
coords = _get_coords(fi)
attrs = _get_attrs(fi, coords, path, self)
return [attrs]

def write(self, spool, resource, **kwargs):
"""
Create a segy file from length 1 spool.
Based on the example from segyio:
https://github.com/equinor/segyio/blob/master/python/examples/make-file.py
"""
patch = _get_segy_compatible_patch(spool)
time, distance = patch.get_coord("time"), patch.get_coord("distance")
distance_step = distance.step

time_dict = _make_time_header_dict(time)

segyio = optional_import("segyio")
spec = segyio.spec()

spec.sorting = 2
spec.format = 1
spec.samples = [len(time)] * len(distance)
spec.ilines = range(len(distance))
spec.xlines = [1]

with segyio.create(resource, spec) as f:
# This works because we ensure dim order is (distance, time)
for num, data in enumerate(patch.data):
header = dict(time_dict)
header.update(
{
segyio.su.offset: distance_step,
segyio.su.iline: num,
segyio.su.xline: 1,
segyio.su.yline: 1,
}
)
f.header[num] = header
f.trace[num] = data

f.bin.update(tsort=segyio.TraceSortingFormat.INLINE_SORTING)


class SegyV2_0(SegyV1_0): # noqa
"""An IO class supporting version 2.0 of the SEGY format."""

version = "2.0"


class SegyV2_1(SegyV1_0): # noqa
"""An IO class supporting version 2.1 of the SEGY format."""

version = "2.1"
132 changes: 94 additions & 38 deletions dascore/io/segy/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,17 @@

import datetime

# --- Getting format/version
import numpy as np
from segyio import TraceField

# --- Getting format/version
import pandas as pd

import dascore as dc
from dascore import to_float
from dascore.core import get_coord_manager
from dascore.exceptions import InvalidSpoolError, PatchError
from dascore.utils.misc import optional_import
from dascore.utils.patch import check_patch_coords

# Valid data format codes as specified in the SEGY rev1 manual.
VALID_FORMATS = [1, 2, 3, 4, 5, 8]
Expand All @@ -31,42 +36,42 @@ def twos_comp(bytes_):
return val # return positive value as is


def _is_segy(fp):
def _get_segy_version(fp):
"""
Return True if file pointer contains segy formatted data.
Based on ObsPy's implementation writen by Lion Krischer.
https://github.com/obspy/obspy/blob/master/obspy/io/segy/core.py
"""
# # Read 400byte header into byte string.
# fp.seek(3200)
# header = fp.read(400)
# data_trace_count = twos_comp(header[12:14])
# auxiliary_trace_count = twos_comp(header[14:16])
# sample_interval = twos_comp(header[16:18])
# samples_per_trace = twos_comp(header[20:22])
# data_format_code = twos_comp(header[24:26])
# format_number_major = int.from_bytes(header[300:301])
# format_number_minor = int.from_bytes(header[301:302])
# fixed_len_flag = twos_comp(header[302:304])
#
#
# if _format_number not in (0x0000, 0x0100, 0x0010, 0x0001):
# return False
#
# _fixed_length = unpack(fmt, _fixed_length)[0]
# _extended_number = unpack(fmt, _extended_number)[0]
# # Make some sanity checks and return False if they fail.
# if (
# _sample_interval <= 0
# or _samples_per_trace <= 0
# or _number_of_data_traces < 0
# or _number_of_auxiliary_traces < 0
# or _fixed_length < 0
# or _extended_number < 0
# ):
# return False
return True
fp.seek(3200)
header = fp.read(400)
data_trace_count = twos_comp(header[12:14])
auxiliary_trace_count = twos_comp(header[14:16])
sample_interval = twos_comp(header[16:18])
samples_per_trace = twos_comp(header[20:22])
data_format_code = twos_comp(header[24:26])
format_number_major = int.from_bytes(header[300:301])
format_number_minor = int.from_bytes(header[301:302])
fixed_len_flag = twos_comp(header[302:304])

checks = (
# First check that some samples and a sampling rate is defined.
samples_per_trace > 0 and sample_interval > 0,
# Ensure the data sample format code is valid using range in 2,1 standard
1 <= data_format_code <= 16,
# Check version code
format_number_major in {0, 1, 2, 3},
# Sanity checks for other values.
data_trace_count >= 0,
auxiliary_trace_count >= 0,
format_number_minor in {0, 1, 2, 3},
fixed_len_flag in {0, 1},
)
if all(checks):
return "segy", f"{format_number_major}.{format_number_minor}"
else:
return False


def _get_filtered_data_and_coords(segy_fi, coords, time=None, channel=None):
Expand Down Expand Up @@ -109,12 +114,14 @@ def _get_coords(fi):
If a user knows the dx, change from channel to distance using
patch.update_coords after reading
"""
segyio = optional_import("segyio")
trace_field = segyio.TraceField
header_0 = fi.header[0]

# get time array from SEGY headers
starttime = _get_time_from_header(header_0)
dt = dc.to_timedelta64(header_0[TraceField.TRACE_SAMPLE_INTERVAL] / 1000)
ns = header_0[TraceField.TRACE_SAMPLE_COUNT]
dt = dc.to_timedelta64(header_0[trace_field.TRACE_SAMPLE_INTERVAL] / 1000)
ns = header_0[trace_field.TRACE_SAMPLE_COUNT]
time_array = starttime + dt * np.arange(ns)

# Get distance array from SEGY header
Expand All @@ -139,13 +146,62 @@ def _get_attrs(fi, coords, path, file_io):

def _get_time_from_header(header):
"""Creates a datetime64 object from SEGY header date information."""
year = header[TraceField.YearDataRecorded]
julday = header[TraceField.DayOfYear]
hour = header[TraceField.HourOfDay]
minute = header[TraceField.MinuteOfHour]
second = header[TraceField.SecondOfMinute]
segyio = optional_import("segyio")
trace_field = segyio.TraceField

year = header[trace_field.YearDataRecorded]
julday = header[trace_field.DayOfYear]
hour = header[trace_field.HourOfDay]
minute = header[trace_field.MinuteOfHour]
second = header[trace_field.SecondOfMinute]
# make those timedate64
fmt = "%Y.%j.%H.%M.%S"
s = f"{year}.{julday}.{hour}.{minute}.{second}"
time = datetime.datetime.strptime(s, fmt)
return dc.to_datetime64(time)


def _get_segy_compatible_patch(spool):
"""
Get a patch that will be writable as a segy file.
Ensure coords are distance time.
"""
# Ensure we have a single patch with coordinates time and distance.
spool = [spool] if isinstance(spool, dc.Patch) else spool
if len(spool) != 1:
msg = "Can only write a spool with as single patch as segy."
raise InvalidSpoolError(msg)
patch = spool[0]
check_patch_coords(patch, dims=("time", "distance"))
# Ensure there will be no loss in the time sampling.
# segy supports us precision
time_step = dc.to_float(patch.get_coord("time").step)
new_samp = np.round(time_step, 6)
if np.abs(new_samp - time_step).max() > 1e-7:
msg = (
f"The segy format support us precision for temporal sampling. "
f"The input patch has a time step of {time_step} which will result "
"in a loss or precision. Either manually set the time step with "
"patch.update_coords or resample the time axis with patch.resample"
)
raise PatchError(msg)
return patch.transpose("distance", "time")


def _make_time_header_dict(time_coord):
"""Make the time header dict from a time coordinate."""
header = {}
timestamp = pd.Timestamp(dc.to_datetime64(time_coord.min()))
time_step_us = to_float(time_coord.step) * 1_000_000

segyio = optional_import("segyio")
trace_field = segyio.TraceField

header[trace_field.YearDataRecorded] = timestamp.year
header[trace_field.DayOfYear] = timestamp.day_of_year
header[trace_field.HourOfDay] = timestamp.hour
header[trace_field.MinuteOfHour] = timestamp.minute
header[trace_field.SecondOfMinute] = timestamp.second
header[trace_field.TRACE_SAMPLE_INTERVAL] = time_step_us

return header
6 changes: 4 additions & 2 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,6 @@ dependencies = [
"tables>=3.7",
"typing_extensions",
"pint",
"segyio",
]

[project.optional-dependencies]
Expand All @@ -64,6 +63,7 @@ extras = [
"findiff",
"obspy",
"numba",
"segyio",
]

docs = [
Expand Down Expand Up @@ -119,7 +119,9 @@ TERRA15__V4 = "dascore.io.terra15.core:Terra15FormatterV4"
TERRA15__V5 = "dascore.io.terra15.core:Terra15FormatterV5"
TERRA15__V6 = "dascore.io.terra15.core:Terra15FormatterV6"
SILIXA_H5__V1 = "dascore.io.silixah5:SilixaH5V1"
SEGY__V2 = "dascore.io.segy.core:SegyV2"
SEGY__V1_0 = "dascore.io.segy.core:SegyV1_0"
SEGY__V2_0 = "dascore.io.segy.core:SegyV2_0"
SEGY__V2_1 = "dascore.io.segy.core:SegyV2_1"
RSF__V1 = "dascore.io.rsf.core:RSFV1"
WAV = "dascore.io.wav.core:WavIO"
XMLBINARY__V1 = "dascore.io.xml_binary.core:XMLBinaryV1"
Expand Down
6 changes: 3 additions & 3 deletions tests/test_io/test_common_io.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@
from dascore.io.optodas import OptoDASV8
from dascore.io.pickle import PickleIO
from dascore.io.prodml import ProdMLV2_0, ProdMLV2_1
from dascore.io.segy import SegyV2
from dascore.io.segy import SegyV1_0
from dascore.io.sentek import SentekV5
from dascore.io.silixah5 import SilixaH5V1
from dascore.io.tdms import TDMSFormatterV4713
Expand Down Expand Up @@ -75,15 +75,15 @@
Terra15FormatterV5(): ("terra15_v5_test_file.hdf5",),
Terra15FormatterV6(): ("terra15_v6_test_file.hdf5",),
Terra15FormatterV6(): ("terra15_v6_test_file.hdf5",),
SegyV2(): ("conoco_segy_1.sgy",),
SegyV1_0(): ("conoco_segy_1.sgy",),
DASHDF5(): ("PoroTomo_iDAS_1.h5",),
SentekV5(): ("DASDMSShot00_20230328155653619.das",),
}

# This tuple is for fiber io which support a write method and can write
# generic patches. If the patch has to be in some special form, for example
# only flat patches can be written to WAV, don't put it here.
COMMON_IO_WRITE_TESTS = (PickleIO(), DASDAEV1())
COMMON_IO_WRITE_TESTS = (PickleIO(), DASDAEV1(), SegyV1_0())

# Specifies data registry entries which should not be tested.
SKIP_DATA_FILES = {"whale_1.hdf5", "brady_hs_DAS_DTS_coords.csv"}
Expand Down
26 changes: 26 additions & 0 deletions tests/test_io/test_segy/test_segy.py
Original file line number Diff line number Diff line change
@@ -1 +1,27 @@
"""Tests for SEGY format."""

import pytest

from dascore.io.segy.core import SegyV1_0


class TestSegy:
"""Tests for SEGY format."""

@pytest.fixture(scope="class")
def small_file(self, tmp_path_factory):
"""Creates a small file with only a few bytes."""
parent = tmp_path_factory.mktemp("small_file")
path = parent / "test_file.segy"
with path.open("wb") as f:
f.write(b"abd")
return path

def test_small_file(self, small_file):
"""
Ensure a file that is too small to contain segy header doesn't throw
an error.
"""
segy = SegyV1_0()
out = segy.get_format(small_file)
assert out is False # we actually want to make sure its False.

0 comments on commit be624f5

Please sign in to comment.