From 1e91fabbfddc9c7527d3bf8633902e5218440328 Mon Sep 17 00:00:00 2001
From: BrianMichell
Date: Tue, 9 Sep 2025 15:34:34 +0000
Subject: [PATCH 01/23] Reimplement disaster recovery logic

---
 src/mdio/builder/schemas/dtype.py     |  1 +
 src/mdio/constants.py                 |  1 +
 src/mdio/converters/segy.py           | 62 +++++++++++++++++++++++++++
 src/mdio/converters/type_converter.py |  2 +
 src/mdio/segy/_workers.py             | 13 +++++-
 5 files changed, 78 insertions(+), 1 deletion(-)

diff --git a/src/mdio/builder/schemas/dtype.py b/src/mdio/builder/schemas/dtype.py
index 8e8cbcd35..caeb6c3be 100644
--- a/src/mdio/builder/schemas/dtype.py
+++ b/src/mdio/builder/schemas/dtype.py
@@ -32,6 +32,7 @@ class ScalarType(StrEnum):
     COMPLEX64 = "complex64"
     COMPLEX128 = "complex128"
     COMPLEX256 = "complex256"
+    HEADERS_V3 = "r1920"  # Raw number of BITS, must be a multiple of 8


 class StructuredField(CamelCaseStrictModel):
diff --git a/src/mdio/constants.py b/src/mdio/constants.py
index f2ddb65c3..6126eada2 100644
--- a/src/mdio/constants.py
+++ b/src/mdio/constants.py
@@ -64,4 +64,5 @@ class ZarrFormat(IntEnum):
     ScalarType.COMPLEX64: complex(np.nan, np.nan),
     ScalarType.COMPLEX128: complex(np.nan, np.nan),
     ScalarType.COMPLEX256: complex(np.nan, np.nan),
+    ScalarType.HEADERS_V3: b"\x00" * 240,
 }
diff --git a/src/mdio/converters/segy.py b/src/mdio/converters/segy.py
index 5d95ef6e3..0e2e121c7 100644
--- a/src/mdio/converters/segy.py
+++ b/src/mdio/converters/segy.py
@@ -22,6 +22,11 @@
 from mdio.converters.exceptions import GridTraceSparsityError
 from mdio.converters.type_converter import to_structured_type
 from mdio.core.grid import Grid
+from mdio.builder.schemas.chunk_grid import RegularChunkGrid
+from mdio.builder.schemas.chunk_grid import RegularChunkShape
+from mdio.builder.schemas.compressors import Blosc
+from mdio.builder.schemas.compressors import BloscCname
+from mdio.builder.schemas.dtype import ScalarType
 from mdio.segy import blocked_io
 from mdio.segy.utilities import get_grid_plan

@@ -330,6 +335,58 @@ def _add_segy_ingest_attributes(dataset: Dataset, segy_file: SegyFile, grid_over
     dataset.metadata.attributes.update(segy_attributes)


+def _add_raw_headers_to_template(mdio_template: AbstractDatasetTemplate) -> AbstractDatasetTemplate:
+    """Add raw headers capability to the MDIO template by monkey-patching its _add_variables method.
+ This function modifies the template's _add_variables method to also add a raw headers variable + with the following characteristics: + - Same rank as the Headers variable (all dimensions except vertical) + - Name: "RawHeaders" + - Type: ScalarType.HEADERS + - No coordinates + - zstd compressor + - No additional metadata + - Chunked the same as the Headers variable + Args: + mdio_template: The MDIO template to mutate + """ + # Check if raw headers enhancement has already been applied to avoid duplicate additions + if hasattr(mdio_template, '_mdio_raw_headers_enhanced'): + return mdio_template + + # Store the original _add_variables method + original_add_variables = mdio_template._add_variables + + def enhanced_add_variables() -> None: + # Call the original method first + original_add_variables() + + # Now add the raw headers variable + chunk_shape = mdio_template._var_chunk_shape[:-1] + + # Create chunk grid metadata + chunk_metadata = RegularChunkGrid(configuration=RegularChunkShape(chunk_shape=chunk_shape)) + from mdio.builder.schemas.v1.variable import VariableMetadata + + # Add the raw headers variable using the builder's add_variable method + mdio_template._builder.add_variable( + name="raw_headers", + long_name="Raw Headers", + dimensions=mdio_template._dim_names[:-1], # All dimensions except vertical + data_type=ScalarType.HEADERS_V3, + compressor=Blosc(cname=BloscCname.zstd), + coordinates=None, # No coordinates as specified + metadata=VariableMetadata(chunk_grid=chunk_metadata), + ) + + # Replace the template's _add_variables method + mdio_template._add_variables = enhanced_add_variables + + # Mark the template as enhanced to prevent duplicate monkey-patching + mdio_template._mdio_raw_headers_enhanced = True + + return mdio_template + + def segy_to_mdio( # noqa PLR0913 segy_spec: SegySpec, mdio_template: AbstractDatasetTemplate, @@ -369,6 +426,11 @@ def segy_to_mdio( # noqa PLR0913 _, non_dim_coords = _get_coordinates(grid, segy_headers, mdio_template) header_dtype = to_structured_type(segy_spec.trace.header.dtype) + + if os.getenv("MDIO__DO_RAW_HEADERS") == "1": + logger.warning("MDIO__DO_RAW_HEADERS is experimental and expected to change or be removed.") + mdio_template = _add_raw_headers_to_template(mdio_template) + horizontal_unit = _get_horizontal_coordinate_unit(segy_dimensions) mdio_ds: Dataset = mdio_template.build_dataset( name=mdio_template.name, diff --git a/src/mdio/converters/type_converter.py b/src/mdio/converters/type_converter.py index 427abdc7d..eec4e99f3 100644 --- a/src/mdio/converters/type_converter.py +++ b/src/mdio/converters/type_converter.py @@ -78,6 +78,8 @@ def to_structured_type(data_type: np_dtype) -> StructuredType: def to_numpy_dtype(data_type: ScalarType | StructuredType) -> np_dtype: """Get the numpy dtype for a variable.""" if isinstance(data_type, ScalarType): + if data_type == ScalarType.HEADERS_V3: + return np_dtype("|V240") return np_dtype(data_type.value) if isinstance(data_type, StructuredType): return np_dtype([(f.name, f.format.value) for f in data_type.fields]) diff --git a/src/mdio/segy/_workers.py b/src/mdio/segy/_workers.py index 306821fe3..a57e1ccf7 100644 --- a/src/mdio/segy/_workers.py +++ b/src/mdio/segy/_workers.py @@ -122,12 +122,15 @@ def trace_worker( # noqa: PLR0913 traces = segy_file.trace[live_trace_indexes] header_key = "headers" + raw_header_key = "raw_headers" # Get subset of the dataset that has not yet been saved # The headers might not be present in the dataset worker_variables = [data_variable_name] if header_key in 
dataset.data_vars:  # Keeping the `if` here to allow for more worker configurations
         worker_variables.append(header_key)
+    if raw_header_key in dataset.data_vars:
+        worker_variables.append(raw_header_key)

     ds_to_write = dataset[worker_variables]

@@ -146,7 +149,15 @@ def trace_worker(  # noqa: PLR0913
             attrs=ds_to_write[header_key].attrs,
             encoding=ds_to_write[header_key].encoding,  # Not strictly necessary, but safer than not doing it.
         )
-
+    if raw_header_key in worker_variables:
+        tmp_raw_headers = np.zeros_like(dataset[raw_header_key])
+        tmp_raw_headers[not_null] = traces.header.view("|V240")  # TODO: Ensure this is using the RAW view and not an interpreted view.
+        ds_to_write[raw_header_key] = Variable(
+            ds_to_write[raw_header_key].dims,
+            tmp_raw_headers,
+            attrs=ds_to_write[raw_header_key].attrs,
+            encoding=ds_to_write[raw_header_key].encoding,  # Not strictly necessary, but safer than not doing it.
+        )
     data_variable = ds_to_write[data_variable_name]
     fill_value = _get_fill_value(ScalarType(data_variable.dtype.name))
     tmp_samples = np.full_like(data_variable, fill_value=fill_value)

From cdf9c6fa79653e1c7f1110e43d91b86c752ff4ae Mon Sep 17 00:00:00 2001
From: BrianMichell
Date: Wed, 10 Sep 2025 16:16:00 +0000
Subject: [PATCH 02/23] Ensure getting true raw bytes for DR array

---
 src/mdio/segy/_workers.py | 31 +++++++++++++++++++++++++++++--
 1 file changed, 29 insertions(+), 2 deletions(-)

diff --git a/src/mdio/segy/_workers.py b/src/mdio/segy/_workers.py
index a57e1ccf7..2ca7ef146 100644
--- a/src/mdio/segy/_workers.py
+++ b/src/mdio/segy/_workers.py
@@ -9,6 +9,7 @@
 import numpy as np
 from segy import SegyFile
+from segy.indexing import merge_cat_file

 from mdio.api.io import to_mdio
 from mdio.builder.schemas.dtype import ScalarType
@@ -151,13 +152,39 @@ def trace_worker(  # noqa: PLR0913
         )
     if raw_header_key in worker_variables:
         tmp_raw_headers = np.zeros_like(dataset[raw_header_key])
-        tmp_raw_headers[not_null] = traces.header.view("|V240")  # TODO: Ensure this is using the RAW view and not an interpreted view.
+
+        # Get the indices where we need to place results
+        live_mask = not_null
+        live_positions = np.where(live_mask.ravel())[0]
+
+        if len(live_positions) > 0:
+            # Calculate byte ranges for headers
+            HEADER_SIZE = 240
+            trace_offset = segy_file.spec.trace.offset
+            trace_itemsize = segy_file.spec.trace.itemsize
+
+            starts = []
+            ends = []
+            for global_trace_idx in live_trace_indexes:
+                header_start = trace_offset + global_trace_idx * trace_itemsize
+                header_end = header_start + HEADER_SIZE
+                starts.append(header_start)
+                ends.append(header_end)
+
+            # Capture raw bytes
+            raw_header_bytes = merge_cat_file(segy_file.fs, segy_file.url, starts, ends)
+
+            # Convert and place results
+            raw_headers_array = np.frombuffer(bytes(raw_header_bytes), dtype="|V240")
+            tmp_raw_headers.ravel()[live_positions] = raw_headers_array
+
         ds_to_write[raw_header_key] = Variable(
             ds_to_write[raw_header_key].dims,
             tmp_raw_headers,
             attrs=ds_to_write[raw_header_key].attrs,
-            encoding=ds_to_write[raw_header_key].encoding,  # Not strictly necessary, but safer than not doing it.
+ encoding=ds_to_write[raw_header_key].encoding, ) + data_variable = ds_to_write[data_variable_name] fill_value = _get_fill_value(ScalarType(data_variable.dtype.name)) tmp_samples = np.full_like(data_variable, fill_value=fill_value) From fb910e73592e6fc8e0ccd89eb263fdaf04a3cdbf Mon Sep 17 00:00:00 2001 From: BrianMichell Date: Wed, 10 Sep 2025 16:20:45 +0000 Subject: [PATCH 03/23] Linting --- src/mdio/converters/segy.py | 17 ++++++++++------- src/mdio/segy/_workers.py | 18 +++++++++--------- 2 files changed, 19 insertions(+), 16 deletions(-) diff --git a/src/mdio/converters/segy.py b/src/mdio/converters/segy.py index 0e2e121c7..ea60afd31 100644 --- a/src/mdio/converters/segy.py +++ b/src/mdio/converters/segy.py @@ -14,19 +14,20 @@ from mdio.api.io import _normalize_path from mdio.api.io import to_mdio +from mdio.builder.schemas.chunk_grid import RegularChunkGrid +from mdio.builder.schemas.chunk_grid import RegularChunkShape +from mdio.builder.schemas.compressors import Blosc +from mdio.builder.schemas.compressors import BloscCname +from mdio.builder.schemas.dtype import ScalarType from mdio.builder.schemas.v1.units import LengthUnitEnum from mdio.builder.schemas.v1.units import LengthUnitModel +from mdio.builder.schemas.v1.variable import VariableMetadata from mdio.builder.xarray_builder import to_xarray_dataset from mdio.converters.exceptions import EnvironmentFormatError from mdio.converters.exceptions import GridTraceCountError from mdio.converters.exceptions import GridTraceSparsityError from mdio.converters.type_converter import to_structured_type from mdio.core.grid import Grid -from mdio.builder.schemas.chunk_grid import RegularChunkGrid -from mdio.builder.schemas.chunk_grid import RegularChunkShape -from mdio.builder.schemas.compressors import Blosc -from mdio.builder.schemas.compressors import BloscCname -from mdio.builder.schemas.dtype import ScalarType from mdio.segy import blocked_io from mdio.segy.utilities import get_grid_plan @@ -337,6 +338,7 @@ def _add_segy_ingest_attributes(dataset: Dataset, segy_file: SegyFile, grid_over def _add_raw_headers_to_template(mdio_template: AbstractDatasetTemplate) -> AbstractDatasetTemplate: """Add raw headers capability to the MDIO template by monkey-patching its _add_variables method. 
+ This function modifies the template's _add_variables method to also add a raw headers variable with the following characteristics: - Same rank as the Headers variable (all dimensions except vertical) @@ -348,9 +350,11 @@ def _add_raw_headers_to_template(mdio_template: AbstractDatasetTemplate) -> Abst - Chunked the same as the Headers variable Args: mdio_template: The MDIO template to mutate + Returns: + The mutated MDIO template """ # Check if raw headers enhancement has already been applied to avoid duplicate additions - if hasattr(mdio_template, '_mdio_raw_headers_enhanced'): + if hasattr(mdio_template, "_mdio_raw_headers_enhanced"): return mdio_template # Store the original _add_variables method @@ -365,7 +369,6 @@ def enhanced_add_variables() -> None: # Create chunk grid metadata chunk_metadata = RegularChunkGrid(configuration=RegularChunkShape(chunk_shape=chunk_shape)) - from mdio.builder.schemas.v1.variable import VariableMetadata # Add the raw headers variable using the builder's add_variable method mdio_template._builder.add_variable( diff --git a/src/mdio/segy/_workers.py b/src/mdio/segy/_workers.py index 2ca7ef146..24362273e 100644 --- a/src/mdio/segy/_workers.py +++ b/src/mdio/segy/_workers.py @@ -152,39 +152,39 @@ def trace_worker( # noqa: PLR0913 ) if raw_header_key in worker_variables: tmp_raw_headers = np.zeros_like(dataset[raw_header_key]) - + # Get the indices where we need to place results live_mask = not_null live_positions = np.where(live_mask.ravel())[0] - + if len(live_positions) > 0: # Calculate byte ranges for headers - HEADER_SIZE = 240 + header_size = 240 trace_offset = segy_file.spec.trace.offset trace_itemsize = segy_file.spec.trace.itemsize - + starts = [] ends = [] for global_trace_idx in live_trace_indexes: header_start = trace_offset + global_trace_idx * trace_itemsize - header_end = header_start + HEADER_SIZE + header_end = header_start + header_size starts.append(header_start) ends.append(header_end) - + # Capture raw bytes raw_header_bytes = merge_cat_file(segy_file.fs, segy_file.url, starts, ends) - + # Convert and place results raw_headers_array = np.frombuffer(bytes(raw_header_bytes), dtype="|V240") tmp_raw_headers.ravel()[live_positions] = raw_headers_array - + ds_to_write[raw_header_key] = Variable( ds_to_write[raw_header_key].dims, tmp_raw_headers, attrs=ds_to_write[raw_header_key].attrs, encoding=ds_to_write[raw_header_key].encoding, ) - + data_variable = ds_to_write[data_variable_name] fill_value = _get_fill_value(ScalarType(data_variable.dtype.name)) tmp_samples = np.full_like(data_variable, fill_value=fill_value) From d1a206c85fee12ad7b5af8cdf1461ee40e794031 Mon Sep 17 00:00:00 2001 From: BrianMichell Date: Wed, 10 Sep 2025 16:31:18 +0000 Subject: [PATCH 04/23] Add v2 issue check --- src/mdio/converters/segy.py | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/src/mdio/converters/segy.py b/src/mdio/converters/segy.py index ea60afd31..c63345598 100644 --- a/src/mdio/converters/segy.py +++ b/src/mdio/converters/segy.py @@ -11,6 +11,7 @@ from segy.config import SegySettings from segy.standards.codes import MeasurementSystem as segy_MeasurementSystem from segy.standards.fields.trace import Rev0 as TraceHeaderFieldsRev0 +import zarr from mdio.api.io import _normalize_path from mdio.api.io import to_mdio @@ -23,6 +24,7 @@ from mdio.builder.schemas.v1.units import LengthUnitModel from mdio.builder.schemas.v1.variable import VariableMetadata from mdio.builder.xarray_builder import to_xarray_dataset +from mdio.constants import 
ZarrFormat from mdio.converters.exceptions import EnvironmentFormatError from mdio.converters.exceptions import GridTraceCountError from mdio.converters.exceptions import GridTraceSparsityError @@ -431,8 +433,11 @@ def segy_to_mdio( # noqa PLR0913 header_dtype = to_structured_type(segy_spec.trace.header.dtype) if os.getenv("MDIO__DO_RAW_HEADERS") == "1": - logger.warning("MDIO__DO_RAW_HEADERS is experimental and expected to change or be removed.") - mdio_template = _add_raw_headers_to_template(mdio_template) + if zarr.config.get("default_zarr_format") == ZarrFormat.V2: + logger.warning("Raw headers are only supported for Zarr v3. Skipping raw headers.") + else: + logger.warning("MDIO__DO_RAW_HEADERS is experimental and expected to change or be removed.") + mdio_template = _add_raw_headers_to_template(mdio_template) horizontal_unit = _get_horizontal_coordinate_unit(segy_dimensions) mdio_ds: Dataset = mdio_template.build_dataset( From b727a2bec2c292bc084a486f4753786cc5de6252 Mon Sep 17 00:00:00 2001 From: BrianMichell Date: Wed, 10 Sep 2025 18:48:51 +0000 Subject: [PATCH 05/23] Fix pre-commit --- src/mdio/converters/segy.py | 2 +- uv.lock | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/src/mdio/converters/segy.py b/src/mdio/converters/segy.py index c63345598..8e6afbec9 100644 --- a/src/mdio/converters/segy.py +++ b/src/mdio/converters/segy.py @@ -7,11 +7,11 @@ from typing import TYPE_CHECKING import numpy as np +import zarr from segy import SegyFile from segy.config import SegySettings from segy.standards.codes import MeasurementSystem as segy_MeasurementSystem from segy.standards.fields.trace import Rev0 as TraceHeaderFieldsRev0 -import zarr from mdio.api.io import _normalize_path from mdio.api.io import to_mdio diff --git a/uv.lock b/uv.lock index 6ee410644..f18ada602 100644 --- a/uv.lock +++ b/uv.lock @@ -1670,7 +1670,7 @@ wheels = [ [[package]] name = "multidimio" -version = "1.0.0a1" +version = "1.0.0" source = { editable = "." 
} dependencies = [ { name = "click" }, From 81662c78f67bc241f3e052eee7ceab8cafb96479 Mon Sep 17 00:00:00 2001 From: BrianMichell Date: Thu, 11 Sep 2025 15:15:21 +0000 Subject: [PATCH 06/23] Working example --- src/mdio/segy/_disaster_recovery_wrapper.py | 227 ++++++++++++++++++++ src/mdio/segy/_workers.py | 37 +--- src/mdio/segy/blocked_io.py | 36 +--- 3 files changed, 243 insertions(+), 57 deletions(-) create mode 100644 src/mdio/segy/_disaster_recovery_wrapper.py diff --git a/src/mdio/segy/_disaster_recovery_wrapper.py b/src/mdio/segy/_disaster_recovery_wrapper.py new file mode 100644 index 000000000..dfbad1f2a --- /dev/null +++ b/src/mdio/segy/_disaster_recovery_wrapper.py @@ -0,0 +1,227 @@ +"""Consumer-side utility to get both raw and transformed header data with single filesystem read.""" + +from __future__ import annotations + +import numpy as np +from typing import TYPE_CHECKING +from segy.transforms import ByteSwapTransform +from segy.transforms import IbmFloatTransform + +if TYPE_CHECKING: + from segy.file import SegyFile + from segy.indexing import HeaderIndexer + from segy.transforms import Transform, TransformPipeline, ByteSwapTransform, IbmFloatTransform + from numpy.typing import NDArray + + +def debug_compare_raw_vs_processed(segy_file, trace_index=0): + """Debug function to compare raw filesystem data vs processed data.""" + from segy.indexing import HeaderIndexer + + # Create a fresh indexer to get raw data + indexer = HeaderIndexer( + segy_file.fs, + segy_file.url, + segy_file.spec.trace, + segy_file.num_traces, + transform_pipeline=None # No transforms = raw data + ) + + # Get raw data directly from filesystem + raw_data = indexer[trace_index] + + # Get processed data with transforms + processed_data = segy_file.header[trace_index] + + print("=== Raw vs Processed Comparison ===") + print(f"Raw data shape: {raw_data.shape}") + print(f"Processed data shape: {processed_data.shape}") + + if hasattr(raw_data, 'dtype') and raw_data.dtype.names: + if 'inline_number' in raw_data.dtype.names: + print(f"Raw inline_number: {raw_data['inline_number']}") + print(f"Raw inline_number (hex): {raw_data['inline_number']:08x}") + print(f"Processed inline_number: {processed_data['inline_number']}") + print(f"Processed inline_number (hex): {processed_data['inline_number']:08x}") + print(f"Are they equal? {raw_data['inline_number'] == processed_data['inline_number']}") + + return raw_data, processed_data + + +class HeaderRawTransformedAccessor: + """Utility class to access both raw and transformed header data with single filesystem read. + + This class works as a consumer of SegyFile objects without modifying the package. + It achieves the goal by: + 1. Reading raw data from filesystem once + 2. Applying transforms to get transformed data + 3. Keeping both versions available + + The transforms used in SEG-Y processing are reversible: + - ByteSwapTransform: Self-inverse (swapping twice returns to original) + - IbmFloatTransform: Can be reversed by swapping direction + """ + + def __init__(self, segy_file: SegyFile): + """Initialize with a SegyFile instance. 
+ + Args: + segy_file: The SegyFile instance to work with + """ + self.segy_file = segy_file + self.header_indexer = segy_file.header + self.transform_pipeline = self.header_indexer.transform_pipeline + + # Debug: Print transform pipeline information + import sys + print(f"Debug: System endianness: {sys.byteorder}") + print(f"Debug: File endianness: {self.segy_file.spec.endianness}") + print(f"Debug: Transform pipeline has {len(self.transform_pipeline.transforms)} transforms:") + for i, transform in enumerate(self.transform_pipeline.transforms): + print(f" Transform {i}: {type(transform).__name__}") + if hasattr(transform, 'target_order'): + print(f" Target order: {transform.target_order}") + if hasattr(transform, 'direction'): + print(f" Direction: {transform.direction}") + if hasattr(transform, 'keys'): + print(f" Keys: {transform.keys}") + + def get_raw_and_transformed( + self, indices: int | list[int] | np.ndarray | slice + ) -> tuple[NDArray, NDArray]: + """Get both raw and transformed header data with single filesystem read. + + Args: + indices: Which headers to retrieve (int, list, ndarray, or slice) + + Returns: + Tuple of (raw_headers, transformed_headers) + """ + # Get the transformed data using the normal API + # This reads from filesystem and applies transforms + transformed_data = self.header_indexer[indices] + + print(f"Debug: Transformed data shape: {transformed_data.shape}") + if hasattr(transformed_data, 'dtype') and transformed_data.dtype.names: + print(f"Debug: Transformed data dtype names: {transformed_data.dtype.names[:5]}...") # First 5 fields + if 'inline_number' in transformed_data.dtype.names: + print(f"Debug: First transformed inline_number: {transformed_data['inline_number'][0]}") + print(f"Debug: First transformed inline_number (hex): {transformed_data['inline_number'][0]:08x}") + + # Now reverse the transforms to get back to raw data + raw_data = self._reverse_transforms(transformed_data) + + print(f"Debug: Raw data shape: {raw_data.shape}") + if hasattr(raw_data, 'dtype') and raw_data.dtype.names: + if 'inline_number' in raw_data.dtype.names: + print(f"Debug: First raw inline_number: {raw_data['inline_number'][0]}") + print(f"Debug: First raw inline_number (hex): {raw_data['inline_number'][0]:08x}") + + return raw_data, transformed_data + + def _reverse_transforms(self, transformed_data: NDArray) -> NDArray: + """Reverse the transform pipeline to get raw data from transformed data. 
+ + Args: + transformed_data: Data that has been processed through the transform pipeline + + Returns: + Raw data equivalent to what was read directly from filesystem + """ + # Start with the transformed data + raw_data = transformed_data.copy() if hasattr(transformed_data, 'copy') else transformed_data + + print(f"Debug: Starting reversal with {len(self.transform_pipeline.transforms)} transforms") + + # Apply transforms in reverse order with reversed operations + for i, transform in enumerate(reversed(self.transform_pipeline.transforms)): + print(f"Debug: Reversing transform {len(self.transform_pipeline.transforms)-1-i}: {type(transform).__name__}") + if 'inline_number' in raw_data.dtype.names: + print(f"Debug: Before reversal - inline_number: {raw_data['inline_number'][0]:08x}") + raw_data = self._reverse_single_transform(raw_data, transform) + if 'inline_number' in raw_data.dtype.names: + print(f"Debug: After reversal - inline_number: {raw_data['inline_number'][0]:08x}") + + return raw_data + + def _reverse_single_transform(self, data: NDArray, transform: Transform) -> NDArray: + """Reverse a single transform operation. + + Args: + data: The data to reverse transform + transform: The transform to reverse + + Returns: + Data with the transform reversed + """ + # Import here to avoid circular imports + from segy.transforms import get_endianness + from segy.schema import Endianness + + if isinstance(transform, ByteSwapTransform): + # For byte swap, we need to reverse the endianness conversion + # If the transform was converting to little-endian, we need to convert back to big-endian + print(f"Debug: Reversing byte swap (target was: {transform.target_order})") + + # Get current data endianness + current_endianness = get_endianness(data) + print(f"Debug: Current data endianness: {current_endianness}") + + # If transform was converting TO little-endian, we need to convert TO big-endian + if transform.target_order == Endianness.LITTLE: + reverse_target = Endianness.BIG + else: + reverse_target = Endianness.LITTLE + + print(f"Debug: Reversing to target: {reverse_target}") + reverse_transform = ByteSwapTransform(reverse_target) + result = reverse_transform.apply(data) + + if 'inline_number' in data.dtype.names: + print(f"Debug: Byte swap reversal - before: {data['inline_number'][0]:08x}, after: {result['inline_number'][0]:08x}") + return result + + elif isinstance(transform, IbmFloatTransform): + # Reverse IBM float conversion by swapping direction + reverse_direction = "to_ibm" if transform.direction == "to_ieee" else "to_ieee" + print(f"Debug: Applying IBM float reversal (direction: {transform.direction} -> {reverse_direction})") + reverse_transform = IbmFloatTransform(reverse_direction, transform.keys) + return reverse_transform.apply(data) + + else: + # For unknown transforms, return data unchanged + # This maintains compatibility if new transforms are added + print(f"Warning: Unknown transform type {type(transform).__name__}, cannot reverse") + return data + + +def get_header_raw_and_transformed( + segy_file: SegyFile, + indices: int | list[int] | np.ndarray | slice +) -> tuple[NDArray, NDArray]: + """Convenience function to get both raw and transformed header data. + + This is a drop-in replacement that provides the functionality you requested + without modifying the segy package. 
+ + Args: + segy_file: The SegyFile instance + indices: Which headers to retrieve + + Returns: + Tuple of (raw_headers, transformed_headers) + + Example: + from header_raw_transformed_accessor import get_header_raw_and_transformed + + # Single header + raw_hdr, transformed_hdr = get_header_raw_and_transformed(segy_file, 0) + + # Multiple headers + raw_hdrs, transformed_hdrs = get_header_raw_and_transformed(segy_file, [0, 1, 2]) + + # Slice of headers + raw_hdrs, transformed_hdrs = get_header_raw_and_transformed(segy_file, slice(0, 10)) + """ + accessor = HeaderRawTransformedAccessor(segy_file) + return accessor.get_raw_and_transformed(indices) diff --git a/src/mdio/segy/_workers.py b/src/mdio/segy/_workers.py index 24362273e..67a039a78 100644 --- a/src/mdio/segy/_workers.py +++ b/src/mdio/segy/_workers.py @@ -13,6 +13,7 @@ from mdio.api.io import to_mdio from mdio.builder.schemas.dtype import ScalarType +from mdio.segy._disaster_recovery_wrapper import get_header_raw_and_transformed if TYPE_CHECKING: from segy.arrays import HeaderArray @@ -81,7 +82,7 @@ def header_scan_worker( return cast("HeaderArray", trace_header) - +@profile def trace_worker( # noqa: PLR0913 segy_kw: SegyFileArguments, output_path: UPath, @@ -134,12 +135,13 @@ def trace_worker( # noqa: PLR0913 worker_variables.append(raw_header_key) ds_to_write = dataset[worker_variables] + raw_headers, transformed_headers = get_header_raw_and_transformed(segy_file, live_trace_indexes) if header_key in worker_variables: # Create temporary array for headers with the correct shape # TODO(BrianMichell): Implement this better so that we can enable fill values without changing the code. #noqa: TD003 tmp_headers = np.zeros_like(dataset[header_key]) - tmp_headers[not_null] = traces.header + tmp_headers[not_null] = transformed_headers # Create a new Variable object to avoid copying the temporary array # The ideal solution is to use `ds_to_write[header_key][:] = tmp_headers` # but Xarray appears to be copying memory instead of doing direct assignment. @@ -150,40 +152,17 @@ def trace_worker( # noqa: PLR0913 attrs=ds_to_write[header_key].attrs, encoding=ds_to_write[header_key].encoding, # Not strictly necessary, but safer than not doing it. ) + del transformed_headers # Manage memory if raw_header_key in worker_variables: tmp_raw_headers = np.zeros_like(dataset[raw_header_key]) - - # Get the indices where we need to place results - live_mask = not_null - live_positions = np.where(live_mask.ravel())[0] - - if len(live_positions) > 0: - # Calculate byte ranges for headers - header_size = 240 - trace_offset = segy_file.spec.trace.offset - trace_itemsize = segy_file.spec.trace.itemsize - - starts = [] - ends = [] - for global_trace_idx in live_trace_indexes: - header_start = trace_offset + global_trace_idx * trace_itemsize - header_end = header_start + header_size - starts.append(header_start) - ends.append(header_end) - - # Capture raw bytes - raw_header_bytes = merge_cat_file(segy_file.fs, segy_file.url, starts, ends) - - # Convert and place results - raw_headers_array = np.frombuffer(bytes(raw_header_bytes), dtype="|V240") - tmp_raw_headers.ravel()[live_positions] = raw_headers_array - + tmp_raw_headers[not_null] = raw_headers.view("|V240") ds_to_write[raw_header_key] = Variable( ds_to_write[raw_header_key].dims, tmp_raw_headers, attrs=ds_to_write[raw_header_key].attrs, - encoding=ds_to_write[raw_header_key].encoding, + encoding=ds_to_write[raw_header_key].encoding, # Not strictly necessary, but safer than not doing it. 
) + del raw_headers # Manage memory data_variable = ds_to_write[data_variable_name] fill_value = _get_fill_value(ScalarType(data_variable.dtype.name)) diff --git a/src/mdio/segy/blocked_io.py b/src/mdio/segy/blocked_io.py index 27141474c..a4efed89b 100644 --- a/src/mdio/segy/blocked_io.py +++ b/src/mdio/segy/blocked_io.py @@ -2,10 +2,7 @@ from __future__ import annotations -import multiprocessing as mp import os -from concurrent.futures import ProcessPoolExecutor -from concurrent.futures import as_completed from pathlib import Path from typing import TYPE_CHECKING @@ -80,37 +77,20 @@ def to_zarr( # noqa: PLR0913, PLR0915 chunk_iter = ChunkIterator(shape=data.shape, chunks=worker_chunks, dim_names=data.dims) num_chunks = chunk_iter.num_chunks - # For Unix async writes with s3fs/fsspec & multiprocessing, use 'spawn' instead of default - # 'fork' to avoid deadlocks on cloud stores. Slower but necessary. Default on Windows. - num_cpus = int(os.getenv("MDIO__IMPORT__CPU_COUNT", default_cpus)) - num_workers = min(num_chunks, num_cpus) - context = mp.get_context("spawn") - executor = ProcessPoolExecutor(max_workers=num_workers, mp_context=context) - segy_kw = { "url": segy_file.fs.unstrip_protocol(segy_file.url), "spec": segy_file.spec, "settings": segy_file.settings, } - with executor: - futures = [] - common_args = (segy_kw, output_path, data_variable_name) - for region in chunk_iter: - subset_args = (region, grid_map, dataset.isel(region)) - future = executor.submit(trace_worker, *common_args, *subset_args) - futures.append(future) - - iterable = tqdm( - as_completed(futures), - total=num_chunks, - unit="block", - desc="Ingesting traces", - ) - for future in iterable: - result = future.result() - if result is not None: - _update_stats(final_stats, result) + common_args = (segy_kw, output_path, data_variable_name) + + # Execute trace_worker serially for profiling + for region in tqdm(chunk_iter, total=num_chunks, unit="block", desc="Ingesting traces"): + subset_args = (region, grid_map, dataset.isel(region)) + result = trace_worker(*common_args, *subset_args) + if result is not None: + _update_stats(final_stats, result) # Xarray doesn't directly support incremental attribute updates when appending to an existing Zarr store. # HACK: We will update the array attribute using zarr's API directly. 
From 76ec7ced08a751d2b8b7402dbeda02bcd3d065d4 Mon Sep 17 00:00:00 2001 From: BrianMichell Date: Thu, 11 Sep 2025 15:16:54 +0000 Subject: [PATCH 07/23] Cleanup prints --- src/mdio/segy/_disaster_recovery_wrapper.py | 52 --------------------- 1 file changed, 52 deletions(-) diff --git a/src/mdio/segy/_disaster_recovery_wrapper.py b/src/mdio/segy/_disaster_recovery_wrapper.py index dfbad1f2a..02f964993 100644 --- a/src/mdio/segy/_disaster_recovery_wrapper.py +++ b/src/mdio/segy/_disaster_recovery_wrapper.py @@ -33,18 +33,6 @@ def debug_compare_raw_vs_processed(segy_file, trace_index=0): # Get processed data with transforms processed_data = segy_file.header[trace_index] - print("=== Raw vs Processed Comparison ===") - print(f"Raw data shape: {raw_data.shape}") - print(f"Processed data shape: {processed_data.shape}") - - if hasattr(raw_data, 'dtype') and raw_data.dtype.names: - if 'inline_number' in raw_data.dtype.names: - print(f"Raw inline_number: {raw_data['inline_number']}") - print(f"Raw inline_number (hex): {raw_data['inline_number']:08x}") - print(f"Processed inline_number: {processed_data['inline_number']}") - print(f"Processed inline_number (hex): {processed_data['inline_number']:08x}") - print(f"Are they equal? {raw_data['inline_number'] == processed_data['inline_number']}") - return raw_data, processed_data @@ -72,20 +60,6 @@ def __init__(self, segy_file: SegyFile): self.header_indexer = segy_file.header self.transform_pipeline = self.header_indexer.transform_pipeline - # Debug: Print transform pipeline information - import sys - print(f"Debug: System endianness: {sys.byteorder}") - print(f"Debug: File endianness: {self.segy_file.spec.endianness}") - print(f"Debug: Transform pipeline has {len(self.transform_pipeline.transforms)} transforms:") - for i, transform in enumerate(self.transform_pipeline.transforms): - print(f" Transform {i}: {type(transform).__name__}") - if hasattr(transform, 'target_order'): - print(f" Target order: {transform.target_order}") - if hasattr(transform, 'direction'): - print(f" Direction: {transform.direction}") - if hasattr(transform, 'keys'): - print(f" Keys: {transform.keys}") - def get_raw_and_transformed( self, indices: int | list[int] | np.ndarray | slice ) -> tuple[NDArray, NDArray]: @@ -101,22 +75,9 @@ def get_raw_and_transformed( # This reads from filesystem and applies transforms transformed_data = self.header_indexer[indices] - print(f"Debug: Transformed data shape: {transformed_data.shape}") - if hasattr(transformed_data, 'dtype') and transformed_data.dtype.names: - print(f"Debug: Transformed data dtype names: {transformed_data.dtype.names[:5]}...") # First 5 fields - if 'inline_number' in transformed_data.dtype.names: - print(f"Debug: First transformed inline_number: {transformed_data['inline_number'][0]}") - print(f"Debug: First transformed inline_number (hex): {transformed_data['inline_number'][0]:08x}") - # Now reverse the transforms to get back to raw data raw_data = self._reverse_transforms(transformed_data) - print(f"Debug: Raw data shape: {raw_data.shape}") - if hasattr(raw_data, 'dtype') and raw_data.dtype.names: - if 'inline_number' in raw_data.dtype.names: - print(f"Debug: First raw inline_number: {raw_data['inline_number'][0]}") - print(f"Debug: First raw inline_number (hex): {raw_data['inline_number'][0]:08x}") - return raw_data, transformed_data def _reverse_transforms(self, transformed_data: NDArray) -> NDArray: @@ -131,16 +92,10 @@ def _reverse_transforms(self, transformed_data: NDArray) -> NDArray: # Start with the transformed 
data raw_data = transformed_data.copy() if hasattr(transformed_data, 'copy') else transformed_data - print(f"Debug: Starting reversal with {len(self.transform_pipeline.transforms)} transforms") # Apply transforms in reverse order with reversed operations for i, transform in enumerate(reversed(self.transform_pipeline.transforms)): - print(f"Debug: Reversing transform {len(self.transform_pipeline.transforms)-1-i}: {type(transform).__name__}") - if 'inline_number' in raw_data.dtype.names: - print(f"Debug: Before reversal - inline_number: {raw_data['inline_number'][0]:08x}") raw_data = self._reverse_single_transform(raw_data, transform) - if 'inline_number' in raw_data.dtype.names: - print(f"Debug: After reversal - inline_number: {raw_data['inline_number'][0]:08x}") return raw_data @@ -161,11 +116,9 @@ def _reverse_single_transform(self, data: NDArray, transform: Transform) -> NDAr if isinstance(transform, ByteSwapTransform): # For byte swap, we need to reverse the endianness conversion # If the transform was converting to little-endian, we need to convert back to big-endian - print(f"Debug: Reversing byte swap (target was: {transform.target_order})") # Get current data endianness current_endianness = get_endianness(data) - print(f"Debug: Current data endianness: {current_endianness}") # If transform was converting TO little-endian, we need to convert TO big-endian if transform.target_order == Endianness.LITTLE: @@ -173,25 +126,20 @@ def _reverse_single_transform(self, data: NDArray, transform: Transform) -> NDAr else: reverse_target = Endianness.LITTLE - print(f"Debug: Reversing to target: {reverse_target}") reverse_transform = ByteSwapTransform(reverse_target) result = reverse_transform.apply(data) - if 'inline_number' in data.dtype.names: - print(f"Debug: Byte swap reversal - before: {data['inline_number'][0]:08x}, after: {result['inline_number'][0]:08x}") return result elif isinstance(transform, IbmFloatTransform): # Reverse IBM float conversion by swapping direction reverse_direction = "to_ibm" if transform.direction == "to_ieee" else "to_ieee" - print(f"Debug: Applying IBM float reversal (direction: {transform.direction} -> {reverse_direction})") reverse_transform = IbmFloatTransform(reverse_direction, transform.keys) return reverse_transform.apply(data) else: # For unknown transforms, return data unchanged # This maintains compatibility if new transforms are added - print(f"Warning: Unknown transform type {type(transform).__name__}, cannot reverse") return data From 2e18d97310b73473c5d0dde515ab636ed4cd19dd Mon Sep 17 00:00:00 2001 From: BrianMichell Date: Thu, 11 Sep 2025 18:36:43 +0000 Subject: [PATCH 08/23] churn --- src/mdio/segy/_disaster_recovery_wrapper.py | 170 ++++++++++---------- src/mdio/segy/_workers.py | 5 +- 2 files changed, 90 insertions(+), 85 deletions(-) diff --git a/src/mdio/segy/_disaster_recovery_wrapper.py b/src/mdio/segy/_disaster_recovery_wrapper.py index 02f964993..1d0f789aa 100644 --- a/src/mdio/segy/_disaster_recovery_wrapper.py +++ b/src/mdio/segy/_disaster_recovery_wrapper.py @@ -14,28 +14,6 @@ from numpy.typing import NDArray -def debug_compare_raw_vs_processed(segy_file, trace_index=0): - """Debug function to compare raw filesystem data vs processed data.""" - from segy.indexing import HeaderIndexer - - # Create a fresh indexer to get raw data - indexer = HeaderIndexer( - segy_file.fs, - segy_file.url, - segy_file.spec.trace, - segy_file.num_traces, - transform_pipeline=None # No transforms = raw data - ) - - # Get raw data directly from filesystem - 
raw_data = indexer[trace_index] - - # Get processed data with transforms - processed_data = segy_file.header[trace_index] - - return raw_data, processed_data - - class HeaderRawTransformedAccessor: """Utility class to access both raw and transformed header data with single filesystem read. @@ -57,28 +35,7 @@ def __init__(self, segy_file: SegyFile): segy_file: The SegyFile instance to work with """ self.segy_file = segy_file - self.header_indexer = segy_file.header - self.transform_pipeline = self.header_indexer.transform_pipeline - - def get_raw_and_transformed( - self, indices: int | list[int] | np.ndarray | slice - ) -> tuple[NDArray, NDArray]: - """Get both raw and transformed header data with single filesystem read. - - Args: - indices: Which headers to retrieve (int, list, ndarray, or slice) - - Returns: - Tuple of (raw_headers, transformed_headers) - """ - # Get the transformed data using the normal API - # This reads from filesystem and applies transforms - transformed_data = self.header_indexer[indices] - - # Now reverse the transforms to get back to raw data - raw_data = self._reverse_transforms(transformed_data) - - return raw_data, transformed_data + self.transform_pipeline = self.segy_file.header.transform_pipeline def _reverse_transforms(self, transformed_data: NDArray) -> NDArray: """Reverse the transform pipeline to get raw data from transformed data. @@ -95,52 +52,51 @@ def _reverse_transforms(self, transformed_data: NDArray) -> NDArray: # Apply transforms in reverse order with reversed operations for i, transform in enumerate(reversed(self.transform_pipeline.transforms)): - raw_data = self._reverse_single_transform(raw_data, transform) + raw_data = _reverse_single_transform(raw_data, transform) return raw_data - def _reverse_single_transform(self, data: NDArray, transform: Transform) -> NDArray: - """Reverse a single transform operation. - - Args: - data: The data to reverse transform - transform: The transform to reverse - - Returns: - Data with the transform reversed - """ - # Import here to avoid circular imports - from segy.transforms import get_endianness - from segy.schema import Endianness - - if isinstance(transform, ByteSwapTransform): - # For byte swap, we need to reverse the endianness conversion - # If the transform was converting to little-endian, we need to convert back to big-endian +@profile +def _reverse_single_transform(data: NDArray, transform: Transform) -> NDArray: + """Reverse a single transform operation. 
- # Get current data endianness - current_endianness = get_endianness(data) + Args: + data: The data to reverse transform + transform: The transform to reverse - # If transform was converting TO little-endian, we need to convert TO big-endian - if transform.target_order == Endianness.LITTLE: - reverse_target = Endianness.BIG - else: - reverse_target = Endianness.LITTLE + Returns: + Data with the transform reversed + """ + # Import here to avoid circular imports + from segy.transforms import get_endianness + from segy.schema import Endianness + + if isinstance(transform, ByteSwapTransform): + # For byte swap, we need to reverse the endianness conversion + # If the transform was converting to little-endian, we need to convert back to big-endian + + # If transform was converting TO little-endian, we need to convert TO big-endian + # TODO: I don't think this is correct + if transform.target_order == Endianness.LITTLE: + reverse_target = Endianness.BIG + else: + reverse_target = Endianness.LITTLE - reverse_transform = ByteSwapTransform(reverse_target) - result = reverse_transform.apply(data) + reverse_transform = ByteSwapTransform(reverse_target) + result = reverse_transform.apply(data) - return result + return result - elif isinstance(transform, IbmFloatTransform): - # Reverse IBM float conversion by swapping direction - reverse_direction = "to_ibm" if transform.direction == "to_ieee" else "to_ieee" - reverse_transform = IbmFloatTransform(reverse_direction, transform.keys) - return reverse_transform.apply(data) + elif isinstance(transform, IbmFloatTransform): + # Reverse IBM float conversion by swapping direction + reverse_direction = "to_ibm" if transform.direction == "to_ieee" else "to_ieee" + reverse_transform = IbmFloatTransform(reverse_direction, transform.keys) + return reverse_transform.apply(data) - else: - # For unknown transforms, return data unchanged - # This maintains compatibility if new transforms are added - return data + else: + # For unknown transforms, return data unchanged + # This maintains compatibility if new transforms are added + return data def get_header_raw_and_transformed( @@ -171,5 +127,53 @@ def get_header_raw_and_transformed( # Slice of headers raw_hdrs, transformed_hdrs = get_header_raw_and_transformed(segy_file, slice(0, 10)) """ - accessor = HeaderRawTransformedAccessor(segy_file) - return accessor.get_raw_and_transformed(indices) + return _get_header_raw_optimized(segy_file, indices) + +@profile +def _get_header_raw_optimized( + segy_file: SegyFile, + indices: int | list[int] | np.ndarray | slice +) -> tuple[NDArray, NDArray]: + """Ultra-optimized function that eliminates double disk reads entirely. + + This function: + 1. Gets transformed headers using the normal API (single disk read) + 2. Reverses the transforms on the already-loaded data (no second disk read) + 3. Returns both raw and transformed headers + + Args: + segy_file: The SegyFile instance + indices: Which headers to retrieve + + Returns: + Tuple of (raw_headers, transformed_headers) where transformed_headers + is the same as what segy_file.header[indices] would return + """ + # Get transformed headers using the normal API (single disk read) + transformed_headers = segy_file.header[indices] + + # Reverse the transforms on the already-loaded transformed data + # This eliminates the second disk read entirely! 
+ raw_headers = _reverse_transforms(transformed_headers, segy_file.header.transform_pipeline) + + return raw_headers, transformed_headers + +@profile +def _reverse_transforms(transformed_data: NDArray, transform_pipeline) -> NDArray: + """Reverse the transform pipeline to get raw data from transformed data. + + Args: + transformed_data: Data that has been processed through the transform pipeline + transform_pipeline: The transform pipeline to reverse + + Returns: + Raw data equivalent to what was read directly from filesystem + """ + # Start with the transformed data + raw_data = transformed_data.copy() if hasattr(transformed_data, 'copy') else transformed_data + + # Apply transforms in reverse order with reversed operations + for transform in reversed(transform_pipeline.transforms): + raw_data = _reverse_single_transform(raw_data, transform) + + return raw_data diff --git a/src/mdio/segy/_workers.py b/src/mdio/segy/_workers.py index 67a039a78..570897967 100644 --- a/src/mdio/segy/_workers.py +++ b/src/mdio/segy/_workers.py @@ -82,7 +82,7 @@ def header_scan_worker( return cast("HeaderArray", trace_header) -@profile +# @profile def trace_worker( # noqa: PLR0913 segy_kw: SegyFileArguments, output_path: UPath, @@ -142,6 +142,7 @@ def trace_worker( # noqa: PLR0913 # TODO(BrianMichell): Implement this better so that we can enable fill values without changing the code. #noqa: TD003 tmp_headers = np.zeros_like(dataset[header_key]) tmp_headers[not_null] = transformed_headers + # tmp_headers[not_null] = traces.header # Create a new Variable object to avoid copying the temporary array # The ideal solution is to use `ds_to_write[header_key][:] = tmp_headers` # but Xarray appears to be copying memory instead of doing direct assignment. @@ -152,7 +153,7 @@ def trace_worker( # noqa: PLR0913 attrs=ds_to_write[header_key].attrs, encoding=ds_to_write[header_key].encoding, # Not strictly necessary, but safer than not doing it. ) - del transformed_headers # Manage memory + # del transformed_headers # Manage memory if raw_header_key in worker_variables: tmp_raw_headers = np.zeros_like(dataset[raw_header_key]) tmp_raw_headers[not_null] = raw_headers.view("|V240") From 42f0ab66a290b806b00d941dcd12d47a39c8b5a0 Mon Sep 17 00:00:00 2001 From: BrianMichell Date: Thu, 11 Sep 2025 19:12:06 +0000 Subject: [PATCH 09/23] Runtime regression fix --- src/mdio/segy/_disaster_recovery_wrapper.py | 74 ++------------------- src/mdio/segy/_workers.py | 9 +-- src/mdio/segy/blocked_io.py | 38 ++++++++--- 3 files changed, 38 insertions(+), 83 deletions(-) diff --git a/src/mdio/segy/_disaster_recovery_wrapper.py b/src/mdio/segy/_disaster_recovery_wrapper.py index 1d0f789aa..e2372c883 100644 --- a/src/mdio/segy/_disaster_recovery_wrapper.py +++ b/src/mdio/segy/_disaster_recovery_wrapper.py @@ -13,50 +13,6 @@ from segy.transforms import Transform, TransformPipeline, ByteSwapTransform, IbmFloatTransform from numpy.typing import NDArray - -class HeaderRawTransformedAccessor: - """Utility class to access both raw and transformed header data with single filesystem read. - - This class works as a consumer of SegyFile objects without modifying the package. - It achieves the goal by: - 1. Reading raw data from filesystem once - 2. Applying transforms to get transformed data - 3. 
Keeping both versions available - - The transforms used in SEG-Y processing are reversible: - - ByteSwapTransform: Self-inverse (swapping twice returns to original) - - IbmFloatTransform: Can be reversed by swapping direction - """ - - def __init__(self, segy_file: SegyFile): - """Initialize with a SegyFile instance. - - Args: - segy_file: The SegyFile instance to work with - """ - self.segy_file = segy_file - self.transform_pipeline = self.segy_file.header.transform_pipeline - - def _reverse_transforms(self, transformed_data: NDArray) -> NDArray: - """Reverse the transform pipeline to get raw data from transformed data. - - Args: - transformed_data: Data that has been processed through the transform pipeline - - Returns: - Raw data equivalent to what was read directly from filesystem - """ - # Start with the transformed data - raw_data = transformed_data.copy() if hasattr(transformed_data, 'copy') else transformed_data - - - # Apply transforms in reverse order with reversed operations - for i, transform in enumerate(reversed(self.transform_pipeline.transforms)): - raw_data = _reverse_single_transform(raw_data, transform) - - return raw_data - -@profile def _reverse_single_transform(data: NDArray, transform: Transform) -> NDArray: """Reverse a single transform operation. @@ -98,11 +54,10 @@ def _reverse_single_transform(data: NDArray, transform: Transform) -> NDArray: # This maintains compatibility if new transforms are added return data - def get_header_raw_and_transformed( segy_file: SegyFile, indices: int | list[int] | np.ndarray | slice -) -> tuple[NDArray, NDArray]: +) -> tuple[NDArray, NDArray, NDArray]: """Convenience function to get both raw and transformed header data. This is a drop-in replacement that provides the functionality you requested @@ -127,38 +82,17 @@ def get_header_raw_and_transformed( # Slice of headers raw_hdrs, transformed_hdrs = get_header_raw_and_transformed(segy_file, slice(0, 10)) """ - return _get_header_raw_optimized(segy_file, indices) - -@profile -def _get_header_raw_optimized( - segy_file: SegyFile, - indices: int | list[int] | np.ndarray | slice -) -> tuple[NDArray, NDArray]: - """Ultra-optimized function that eliminates double disk reads entirely. - This function: - 1. Gets transformed headers using the normal API (single disk read) - 2. Reverses the transforms on the already-loaded data (no second disk read) - 3. Returns both raw and transformed headers + traces = segy_file.trace[indices] - Args: - segy_file: The SegyFile instance - indices: Which headers to retrieve - - Returns: - Tuple of (raw_headers, transformed_headers) where transformed_headers - is the same as what segy_file.header[indices] would return - """ - # Get transformed headers using the normal API (single disk read) - transformed_headers = segy_file.header[indices] + transformed_headers = traces.header # Reverse the transforms on the already-loaded transformed data # This eliminates the second disk read entirely! raw_headers = _reverse_transforms(transformed_headers, segy_file.header.transform_pipeline) - return raw_headers, transformed_headers + return raw_headers, transformed_headers, traces -@profile def _reverse_transforms(transformed_data: NDArray, transform_pipeline) -> NDArray: """Reverse the transform pipeline to get raw data from transformed data. 
diff --git a/src/mdio/segy/_workers.py b/src/mdio/segy/_workers.py index 570897967..65f6712ec 100644 --- a/src/mdio/segy/_workers.py +++ b/src/mdio/segy/_workers.py @@ -121,7 +121,8 @@ def trace_worker( # noqa: PLR0913 zarr_config.set({"threading.max_workers": 1}) live_trace_indexes = local_grid_map[not_null].tolist() - traces = segy_file.trace[live_trace_indexes] + # traces = segy_file.trace[live_trace_indexes] + raw_headers, transformed_headers, traces = get_header_raw_and_transformed(segy_file, live_trace_indexes) header_key = "headers" raw_header_key = "raw_headers" @@ -135,7 +136,7 @@ def trace_worker( # noqa: PLR0913 worker_variables.append(raw_header_key) ds_to_write = dataset[worker_variables] - raw_headers, transformed_headers = get_header_raw_and_transformed(segy_file, live_trace_indexes) + # raw_headers, transformed_headers = get_header_raw_and_transformed(segy_file, live_trace_indexes) if header_key in worker_variables: # Create temporary array for headers with the correct shape @@ -153,7 +154,7 @@ def trace_worker( # noqa: PLR0913 attrs=ds_to_write[header_key].attrs, encoding=ds_to_write[header_key].encoding, # Not strictly necessary, but safer than not doing it. ) - # del transformed_headers # Manage memory + del transformed_headers # Manage memory if raw_header_key in worker_variables: tmp_raw_headers = np.zeros_like(dataset[raw_header_key]) tmp_raw_headers[not_null] = raw_headers.view("|V240") @@ -163,8 +164,8 @@ def trace_worker( # noqa: PLR0913 attrs=ds_to_write[raw_header_key].attrs, encoding=ds_to_write[raw_header_key].encoding, # Not strictly necessary, but safer than not doing it. ) - del raw_headers # Manage memory + del raw_headers # Manage memory data_variable = ds_to_write[data_variable_name] fill_value = _get_fill_value(ScalarType(data_variable.dtype.name)) tmp_samples = np.full_like(data_variable, fill_value=fill_value) diff --git a/src/mdio/segy/blocked_io.py b/src/mdio/segy/blocked_io.py index a4efed89b..09e9356d6 100644 --- a/src/mdio/segy/blocked_io.py +++ b/src/mdio/segy/blocked_io.py @@ -2,7 +2,10 @@ from __future__ import annotations +import multiprocessing as mp import os +from concurrent.futures import ProcessPoolExecutor +from concurrent.futures import as_completed from pathlib import Path from typing import TYPE_CHECKING @@ -77,20 +80,37 @@ def to_zarr( # noqa: PLR0913, PLR0915 chunk_iter = ChunkIterator(shape=data.shape, chunks=worker_chunks, dim_names=data.dims) num_chunks = chunk_iter.num_chunks + # For Unix async writes with s3fs/fsspec & multiprocessing, use 'spawn' instead of default + # 'fork' to avoid deadlocks on cloud stores. Slower but necessary. Default on Windows. 
+ num_cpus = int(os.getenv("MDIO__IMPORT__CPU_COUNT", default_cpus)) + num_workers = min(num_chunks, num_cpus) + context = mp.get_context("spawn") + executor = ProcessPoolExecutor(max_workers=num_workers, mp_context=context) + segy_kw = { "url": segy_file.fs.unstrip_protocol(segy_file.url), "spec": segy_file.spec, "settings": segy_file.settings, } + with executor: + futures = [] + common_args = (segy_kw, output_path, data_variable_name) + for region in chunk_iter: + subset_args = (region, grid_map, dataset.isel(region)) + future = executor.submit(trace_worker, *common_args, *subset_args) + futures.append(future) + + iterable = tqdm( + as_completed(futures), + total=num_chunks, + unit="block", + desc="Ingesting traces", + ) - common_args = (segy_kw, output_path, data_variable_name) - - # Execute trace_worker serially for profiling - for region in tqdm(chunk_iter, total=num_chunks, unit="block", desc="Ingesting traces"): - subset_args = (region, grid_map, dataset.isel(region)) - result = trace_worker(*common_args, *subset_args) - if result is not None: - _update_stats(final_stats, result) + for future in iterable: + result = future.result() + if result is not None: + _update_stats(final_stats, result) # Xarray doesn't directly support incremental attribute updates when appending to an existing Zarr store. # HACK: We will update the array attribute using zarr's API directly. @@ -260,4 +280,4 @@ def to_segy( non_consecutive_axes -= 1 - return block_io_records + return block_io_records \ No newline at end of file From dddab964c825f290696d2d0eae15af95b14e31e9 Mon Sep 17 00:00:00 2001 From: BrianMichell Date: Thu, 11 Sep 2025 19:28:50 +0000 Subject: [PATCH 10/23] Cleanup --- src/mdio/segy/_disaster_recovery_wrapper.py | 69 ++++----------------- src/mdio/segy/_workers.py | 4 -- 2 files changed, 12 insertions(+), 61 deletions(-) diff --git a/src/mdio/segy/_disaster_recovery_wrapper.py b/src/mdio/segy/_disaster_recovery_wrapper.py index e2372c883..7765822d7 100644 --- a/src/mdio/segy/_disaster_recovery_wrapper.py +++ b/src/mdio/segy/_disaster_recovery_wrapper.py @@ -2,36 +2,20 @@ from __future__ import annotations -import numpy as np from typing import TYPE_CHECKING -from segy.transforms import ByteSwapTransform -from segy.transforms import IbmFloatTransform if TYPE_CHECKING: + import numpy as np from segy.file import SegyFile - from segy.indexing import HeaderIndexer - from segy.transforms import Transform, TransformPipeline, ByteSwapTransform, IbmFloatTransform + from segy.transforms import Transform, ByteSwapTransform, IbmFloatTransform from numpy.typing import NDArray def _reverse_single_transform(data: NDArray, transform: Transform) -> NDArray: - """Reverse a single transform operation. 
- - Args: - data: The data to reverse transform - transform: The transform to reverse - - Returns: - Data with the transform reversed - """ - # Import here to avoid circular imports - from segy.transforms import get_endianness + """Reverse a single transform operation.""" from segy.schema import Endianness if isinstance(transform, ByteSwapTransform): - # For byte swap, we need to reverse the endianness conversion - # If the transform was converting to little-endian, we need to convert back to big-endian - - # If transform was converting TO little-endian, we need to convert TO big-endian + # Reverse the endianness conversion # TODO: I don't think this is correct if transform.target_order == Endianness.LITTLE: reverse_target = Endianness.BIG @@ -39,74 +23,45 @@ def _reverse_single_transform(data: NDArray, transform: Transform) -> NDArray: reverse_target = Endianness.LITTLE reverse_transform = ByteSwapTransform(reverse_target) - result = reverse_transform.apply(data) - - return result + return reverse_transform.apply(data) elif isinstance(transform, IbmFloatTransform): - # Reverse IBM float conversion by swapping direction + # Reverse IBM float conversion reverse_direction = "to_ibm" if transform.direction == "to_ieee" else "to_ieee" reverse_transform = IbmFloatTransform(reverse_direction, transform.keys) return reverse_transform.apply(data) else: # For unknown transforms, return data unchanged - # This maintains compatibility if new transforms are added return data def get_header_raw_and_transformed( segy_file: SegyFile, - indices: int | list[int] | np.ndarray | slice + indices: int | list[int] | NDArray | slice ) -> tuple[NDArray, NDArray, NDArray]: - """Convenience function to get both raw and transformed header data. - - This is a drop-in replacement that provides the functionality you requested - without modifying the segy package. + """Get both raw and transformed header data. Args: segy_file: The SegyFile instance indices: Which headers to retrieve Returns: - Tuple of (raw_headers, transformed_headers) - - Example: - from header_raw_transformed_accessor import get_header_raw_and_transformed - - # Single header - raw_hdr, transformed_hdr = get_header_raw_and_transformed(segy_file, 0) - - # Multiple headers - raw_hdrs, transformed_hdrs = get_header_raw_and_transformed(segy_file, [0, 1, 2]) - - # Slice of headers - raw_hdrs, transformed_hdrs = get_header_raw_and_transformed(segy_file, slice(0, 10)) + Tuple of (raw_headers, transformed_headers, traces) """ traces = segy_file.trace[indices] - transformed_headers = traces.header - # Reverse the transforms on the already-loaded transformed data - # This eliminates the second disk read entirely! + # Reverse transforms to get raw data raw_headers = _reverse_transforms(transformed_headers, segy_file.header.transform_pipeline) return raw_headers, transformed_headers, traces def _reverse_transforms(transformed_data: NDArray, transform_pipeline) -> NDArray: - """Reverse the transform pipeline to get raw data from transformed data. 
- - Args: - transformed_data: Data that has been processed through the transform pipeline - transform_pipeline: The transform pipeline to reverse - - Returns: - Raw data equivalent to what was read directly from filesystem - """ - # Start with the transformed data + """Reverse the transform pipeline to get raw data.""" raw_data = transformed_data.copy() if hasattr(transformed_data, 'copy') else transformed_data - # Apply transforms in reverse order with reversed operations + # Apply transforms in reverse order for transform in reversed(transform_pipeline.transforms): raw_data = _reverse_single_transform(raw_data, transform) diff --git a/src/mdio/segy/_workers.py b/src/mdio/segy/_workers.py index 65f6712ec..5dd595ab7 100644 --- a/src/mdio/segy/_workers.py +++ b/src/mdio/segy/_workers.py @@ -121,7 +121,6 @@ def trace_worker( # noqa: PLR0913 zarr_config.set({"threading.max_workers": 1}) live_trace_indexes = local_grid_map[not_null].tolist() - # traces = segy_file.trace[live_trace_indexes] raw_headers, transformed_headers, traces = get_header_raw_and_transformed(segy_file, live_trace_indexes) header_key = "headers" @@ -136,14 +135,11 @@ def trace_worker( # noqa: PLR0913 worker_variables.append(raw_header_key) ds_to_write = dataset[worker_variables] - # raw_headers, transformed_headers = get_header_raw_and_transformed(segy_file, live_trace_indexes) if header_key in worker_variables: # Create temporary array for headers with the correct shape - # TODO(BrianMichell): Implement this better so that we can enable fill values without changing the code. #noqa: TD003 tmp_headers = np.zeros_like(dataset[header_key]) tmp_headers[not_null] = transformed_headers - # tmp_headers[not_null] = traces.header # Create a new Variable object to avoid copying the temporary array # The ideal solution is to use `ds_to_write[header_key][:] = tmp_headers` # but Xarray appears to be copying memory instead of doing direct assignment. From 0a49042bde7984ce2e961a8717a822b31bcb6215 Mon Sep 17 00:00:00 2001 From: BrianMichell Date: Thu, 11 Sep 2025 19:36:39 +0000 Subject: [PATCH 11/23] Further cleanup and optimization --- src/mdio/segy/_disaster_recovery_wrapper.py | 10 +++++++--- src/mdio/segy/_workers.py | 7 +++++-- 2 files changed, 12 insertions(+), 5 deletions(-) diff --git a/src/mdio/segy/_disaster_recovery_wrapper.py b/src/mdio/segy/_disaster_recovery_wrapper.py index 7765822d7..746d85d1b 100644 --- a/src/mdio/segy/_disaster_recovery_wrapper.py +++ b/src/mdio/segy/_disaster_recovery_wrapper.py @@ -37,8 +37,9 @@ def _reverse_single_transform(data: NDArray, transform: Transform) -> NDArray: def get_header_raw_and_transformed( segy_file: SegyFile, - indices: int | list[int] | NDArray | slice -) -> tuple[NDArray, NDArray, NDArray]: + indices: int | list[int] | NDArray | slice, + do_reverse_transforms: bool = True +) -> tuple[NDArray | None, NDArray, NDArray]: """Get both raw and transformed header data. 
Args: @@ -53,7 +54,10 @@ def get_header_raw_and_transformed( transformed_headers = traces.header # Reverse transforms to get raw data - raw_headers = _reverse_transforms(transformed_headers, segy_file.header.transform_pipeline) + if do_reverse_transforms: + raw_headers = _reverse_transforms(transformed_headers, segy_file.header.transform_pipeline) + else: + raw_headers = None return raw_headers, transformed_headers, traces diff --git a/src/mdio/segy/_workers.py b/src/mdio/segy/_workers.py index 5dd595ab7..dca29b34d 100644 --- a/src/mdio/segy/_workers.py +++ b/src/mdio/segy/_workers.py @@ -9,7 +9,6 @@ import numpy as np from segy import SegyFile -from segy.indexing import merge_cat_file from mdio.api.io import to_mdio from mdio.builder.schemas.dtype import ScalarType @@ -121,19 +120,23 @@ def trace_worker( # noqa: PLR0913 zarr_config.set({"threading.max_workers": 1}) live_trace_indexes = local_grid_map[not_null].tolist() - raw_headers, transformed_headers, traces = get_header_raw_and_transformed(segy_file, live_trace_indexes) header_key = "headers" raw_header_key = "raw_headers" + # Used to disable the reverse transforms if we aren't going to write the raw headers + do_reverse_transforms = False + # Get subset of the dataset that has not yet been saved # The headers might not be present in the dataset worker_variables = [data_variable_name] if header_key in dataset.data_vars: # Keeping the `if` here to allow for more worker configurations worker_variables.append(header_key) if raw_header_key in dataset.data_vars: + do_reverse_transforms = True worker_variables.append(raw_header_key) + raw_headers, transformed_headers, traces = get_header_raw_and_transformed(segy_file, live_trace_indexes, do_reverse_transforms=do_reverse_transforms) ds_to_write = dataset[worker_variables] if header_key in worker_variables: From 58402893a1b75c14b20629d92c0892816f38e6e7 Mon Sep 17 00:00:00 2001 From: BrianMichell Date: Thu, 11 Sep 2025 19:39:03 +0000 Subject: [PATCH 12/23] Remove missed profile decorator --- src/mdio/segy/_workers.py | 1 - 1 file changed, 1 deletion(-) diff --git a/src/mdio/segy/_workers.py b/src/mdio/segy/_workers.py index dca29b34d..3ddf4530a 100644 --- a/src/mdio/segy/_workers.py +++ b/src/mdio/segy/_workers.py @@ -81,7 +81,6 @@ def header_scan_worker( return cast("HeaderArray", trace_header) -# @profile def trace_worker( # noqa: PLR0913 segy_kw: SegyFileArguments, output_path: UPath, From 53fbae588e8af0171f744cd6dd2c060456f4703a Mon Sep 17 00:00:00 2001 From: BrianMichell Date: Fri, 12 Sep 2025 18:29:37 +0000 Subject: [PATCH 13/23] Update logic --- src/mdio/segy/_disaster_recovery_wrapper.py | 18 +++++++++++++----- 1 file changed, 13 insertions(+), 5 deletions(-) diff --git a/src/mdio/segy/_disaster_recovery_wrapper.py b/src/mdio/segy/_disaster_recovery_wrapper.py index 746d85d1b..4a8b37529 100644 --- a/src/mdio/segy/_disaster_recovery_wrapper.py +++ b/src/mdio/segy/_disaster_recovery_wrapper.py @@ -13,17 +13,25 @@ def _reverse_single_transform(data: NDArray, transform: Transform) -> NDArray: """Reverse a single transform operation.""" from segy.schema import Endianness + from segy.transforms import ByteSwapTransform + from segy.transforms import IbmFloatTransform if isinstance(transform, ByteSwapTransform): # Reverse the endianness conversion # TODO: I don't think this is correct - if transform.target_order == Endianness.LITTLE: - reverse_target = Endianness.BIG - else: + if transform.target_order == Endianness.BIG: reverse_target = Endianness.LITTLE + reverse_transform = 
ByteSwapTransform(reverse_target) + return reverse_transform.apply(data) + return data - reverse_transform = ByteSwapTransform(reverse_target) - return reverse_transform.apply(data) + # if transform.target_order == Endianness.LITTLE: + # reverse_target = Endianness.BIG + # else: + # reverse_target = Endianness.LITTLE + + # reverse_transform = ByteSwapTransform(reverse_target) + # return reverse_transform.apply(data) elif isinstance(transform, IbmFloatTransform): # Reverse IBM float conversion From 51ea0a91bab0c5cd57de1cb0b05bd8fdc6c2bcc9 Mon Sep 17 00:00:00 2001 From: BrianMichell Date: Mon, 15 Sep 2025 13:36:50 +0000 Subject: [PATCH 14/23] Update pipeline application logic --- src/mdio/segy/_disaster_recovery_wrapper.py | 34 +++++++++------------ 1 file changed, 14 insertions(+), 20 deletions(-) diff --git a/src/mdio/segy/_disaster_recovery_wrapper.py b/src/mdio/segy/_disaster_recovery_wrapper.py index 4a8b37529..ee07a78d8 100644 --- a/src/mdio/segy/_disaster_recovery_wrapper.py +++ b/src/mdio/segy/_disaster_recovery_wrapper.py @@ -10,7 +10,7 @@ from segy.transforms import Transform, ByteSwapTransform, IbmFloatTransform from numpy.typing import NDArray -def _reverse_single_transform(data: NDArray, transform: Transform) -> NDArray: +def _reverse_single_transform(data: NDArray, transform: Transform, endianness: Endianness) -> NDArray: """Reverse a single transform operation.""" from segy.schema import Endianness from segy.transforms import ByteSwapTransform @@ -18,22 +18,17 @@ def _reverse_single_transform(data: NDArray, transform: Transform) -> NDArray: if isinstance(transform, ByteSwapTransform): # Reverse the endianness conversion - # TODO: I don't think this is correct - if transform.target_order == Endianness.BIG: - reverse_target = Endianness.LITTLE - reverse_transform = ByteSwapTransform(reverse_target) - return reverse_transform.apply(data) - return data - - # if transform.target_order == Endianness.LITTLE: - # reverse_target = Endianness.BIG - # else: - # reverse_target = Endianness.LITTLE - - # reverse_transform = ByteSwapTransform(reverse_target) - # return reverse_transform.apply(data) + if endianness == Endianness.BIG: + print("REVERSING TO BIG ENDIAN") + reverse_target = Endianness.BIG + else: + print("No REVERSING TO LITTLE ENDIAN") + return data + + reverse_transform = ByteSwapTransform(reverse_target) + return reverse_transform.apply(data) - elif isinstance(transform, IbmFloatTransform): + elif isinstance(transform, IbmFloatTransform): # TODO: This seems incorrect... 
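+        # Caveat (assumption, not verified here): IBM <-> IEEE conversion rounds some
+        # values, so reversing an IbmFloatTransform may not restore the original bytes
+        # bit-for-bit; header pipelines normally only require the byte-swap reversal above.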
# Reverse IBM float conversion reverse_direction = "to_ibm" if transform.direction == "to_ieee" else "to_ieee" reverse_transform = IbmFloatTransform(reverse_direction, transform.keys) @@ -57,24 +52,23 @@ def get_header_raw_and_transformed( Returns: Tuple of (raw_headers, transformed_headers, traces) """ - traces = segy_file.trace[indices] transformed_headers = traces.header # Reverse transforms to get raw data if do_reverse_transforms: - raw_headers = _reverse_transforms(transformed_headers, segy_file.header.transform_pipeline) + raw_headers = _reverse_transforms(transformed_headers, segy_file.header.transform_pipeline, segy_file.spec.endianness) else: raw_headers = None return raw_headers, transformed_headers, traces -def _reverse_transforms(transformed_data: NDArray, transform_pipeline) -> NDArray: +def _reverse_transforms(transformed_data: NDArray, transform_pipeline, endianness: Endianness) -> NDArray: """Reverse the transform pipeline to get raw data.""" raw_data = transformed_data.copy() if hasattr(transformed_data, 'copy') else transformed_data # Apply transforms in reverse order for transform in reversed(transform_pipeline.transforms): - raw_data = _reverse_single_transform(raw_data, transform) + raw_data = _reverse_single_transform(raw_data, transform, endianness) return raw_data From f433849dcd97e539e983e74d607cfb08503241e8 Mon Sep 17 00:00:00 2001 From: BrianMichell Date: Mon, 15 Sep 2025 14:24:39 +0000 Subject: [PATCH 15/23] Clean testing prints --- src/mdio/segy/_disaster_recovery_wrapper.py | 2 -- 1 file changed, 2 deletions(-) diff --git a/src/mdio/segy/_disaster_recovery_wrapper.py b/src/mdio/segy/_disaster_recovery_wrapper.py index ee07a78d8..03373618c 100644 --- a/src/mdio/segy/_disaster_recovery_wrapper.py +++ b/src/mdio/segy/_disaster_recovery_wrapper.py @@ -19,10 +19,8 @@ def _reverse_single_transform(data: NDArray, transform: Transform, endianness: E if isinstance(transform, ByteSwapTransform): # Reverse the endianness conversion if endianness == Endianness.BIG: - print("REVERSING TO BIG ENDIAN") reverse_target = Endianness.BIG else: - print("No REVERSING TO LITTLE ENDIAN") return data reverse_transform = ByteSwapTransform(reverse_target) From 266a3bad3e54f104a046ebeaa0b55cf365808283 Mon Sep 17 00:00:00 2001 From: BrianMichell Date: Mon, 15 Sep 2025 14:24:46 +0000 Subject: [PATCH 16/23] Begin work on tests --- tests/unit/test_disaster_recovery_wrapper.py | 347 +++++++++++++++++++ 1 file changed, 347 insertions(+) create mode 100644 tests/unit/test_disaster_recovery_wrapper.py diff --git a/tests/unit/test_disaster_recovery_wrapper.py b/tests/unit/test_disaster_recovery_wrapper.py new file mode 100644 index 000000000..54effabd1 --- /dev/null +++ b/tests/unit/test_disaster_recovery_wrapper.py @@ -0,0 +1,347 @@ +"""Test harness for disaster recovery wrapper. + +This module tests the _disaster_recovery_wrapper.py functionality by creating +test SEGY files with different configurations and validating that the raw headers +from get_header_raw_and_transformed match the bytes on disk. 
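+Each parameterized configuration (big/little endian, IBM/IEEE sample format) is
+exercised so that the reverse byte-swap path is validated against the exact header
+bytes written by SegyFactory.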
+""" + +from __future__ import annotations + +import tempfile +from pathlib import Path +from typing import TYPE_CHECKING + +import numpy as np +import pytest +from segy import SegyFile +from segy.factory import SegyFactory +from segy.schema import Endianness +from segy.schema import HeaderField +from segy.schema import SegySpec +from segy.standards import get_segy_standard + +from mdio.segy._disaster_recovery_wrapper import get_header_raw_and_transformed + +if TYPE_CHECKING: + from numpy.typing import NDArray + + +class TestDisasterRecoveryWrapper: + """Test cases for disaster recovery wrapper functionality.""" + + @pytest.fixture + def temp_dir(self) -> Path: + """Create a temporary directory for test files.""" + with tempfile.TemporaryDirectory() as tmp_dir: + yield Path(tmp_dir) + + @pytest.fixture + def basic_segy_spec(self) -> SegySpec: + """Create a basic SEGY specification for testing.""" + spec = get_segy_standard(1.0) + + # Add basic header fields for inline/crossline + header_fields = [ + HeaderField(name="inline", byte=189, format="int32"), + HeaderField(name="crossline", byte=193, format="int32"), + HeaderField(name="cdp_x", byte=181, format="int32"), + HeaderField(name="cdp_y", byte=185, format="int32"), + ] + + return spec.customize(trace_header_fields=header_fields) + + @pytest.fixture(params=[ + {"endianness": Endianness.BIG, "data_format": 1, "name": "big_endian_ibm"}, + {"endianness": Endianness.BIG, "data_format": 5, "name": "big_endian_ieee"}, + {"endianness": Endianness.LITTLE, "data_format": 1, "name": "little_endian_ibm"}, + {"endianness": Endianness.LITTLE, "data_format": 5, "name": "little_endian_ieee"}, + ]) + def segy_config(self, request) -> dict: + """Parameterized fixture for different SEGY configurations.""" + return request.param + + def create_test_segy_file( + self, + spec: SegySpec, + num_traces: int, + samples_per_trace: int, + output_path: Path, + endianness: Endianness = Endianness.BIG, + data_format: int = 1, # 1=IBM float, 5=IEEE float + inline_range: tuple[int, int] = (1, 5), + crossline_range: tuple[int, int] = (1, 5), + ) -> SegySpec: + """Create a test SEGY file with synthetic data.""" + # Update spec with desired endianness + spec = spec.model_copy(update={"endianness": endianness}) + + factory = SegyFactory(spec=spec, samples_per_trace=samples_per_trace) + + # Create synthetic header data + headers = factory.create_trace_header_template(num_traces) + samples = factory.create_trace_sample_template(num_traces) + + # Set inline/crossline values + inline_start, inline_end = inline_range + crossline_start, crossline_end = crossline_range + + # Create a simple grid + inlines = np.arange(inline_start, inline_end + 1) + crosslines = np.arange(crossline_start, crossline_end + 1) + + trace_idx = 0 + for inline in inlines: + for crossline in crosslines: + if trace_idx >= num_traces: + break + + headers["inline"][trace_idx] = inline + headers["crossline"][trace_idx] = crossline + headers["cdp_x"][trace_idx] = inline * 100 # Simple coordinate calculation + headers["cdp_y"][trace_idx] = crossline * 100 + + # Create simple synthetic trace data + samples[trace_idx] = np.linspace(0, 1, samples_per_trace) + + trace_idx += 1 + + # Write the SEGY file with custom binary header + binary_header_updates = {"data_sample_format": data_format} + with output_path.open("wb") as f: + f.write(factory.create_textual_header()) + f.write(factory.create_binary_header(update=binary_header_updates)) + f.write(factory.create_traces(headers, samples)) + + return spec + + def 
extract_header_bytes_from_file( + self, segy_path: Path, trace_index: int, byte_start: int, byte_length: int + ) -> NDArray: + """Extract specific bytes from a trace header in the SEGY file.""" + with open(segy_path, "rb") as f: + # Skip text header (3200 bytes) + binary header (400 bytes) + header_offset = 3600 + + # Each trace: 240 byte header + samples + trace_size = 240 + 1501 * 4 # Assuming 1501 samples, 4 bytes each + trace_offset = header_offset + trace_index * trace_size + + f.seek(trace_offset + byte_start - 1) # SEGY is 1-based + header_bytes = f.read(byte_length) + + return np.frombuffer(header_bytes, dtype=np.uint8) + + def test_header_validation_configurations( + self, temp_dir: Path, basic_segy_spec: SegySpec, segy_config: dict + ) -> None: + """Test header validation with different SEGY configurations.""" + config_name = segy_config["name"] + endianness = segy_config["endianness"] + data_format = segy_config["data_format"] + + segy_path = temp_dir / f"test_{config_name}.segy" + + # Create test SEGY file + num_traces = 10 + samples_per_trace = 1501 + + spec = self.create_test_segy_file( + spec=basic_segy_spec, + num_traces=num_traces, + samples_per_trace=samples_per_trace, + output_path=segy_path, + endianness=endianness, + data_format=data_format, + ) + + # Load the SEGY file + segy_file = SegyFile(segy_path, spec=spec) + + # Test a few traces + test_indices = [0, 3, 7] + + for trace_idx in test_indices: + # Get raw and transformed headers + raw_headers, transformed_headers, traces = get_header_raw_and_transformed( + segy_file=segy_file, + indices=trace_idx, + do_reverse_transforms=True + ) + + # Extract bytes from disk for inline (bytes 189-192) and crossline (bytes 193-196) + inline_bytes_disk = self.extract_header_bytes_from_file( + segy_path, trace_idx, 189, 4 + ) + crossline_bytes_disk = self.extract_header_bytes_from_file( + segy_path, trace_idx, 193, 4 + ) + + # Convert raw headers to bytes for comparison + if raw_headers is not None: + # Extract inline and crossline from raw headers + raw_inline_bytes = np.frombuffer( + raw_headers["inline"].tobytes(), dtype=np.uint8 + )[:4] + raw_crossline_bytes = np.frombuffer( + raw_headers["crossline"].tobytes(), dtype=np.uint8 + )[:4] + + # Compare bytes + assert np.array_equal(raw_inline_bytes, inline_bytes_disk), \ + f"Inline bytes mismatch for trace {trace_idx} in {config_name}" + assert np.array_equal(raw_crossline_bytes, crossline_bytes_disk), \ + f"Crossline bytes mismatch for trace {trace_idx} in {config_name}" + + def test_header_validation_no_transforms( + self, temp_dir: Path, basic_segy_spec: SegySpec, segy_config: dict + ) -> None: + """Test header validation when transforms are disabled.""" + config_name = segy_config["name"] + endianness = segy_config["endianness"] + data_format = segy_config["data_format"] + + segy_path = temp_dir / f"test_no_transforms_{config_name}.segy" + + # Create test SEGY file + num_traces = 5 + samples_per_trace = 1501 + + spec = self.create_test_segy_file( + spec=basic_segy_spec, + num_traces=num_traces, + samples_per_trace=samples_per_trace, + output_path=segy_path, + endianness=endianness, + data_format=data_format, + ) + + # Load the SEGY file + segy_file = SegyFile(segy_path, spec=spec) + + # Get headers without reverse transforms + raw_headers, transformed_headers, traces = get_header_raw_and_transformed( + segy_file=segy_file, + indices=slice(None), # All traces + do_reverse_transforms=False + ) + + # When transforms are disabled, raw_headers should be None + assert raw_headers is 
None + + # Transformed headers should still be available + assert transformed_headers is not None + assert len(transformed_headers) == num_traces + + def test_multiple_traces_validation( + self, temp_dir: Path, basic_segy_spec: SegySpec, segy_config: dict + ) -> None: + """Test validation with multiple traces at once.""" + config_name = segy_config["name"] + endianness = segy_config["endianness"] + data_format = segy_config["data_format"] + + segy_path = temp_dir / f"test_multiple_traces_{config_name}.segy" + + # Create test SEGY file with more traces + num_traces = 25 # 5x5 grid + samples_per_trace = 1501 + + spec = self.create_test_segy_file( + spec=basic_segy_spec, + num_traces=num_traces, + samples_per_trace=samples_per_trace, + output_path=segy_path, + endianness=endianness, + data_format=data_format, + inline_range=(1, 5), + crossline_range=(1, 5), + ) + + # Load the SEGY file + segy_file = SegyFile(segy_path, spec=spec) + + # Get all traces + raw_headers, transformed_headers, traces = get_header_raw_and_transformed( + segy_file=segy_file, + indices=slice(None), # All traces + do_reverse_transforms=True + ) + + # Validate each trace + for trace_idx in range(num_traces): + # Extract bytes from disk + inline_bytes_disk = self.extract_header_bytes_from_file( + segy_path, trace_idx, 189, 4 + ) + crossline_bytes_disk = self.extract_header_bytes_from_file( + segy_path, trace_idx, 193, 4 + ) + + # Extract from raw headers + raw_inline_bytes = np.frombuffer( + raw_headers["inline"][trace_idx].tobytes(), dtype=np.uint8 + )[:4] + raw_crossline_bytes = np.frombuffer( + raw_headers["crossline"][trace_idx].tobytes(), dtype=np.uint8 + )[:4] + + # Compare + assert np.array_equal(raw_inline_bytes, inline_bytes_disk), \ + f"Inline bytes mismatch for trace {trace_idx} in {config_name}" + assert np.array_equal(raw_crossline_bytes, crossline_bytes_disk), \ + f"Crossline bytes mismatch for trace {trace_idx} in {config_name}" + + @pytest.mark.parametrize("trace_indices", [ + 0, # Single trace + [0, 2, 4], # Multiple specific traces + slice(1, 4), # Range of traces + ]) + def test_different_index_types( + self, temp_dir: Path, basic_segy_spec: SegySpec, segy_config: dict, trace_indices + ) -> None: + """Test with different types of trace indices.""" + config_name = segy_config["name"] + endianness = segy_config["endianness"] + data_format = segy_config["data_format"] + + segy_path = temp_dir / f"test_index_types_{config_name}.segy" + + # Create test SEGY file + num_traces = 10 + samples_per_trace = 1501 + + spec = self.create_test_segy_file( + spec=basic_segy_spec, + num_traces=num_traces, + samples_per_trace=samples_per_trace, + output_path=segy_path, + endianness=endianness, + data_format=data_format, + ) + + # Load the SEGY file + segy_file = SegyFile(segy_path, spec=spec) + + # Get headers with different index types + raw_headers, transformed_headers, traces = get_header_raw_and_transformed( + segy_file=segy_file, + indices=trace_indices, + do_reverse_transforms=True + ) + + # Basic validation that we got results + assert raw_headers is not None + assert transformed_headers is not None + assert traces is not None + + # Check that the number of results matches expectation + if isinstance(trace_indices, int): + expected_count = 1 + elif isinstance(trace_indices, list): + expected_count = len(trace_indices) + elif isinstance(trace_indices, slice): + expected_count = len(range(*trace_indices.indices(num_traces))) + else: + expected_count = 1 + + assert len(transformed_headers) == expected_count From 
392e7e83a9857df95219668e924e3fa1e5e3e561 Mon Sep 17 00:00:00 2001 From: BrianMichell Date: Mon, 15 Sep 2025 14:26:17 +0000 Subject: [PATCH 17/23] Cleanup functions and docs --- src/mdio/segy/_disaster_recovery_wrapper.py | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/src/mdio/segy/_disaster_recovery_wrapper.py b/src/mdio/segy/_disaster_recovery_wrapper.py index 03373618c..37ac6e29f 100644 --- a/src/mdio/segy/_disaster_recovery_wrapper.py +++ b/src/mdio/segy/_disaster_recovery_wrapper.py @@ -18,12 +18,10 @@ def _reverse_single_transform(data: NDArray, transform: Transform, endianness: E if isinstance(transform, ByteSwapTransform): # Reverse the endianness conversion - if endianness == Endianness.BIG: - reverse_target = Endianness.BIG - else: + if endianness == Endianness.Little: return data - reverse_transform = ByteSwapTransform(reverse_target) + reverse_transform = ByteSwapTransform(Endianness.BIG) return reverse_transform.apply(data) elif isinstance(transform, IbmFloatTransform): # TODO: This seems incorrect... @@ -46,6 +44,7 @@ def get_header_raw_and_transformed( Args: segy_file: The SegyFile instance indices: Which headers to retrieve + do_reverse_transforms: Whether to apply the reverse transform to get raw data Returns: Tuple of (raw_headers, transformed_headers, traces) From 1e992fbc37a45c57f02c370e030c9ede581d840c Mon Sep 17 00:00:00 2001 From: BrianMichell Date: Mon, 15 Sep 2025 14:28:18 +0000 Subject: [PATCH 18/23] Fix incorrect case --- src/mdio/segy/_disaster_recovery_wrapper.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/mdio/segy/_disaster_recovery_wrapper.py b/src/mdio/segy/_disaster_recovery_wrapper.py index 37ac6e29f..4b4ccca38 100644 --- a/src/mdio/segy/_disaster_recovery_wrapper.py +++ b/src/mdio/segy/_disaster_recovery_wrapper.py @@ -18,7 +18,7 @@ def _reverse_single_transform(data: NDArray, transform: Transform, endianness: E if isinstance(transform, ByteSwapTransform): # Reverse the endianness conversion - if endianness == Endianness.Little: + if endianness == Endianness.LITTLE: return data reverse_transform = ByteSwapTransform(Endianness.BIG) From 0978d816d0afc5e00d67e3515cb174e191d61ffa Mon Sep 17 00:00:00 2001 From: BrianMichell Date: Mon, 15 Sep 2025 14:46:09 +0000 Subject: [PATCH 19/23] Use numpy size instead of len() --- tests/unit/test_disaster_recovery_wrapper.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tests/unit/test_disaster_recovery_wrapper.py b/tests/unit/test_disaster_recovery_wrapper.py index 54effabd1..13907a493 100644 --- a/tests/unit/test_disaster_recovery_wrapper.py +++ b/tests/unit/test_disaster_recovery_wrapper.py @@ -230,7 +230,7 @@ def test_header_validation_no_transforms( # Transformed headers should still be available assert transformed_headers is not None - assert len(transformed_headers) == num_traces + assert transformed_headers.size == num_traces def test_multiple_traces_validation( self, temp_dir: Path, basic_segy_spec: SegySpec, segy_config: dict @@ -344,4 +344,4 @@ def test_different_index_types( else: expected_count = 1 - assert len(transformed_headers) == expected_count + assert transformed_headers.size == expected_count From 826cc43f482f77994e0200f5e26eac077bac498c Mon Sep 17 00:00:00 2001 From: BrianMichell Date: Mon, 15 Sep 2025 16:30:47 +0000 Subject: [PATCH 20/23] Fix incorrect application of endianness in synthetic generation --- tests/unit/test_disaster_recovery_wrapper.py | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git 
a/tests/unit/test_disaster_recovery_wrapper.py b/tests/unit/test_disaster_recovery_wrapper.py index 13907a493..e3c962d7b 100644 --- a/tests/unit/test_disaster_recovery_wrapper.py +++ b/tests/unit/test_disaster_recovery_wrapper.py @@ -73,7 +73,7 @@ def create_test_segy_file( ) -> SegySpec: """Create a test SEGY file with synthetic data.""" # Update spec with desired endianness - spec = spec.model_copy(update={"endianness": endianness}) + spec.endianness = endianness factory = SegyFactory(spec=spec, samples_per_trace=samples_per_trace) @@ -186,6 +186,11 @@ def test_header_validation_configurations( raw_headers["crossline"].tobytes(), dtype=np.uint8 )[:4] + print(f"Transformed headers: {transformed_headers.tobytes()}") + print(f"Raw headers: {raw_headers.tobytes()}") + print(f"Inline bytes disk: {inline_bytes_disk.tobytes()}") + print(f"Crossline bytes disk: {crossline_bytes_disk.tobytes()}") + # Compare bytes assert np.array_equal(raw_inline_bytes, inline_bytes_disk), \ f"Inline bytes mismatch for trace {trace_idx} in {config_name}" From d9db029a9deed3987534226e304007d57bff08dc Mon Sep 17 00:00:00 2001 From: BrianMichell Date: Mon, 15 Sep 2025 19:56:59 +0000 Subject: [PATCH 21/23] Fix structured array forcing endian re-conversion --- tests/unit/test_disaster_recovery_wrapper.py | 87 ++++++++++++++++---- 1 file changed, 71 insertions(+), 16 deletions(-) diff --git a/tests/unit/test_disaster_recovery_wrapper.py b/tests/unit/test_disaster_recovery_wrapper.py index e3c962d7b..369b2644c 100644 --- a/tests/unit/test_disaster_recovery_wrapper.py +++ b/tests/unit/test_disaster_recovery_wrapper.py @@ -123,7 +123,8 @@ def extract_header_bytes_from_file( header_offset = 3600 # Each trace: 240 byte header + samples - trace_size = 240 + 1501 * 4 # Assuming 1501 samples, 4 bytes each + # trace_size = 240 + 1501 * 4 # Assuming 1501 samples, 4 bytes each + trace_size = 240 + 1 * 4 # Assuming 1 sample, 4 bytes each trace_offset = header_offset + trace_index * trace_size f.seek(trace_offset + byte_start - 1) # SEGY is 1-based @@ -143,7 +144,8 @@ def test_header_validation_configurations( # Create test SEGY file num_traces = 10 - samples_per_trace = 1501 + # samples_per_trace = 1501 + samples_per_trace = 1 spec = self.create_test_segy_file( spec=basic_segy_spec, @@ -176,14 +178,20 @@ def test_header_validation_configurations( segy_path, trace_idx, 193, 4 ) + def extract_bytes_vectorized(data, start_byte, end_byte): + all_bytes = np.frombuffer(data.tobytes(), dtype=np.uint8) + bytes_per_element = data.dtype.intemsize + reshaped= all_bytes.reshape(-1, bytes_per_element) + return reshaped[:, start_byte:end_byte] + # Convert raw headers to bytes for comparison if raw_headers is not None: # Extract inline and crossline from raw headers - raw_inline_bytes = np.frombuffer( - raw_headers["inline"].tobytes(), dtype=np.uint8 + raw_inline_bytes = extract_bytes_vectorized( + raw_headers, 189, 193 )[:4] - raw_crossline_bytes = np.frombuffer( - raw_headers["crossline"].tobytes(), dtype=np.uint8 + raw_crossline_bytes = extract_bytes_vectorized( + raw_headers, 193, 197 )[:4] print(f"Transformed headers: {transformed_headers.tobytes()}") @@ -209,7 +217,8 @@ def test_header_validation_no_transforms( # Create test SEGY file num_traces = 5 - samples_per_trace = 1501 + # samples_per_trace = 1501 + samples_per_trace = 1 spec = self.create_test_segy_file( spec=basic_segy_spec, @@ -241,15 +250,24 @@ def test_multiple_traces_validation( self, temp_dir: Path, basic_segy_spec: SegySpec, segy_config: dict ) -> None: """Test validation 
with multiple traces at once.""" + if True: + import segy + print(segy.__version__) config_name = segy_config["name"] endianness = segy_config["endianness"] data_format = segy_config["data_format"] - segy_path = temp_dir / f"test_multiple_traces_{config_name}.segy" + print(f"Config name: {config_name}") + print(f"Endianness: {endianness}") + print(f"Data format: {data_format}") + + # segy_path = temp_dir / f"test_multiple_traces_{config_name}.segy" + segy_path = Path(f"test_multiple_traces_{config_name}.segy") # Create test SEGY file with more traces num_traces = 25 # 5x5 grid - samples_per_trace = 1501 + # samples_per_trace = 1501 + samples_per_trace = 1 spec = self.create_test_segy_file( spec=basic_segy_spec, @@ -272,6 +290,8 @@ def test_multiple_traces_validation( do_reverse_transforms=True ) + first = True + # Validate each trace for trace_idx in range(num_traces): # Extract bytes from disk @@ -282,13 +302,47 @@ def test_multiple_traces_validation( segy_path, trace_idx, 193, 4 ) + if first: + print(raw_headers.dtype) + print(raw_headers.shape) + first = False + # Extract from raw headers - raw_inline_bytes = np.frombuffer( - raw_headers["inline"][trace_idx].tobytes(), dtype=np.uint8 - )[:4] - raw_crossline_bytes = np.frombuffer( - raw_headers["crossline"][trace_idx].tobytes(), dtype=np.uint8 - )[:4] + # Note: We need to extract bytes directly from the structured array to preserve endianness + # Getting a scalar and calling .tobytes() loses endianness information + if raw_headers.ndim == 0: + # Single trace case + raw_data_bytes = raw_headers.tobytes() + inline_offset = raw_headers.dtype.fields['inline'][1] + crossline_offset = raw_headers.dtype.fields['crossline'][1] + inline_size = raw_headers.dtype.fields['inline'][0].itemsize + crossline_size = raw_headers.dtype.fields['crossline'][0].itemsize + + raw_inline_bytes = np.frombuffer( + raw_data_bytes[inline_offset:inline_offset+inline_size], dtype=np.uint8 + ) + raw_crossline_bytes = np.frombuffer( + raw_data_bytes[crossline_offset:crossline_offset+crossline_size], dtype=np.uint8 + ) + else: + # Multiple traces case + raw_data_bytes = raw_headers[trace_idx:trace_idx+1].tobytes() + inline_offset = raw_headers.dtype.fields['inline'][1] + crossline_offset = raw_headers.dtype.fields['crossline'][1] + inline_size = raw_headers.dtype.fields['inline'][0].itemsize + crossline_size = raw_headers.dtype.fields['crossline'][0].itemsize + + raw_inline_bytes = np.frombuffer( + raw_data_bytes[inline_offset:inline_offset+inline_size], dtype=np.uint8 + ) + raw_crossline_bytes = np.frombuffer( + raw_data_bytes[crossline_offset:crossline_offset+crossline_size], dtype=np.uint8 + ) + + print(f"Raw inline bytes: {raw_inline_bytes.tobytes()}") + print(f"Inline bytes disk: {inline_bytes_disk.tobytes()}") + print(f"Raw crossline bytes: {raw_crossline_bytes.tobytes()}") + print(f"Crossline bytes disk: {crossline_bytes_disk.tobytes()}") # Compare assert np.array_equal(raw_inline_bytes, inline_bytes_disk), \ @@ -313,7 +367,8 @@ def test_different_index_types( # Create test SEGY file num_traces = 10 - samples_per_trace = 1501 + # samples_per_trace = 1501 + samples_per_trace = 1 spec = self.create_test_segy_file( spec=basic_segy_spec, From 809046ad605506ce4a6fb18231ae8790e0579eec Mon Sep 17 00:00:00 2001 From: BrianMichell Date: Mon, 15 Sep 2025 20:06:28 +0000 Subject: [PATCH 22/23] Fix config validation test --- tests/unit/test_disaster_recovery_wrapper.py | 60 ++++++++++++-------- 1 file changed, 37 insertions(+), 23 deletions(-) diff --git 
a/tests/unit/test_disaster_recovery_wrapper.py b/tests/unit/test_disaster_recovery_wrapper.py index 369b2644c..f286648e3 100644 --- a/tests/unit/test_disaster_recovery_wrapper.py +++ b/tests/unit/test_disaster_recovery_wrapper.py @@ -25,6 +25,7 @@ if TYPE_CHECKING: from numpy.typing import NDArray +SAMPLES_PER_TRACE = 1501 class TestDisasterRecoveryWrapper: """Test cases for disaster recovery wrapper functionality.""" @@ -123,8 +124,7 @@ def extract_header_bytes_from_file( header_offset = 3600 # Each trace: 240 byte header + samples - # trace_size = 240 + 1501 * 4 # Assuming 1501 samples, 4 bytes each - trace_size = 240 + 1 * 4 # Assuming 1 sample, 4 bytes each + trace_size = 240 + SAMPLES_PER_TRACE * 4 # samples * 4 bytes each trace_offset = header_offset + trace_index * trace_size f.seek(trace_offset + byte_start - 1) # SEGY is 1-based @@ -144,8 +144,7 @@ def test_header_validation_configurations( # Create test SEGY file num_traces = 10 - # samples_per_trace = 1501 - samples_per_trace = 1 + samples_per_trace = SAMPLES_PER_TRACE spec = self.create_test_segy_file( spec=basic_segy_spec, @@ -178,21 +177,39 @@ def test_header_validation_configurations( segy_path, trace_idx, 193, 4 ) - def extract_bytes_vectorized(data, start_byte, end_byte): - all_bytes = np.frombuffer(data.tobytes(), dtype=np.uint8) - bytes_per_element = data.dtype.intemsize - reshaped= all_bytes.reshape(-1, bytes_per_element) - return reshaped[:, start_byte:end_byte] - # Convert raw headers to bytes for comparison if raw_headers is not None: - # Extract inline and crossline from raw headers - raw_inline_bytes = extract_bytes_vectorized( - raw_headers, 189, 193 - )[:4] - raw_crossline_bytes = extract_bytes_vectorized( - raw_headers, 193, 197 - )[:4] + # Extract from raw headers + # Note: We need to extract bytes directly from the structured array to preserve endianness + # Getting a scalar and calling .tobytes() loses endianness information + if raw_headers.ndim == 0: + # Single trace case + raw_data_bytes = raw_headers.tobytes() + inline_offset = raw_headers.dtype.fields['inline'][1] + crossline_offset = raw_headers.dtype.fields['crossline'][1] + inline_size = raw_headers.dtype.fields['inline'][0].itemsize + crossline_size = raw_headers.dtype.fields['crossline'][0].itemsize + + raw_inline_bytes = np.frombuffer( + raw_data_bytes[inline_offset:inline_offset+inline_size], dtype=np.uint8 + ) + raw_crossline_bytes = np.frombuffer( + raw_data_bytes[crossline_offset:crossline_offset+crossline_size], dtype=np.uint8 + ) + else: + # Multiple traces case - this test uses single trace index, so extract that trace + raw_data_bytes = raw_headers[0:1].tobytes() # Extract first trace + inline_offset = raw_headers.dtype.fields['inline'][1] + crossline_offset = raw_headers.dtype.fields['crossline'][1] + inline_size = raw_headers.dtype.fields['inline'][0].itemsize + crossline_size = raw_headers.dtype.fields['crossline'][0].itemsize + + raw_inline_bytes = np.frombuffer( + raw_data_bytes[inline_offset:inline_offset+inline_size], dtype=np.uint8 + ) + raw_crossline_bytes = np.frombuffer( + raw_data_bytes[crossline_offset:crossline_offset+crossline_size], dtype=np.uint8 + ) print(f"Transformed headers: {transformed_headers.tobytes()}") print(f"Raw headers: {raw_headers.tobytes()}") @@ -217,8 +234,7 @@ def test_header_validation_no_transforms( # Create test SEGY file num_traces = 5 - # samples_per_trace = 1501 - samples_per_trace = 1 + samples_per_trace = SAMPLES_PER_TRACE spec = self.create_test_segy_file( spec=basic_segy_spec, @@ -266,8 +282,7 @@ 
def test_multiple_traces_validation( # Create test SEGY file with more traces num_traces = 25 # 5x5 grid - # samples_per_trace = 1501 - samples_per_trace = 1 + samples_per_trace = SAMPLES_PER_TRACE spec = self.create_test_segy_file( spec=basic_segy_spec, @@ -367,8 +382,7 @@ def test_different_index_types( # Create test SEGY file num_traces = 10 - # samples_per_trace = 1501 - samples_per_trace = 1 + samples_per_trace = SAMPLES_PER_TRACE spec = self.create_test_segy_file( spec=basic_segy_spec, From c7e6c351e66c83949a9d141a2010c071f7bdb8f2 Mon Sep 17 00:00:00 2001 From: BrianMichell Date: Mon, 15 Sep 2025 20:10:04 +0000 Subject: [PATCH 23/23] Remove persistent save of generated segy file --- tests/unit/test_disaster_recovery_wrapper.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/tests/unit/test_disaster_recovery_wrapper.py b/tests/unit/test_disaster_recovery_wrapper.py index f286648e3..960d5479b 100644 --- a/tests/unit/test_disaster_recovery_wrapper.py +++ b/tests/unit/test_disaster_recovery_wrapper.py @@ -277,8 +277,7 @@ def test_multiple_traces_validation( print(f"Endianness: {endianness}") print(f"Data format: {data_format}") - # segy_path = temp_dir / f"test_multiple_traces_{config_name}.segy" - segy_path = Path(f"test_multiple_traces_{config_name}.segy") + segy_path = temp_dir / f"test_multiple_traces_{config_name}.segy" # Create test SEGY file with more traces num_traces = 25 # 5x5 grid
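
For reference, a minimal numpy sketch (illustrative only, not part of the patch series) of the byte-order property these tests rely on, assuming a 4-byte int32 header field such as inline at byte 189:

    import numpy as np

    inline_value = 3  # hypothetical header value
    on_disk = np.array(inline_value, dtype=">i4").tobytes()  # b'\x00\x00\x00\x03' in a big-endian SEG-Y
    native = np.array(inline_value, dtype="<i4").tobytes()   # b'\x03\x00\x00\x00' after byte-swap to little-endian
    assert on_disk != native  # reversing the byte-swap must restore the on-disk ordering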