Merged
24 commits
8978431
Export part 1
dmitriyrepin Aug 20, 2025
04ea8a4
Enable header value validation
dmitriyrepin Aug 21, 2025
5809ad0
Revert the test names back
dmitriyrepin Aug 25, 2025
c5cefde
Remove Endianness, new_chunks API args and traceDomain,
dmitriyrepin Aug 25, 2025
77045da
PR review
dmitriyrepin Aug 25, 2025
2a53a68
lint
tasansal Sep 3, 2025
fe73dee
create/use new api location and lint
tasansal Sep 3, 2025
43d419c
allow configuring opener chunks
tasansal Sep 3, 2025
0f33a7e
clarify xarray open parameters
tasansal Sep 3, 2025
29c3fc4
fix regression of not-opening with native dask re-chunking
tasansal Sep 3, 2025
9d7f245
fix regression of not-opening with native dask re-chunking
tasansal Sep 3, 2025
8194c38
make export rechunker work with named dimension sizes and chunks
tasansal Sep 3, 2025
a987bf7
make StorageLocation available at library level and update mdio to se…
tasansal Sep 3, 2025
20e52f4
pre-open with zarr backend and simplify dataset slicing after lazy lo…
tasansal Sep 3, 2025
732f7b5
better opener docs
tasansal Sep 3, 2025
f7b63c5
more explicit xarray selection
tasansal Sep 3, 2025
b92ec5f
rename trace variable name to default variable name
tasansal Sep 3, 2025
59d0e4e
remove the guard for setting storage options to empty dictionary. new…
tasansal Sep 3, 2025
8246b9c
update lockfile
tasansal Sep 3, 2025
a9df337
fix broken tests and inconsistent type hints
tasansal Sep 3, 2025
fe11129
clean up comments
tasansal Sep 3, 2025
7712a0d
clarify binary header scaling
tasansal Sep 3, 2025
2d658d0
make test names clearer
tasansal Sep 3, 2025
96c58b1
fix broken unit tests due to storage_options handling
tasansal Sep 3, 2025
4 changes: 4 additions & 0 deletions src/mdio/__init__.py
@@ -5,6 +5,7 @@
from mdio.api import MDIOReader
from mdio.api import MDIOWriter
from mdio.api.convenience import copy_mdio
from mdio.api.opener import open_dataset
from mdio.converters import mdio_to_segy
from mdio.converters import numpy_to_mdio
from mdio.converters import segy_to_mdio
@@ -14,11 +15,13 @@
from mdio.core.factory import create_empty
from mdio.core.factory import create_empty_like
from mdio.core.grid import Grid
from mdio.core.storage_location import StorageLocation

__all__ = [
"MDIOReader",
"MDIOWriter",
"copy_mdio",
"open_dataset",
"mdio_to_segy",
"numpy_to_mdio",
"segy_to_mdio",
Expand All @@ -28,6 +31,7 @@
"create_empty",
"create_empty_like",
"Grid",
"StorageLocation",
]


3 changes: 0 additions & 3 deletions src/mdio/api/convenience.py
@@ -51,9 +51,6 @@ def copy_mdio( # noqa: PLR0913
storage_options_output: Storage options for output MDIO.

"""
storage_options_input = storage_options_input or {}
storage_options_output = storage_options_output or {}

create_empty_like(
source_path,
target_path,
35 changes: 35 additions & 0 deletions src/mdio/api/opener.py
@@ -0,0 +1,35 @@
"""Utils for reading MDIO dataset."""

from __future__ import annotations

from typing import TYPE_CHECKING

import xarray as xr

if TYPE_CHECKING:
from xarray.core.types import T_Chunks

from mdio.core.storage_location import StorageLocation


def open_dataset(storage_location: StorageLocation, chunks: T_Chunks = None) -> xr.Dataset:
"""Open a Zarr dataset from the specified storage location.

Args:
storage_location: StorageLocation for the dataset.
chunks: If provided, loads data into dask arrays with new chunking.
- ``chunks="auto"`` will use dask ``auto`` chunking
- ``chunks=None`` skips using dask, which is generally faster for small arrays.
- ``chunks=-1`` loads the data with dask using a single chunk for all arrays.
- ``chunks={}`` loads the data with dask using the engine's preferred chunk size (on disk).
- ``chunks={dim: chunk, ...}`` loads the data with dask using the specified chunk size for each dimension.

See dask chunking for more details.

Returns:
An Xarray dataset opened from the storage location.
"""
# NOTE: If mask_and_scale is not set,
# Xarray will convert int to float and replace _FillValue with NaN
# Fixed in Zarr v3, so we can fix this later.
return xr.open_dataset(storage_location.uri, engine="zarr", chunks=chunks, mask_and_scale=False)
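
For context, here is a minimal usage sketch of the new opener (editor's illustration, not part of the diff; the path and dimension names are hypothetical, and the ``chunks`` values mirror the docstring above):

from mdio import StorageLocation, open_dataset

location = StorageLocation("s3://bucket/volume.mdio")  # hypothetical URI

# Eager open without dask -- generally faster for small arrays
ds = open_dataset(location)

# Lazy open with dask, keeping the chunk sizes stored on disk
ds_lazy = open_dataset(location, chunks={})

# Lazy open with explicit per-dimension chunks (dimension names are hypothetical)
ds_custom = open_dataset(location, chunks={"inline": 128, "crossline": 128, "sample": -1})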
120 changes: 51 additions & 69 deletions src/mdio/converters/mdio.py
@@ -5,12 +5,13 @@
import os
from pathlib import Path
from tempfile import TemporaryDirectory
from typing import TYPE_CHECKING

import numpy as np
from psutil import cpu_count
from tqdm.dask import TqdmCallback

from mdio import MDIOReader
from mdio.api.opener import open_dataset
from mdio.segy.blocked_io import to_segy
from mdio.segy.creation import concat_files
from mdio.segy.creation import mdio_spec_to_segy
@@ -21,18 +22,19 @@
except ImportError:
distributed = None

if TYPE_CHECKING:
from segy.schema import SegySpec

from mdio.core.storage_location import StorageLocation

default_cpus = cpu_count(logical=True)
NUM_CPUS = int(os.getenv("MDIO__EXPORT__CPU_COUNT", default_cpus))


def mdio_to_segy( # noqa: PLR0912, PLR0913
mdio_path_or_buffer: str,
output_segy_path: str,
endian: str = "big",
access_pattern: str = "012",
storage_options: dict = None,
new_chunks: tuple[int, ...] = None,
def mdio_to_segy( # noqa: PLR0912, PLR0913, PLR0915
segy_spec: SegySpec,
input_location: StorageLocation,
output_location: StorageLocation,
selection_mask: np.ndarray = None,
client: distributed.Client = None,
) -> None:
@@ -47,13 +49,9 @@ def mdio_to_segy( # noqa: PLR0912, PLR0913
A `selection_mask` can be provided (same shape as spatial grid) to export a subset.

Args:
mdio_path_or_buffer: Input path where the MDIO is located.
output_segy_path: Path to the output SEG-Y file.
endian: Endianness of the input SEG-Y. Rev.2 allows little endian. Default is 'big'.
access_pattern: This specificies the chunk access pattern. Underlying zarr.Array must
exist. Examples: '012', '01'
storage_options: Storage options for the cloud storage backend. Default: None (anonymous)
new_chunks: Set manual chunksize. For development purposes only.
segy_spec: The SEG-Y specification to use for the conversion.
input_location: Store or URL (and cloud options) for the input MDIO file.
output_location: Store or URL (and cloud options) for the output SEG-Y file.
selection_mask: Array that lists the subset of traces
client: Dask client. If `None` we will use local threaded scheduler. If `auto` is used we
will create multiple processes (with 8 threads each).
@@ -64,86 +62,70 @@

Examples:
To export an existing local MDIO file to SEG-Y we use the code snippet below. This will
export the full MDIO (without padding) to SEG-Y format using IBM floats and big-endian
byte order.
export the full MDIO (without padding) to SEG-Y format.

>>> from mdio import mdio_to_segy
>>>
>>> from mdio import mdio_to_segy, StorageLocation
>>>
>>> mdio_to_segy(
... mdio_path_or_buffer="prefix2/file.mdio",
... output_segy_path="prefix/file.segy",
... )

If we want to export this as an IEEE big-endian, using a selection mask, we would run:

>>> mdio_to_segy(
... mdio_path_or_buffer="prefix2/file.mdio",
... output_segy_path="prefix/file.segy",
... selection_mask=boolean_mask,
... )

>>> input_location = StorageLocation("prefix2/file.mdio")
>>> output_location = StorageLocation("prefix/file.segy")
>>> mdio_to_segy(input_location, output_location)
"""
backend = "dask"

output_segy_path = Path(output_segy_path)
output_segy_path = Path(output_location.uri)

mdio = MDIOReader(
mdio_path_or_buffer=mdio_path_or_buffer,
access_pattern=access_pattern,
storage_options=storage_options,
)
# First we open with vanilla zarr backend and then get some info
# We will re-open with `new_chunks` and Dask later in mdio_spec_to_segy
dataset = open_dataset(input_location)

if new_chunks is None:
new_chunks = segy_export_rechunker(mdio.chunks, mdio.shape, mdio._traces.dtype)
default_variable_name = dataset.attrs["attributes"]["default_variable_name"]
amplitude = dataset[default_variable_name]
chunks = amplitude.encoding["preferred_chunks"]
sizes = amplitude.sizes
dtype = amplitude.dtype
new_chunks = segy_export_rechunker(chunks, sizes, dtype)

creation_args = [
mdio_path_or_buffer,
output_segy_path,
access_pattern,
endian,
storage_options,
new_chunks,
backend,
]
creation_args = [segy_spec, input_location, output_location, new_chunks]

if client is not None:
if distributed is not None:
# This is in case we work with big data
feature = client.submit(mdio_spec_to_segy, *creation_args)
mdio, segy_factory = feature.result()
dataset, segy_factory = feature.result()
else:
msg = "Distributed client was provided, but `distributed` is not installed"
raise ImportError(msg)
else:
mdio, segy_factory = mdio_spec_to_segy(*creation_args)
dataset, segy_factory = mdio_spec_to_segy(*creation_args)

live_mask = mdio.live_mask.compute()
trace_mask = dataset["trace_mask"].compute()

if selection_mask is not None:
live_mask = live_mask & selection_mask
if trace_mask.shape != selection_mask.shape:
msg = "Selection mask and trace mask shapes do not match."
raise ValueError(msg)
selection_mask = trace_mask.copy(data=selection_mask) # make into DataArray
trace_mask = trace_mask & selection_mask

# This handles the case if we are skipping a whole block.
if live_mask.sum() == 0:
if trace_mask.sum() == 0:
msg = "No traces will be written out. Live mask is empty."
raise ValueError(msg)

# Find rough dim limits, so we don't unnecessarily hit disk / cloud store.
# Typically, gets triggered when there is a selection mask
dim_slices = ()
live_nonzeros = live_mask.nonzero()
for dim_nonzeros in live_nonzeros:
start = np.min(dim_nonzeros)
stop = np.max(dim_nonzeros) + 1
dim_slices += (slice(start, stop),)
dim_slices = {}
dim_live_indices = np.nonzero(trace_mask.values)
for dim_name, dim_live in zip(trace_mask.dims, dim_live_indices, strict=True):
start = dim_live.min().item()
stop = dim_live.max().item() + 1
dim_slices[dim_name] = slice(start, stop)

# Lazily pull the data with limits now, and limit mask so its the same shape.
live_mask, headers, samples = mdio[dim_slices]
live_mask = live_mask.rechunk(headers.chunks)
# Lazily pull the data with limits now.
# All the variables, metadata, etc. are sliced to the same range.
dataset = dataset.isel(dim_slices)

if selection_mask is not None:
selection_mask = selection_mask[dim_slices]
live_mask = live_mask & selection_mask
dataset["trace_mask"] = dataset["trace_mask"] & selection_mask

# tmp file root
out_dir = output_segy_path.parent
Expand All @@ -152,9 +134,9 @@ def mdio_to_segy( # noqa: PLR0912, PLR0913
with tmp_dir:
with TqdmCallback(desc="Unwrapping MDIO Blocks"):
block_records = to_segy(
samples=samples,
headers=headers,
live_mask=live_mask,
samples=dataset[default_variable_name].data,
headers=dataset["headers"].data,
live_mask=dataset["trace_mask"].data,
segy_factory=segy_factory,
file_root=tmp_dir.name,
)
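
As an aside, the named-dimension bounding logic above (deriving ``dim_slices`` from ``trace_mask``) can be sketched in isolation; this is an editor's standalone example with hypothetical dimension names and mask values, not part of the diff:

import numpy as np
import xarray as xr

# Toy 2-D trace mask: live traces only in a small sub-window
mask = np.zeros((5, 6), dtype=bool)
mask[1:3, 2:5] = True
trace_mask = xr.DataArray(mask, dims=("inline", "crossline"))

# Same pattern as the exporter: one bounding slice per named dimension
dim_slices = {}
for dim_name, dim_live in zip(trace_mask.dims, np.nonzero(trace_mask.values), strict=True):
    dim_slices[dim_name] = slice(dim_live.min().item(), dim_live.max().item() + 1)

print(dim_slices)  # {'inline': slice(1, 3, None), 'crossline': slice(2, 5, None)}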
4 changes: 2 additions & 2 deletions src/mdio/converters/segy.py
@@ -387,13 +387,13 @@ def segy_to_mdio(
xr_dataset = xr_dataset.drop_vars(drop_vars_delayed)

# Write the headers and traces in chunks using grid_map to indicate dead traces
data_variable_name = mdio_template.trace_variable_name
default_variable_name = mdio_template.default_variable_name
# This is an memory-expensive and time-consuming read-write operation
# performed in chunks to save the memory
blocked_io.to_zarr(
segy_file=segy_file,
output_location=output_location,
grid_map=grid.map,
dataset=xr_dataset,
data_variable_name=data_variable_name,
data_variable_name=default_variable_name,
)
5 changes: 0 additions & 5 deletions src/mdio/core/factory.py
@@ -121,8 +121,6 @@ def create_empty(
"""
zarr.config.set({"default_zarr_format": 2, "write_empty_chunks": False})

storage_options = storage_options or {}

url = process_url(url=config.path, disk_cache=False)
root_group = open_group(url, mode="w", storage_options=storage_options)
root_group = create_zarr_hierarchy(root_group, overwrite)
@@ -206,9 +204,6 @@ def create_empty_like(
storage_options_input: Options for storage backend of the source dataset.
storage_options_output: Options for storage backend of the destination dataset.
"""
storage_options_input = storage_options_input or {}
storage_options_output = storage_options_output or {}

source_root = zarr.open_consolidated(
source_path,
mode="r",
12 changes: 7 additions & 5 deletions src/mdio/schemas/v1/templates/abstract_dataset_template.py
@@ -84,7 +84,9 @@ def build_dataset(
self._dim_sizes = sizes
self._horizontal_coord_unit = horizontal_coord_unit

self._builder = MDIODatasetBuilder(name=name, attributes=self._load_dataset_attributes())
attr = self._load_dataset_attributes() or UserAttributes(attributes={})
attr.attributes["default_variable_name"] = self._default_variable_name
self._builder = MDIODatasetBuilder(name=name, attributes=attr)
self._add_dimensions()
self._add_coordinates()
self._add_variables()
@@ -99,9 +101,9 @@ def name(self) -> str:
return self._name

@property
def trace_variable_name(self) -> str:
def default_variable_name(self) -> str:
"""Returns the name of the trace variable."""
return self._trace_variable_name
return self._default_variable_name

@property
def trace_domain(self) -> str:
Expand Down Expand Up @@ -130,7 +132,7 @@ def _name(self) -> str:
"""

@property
def _trace_variable_name(self) -> str:
def _default_variable_name(self) -> str:
"""Get the name of the data variable.

A virtual method that can be overwritten by subclasses to return a
Expand Down Expand Up @@ -226,7 +228,7 @@ def _add_variables(self) -> None:
Uses the class field 'builder' to add variables to the dataset.
"""
self._builder.add_variable(
name=self._trace_variable_name,
name=self.default_variable_name,
dimensions=self._dim_names,
data_type=ScalarType.FLOAT32,
compressor=compressors.Blosc(algorithm=compressors.BloscAlgorithm.ZSTD),
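
For reference, the attribute injected here is what the exporter reads back in ``mdio_to_segy`` (see ``src/mdio/converters/mdio.py`` above); a hedged sketch of the round trip, with a hypothetical path:

from mdio import StorageLocation, open_dataset

ds = open_dataset(StorageLocation("prefix/file.mdio"))  # hypothetical path
default_variable_name = ds.attrs["attributes"]["default_variable_name"]
amplitude = ds[default_variable_name]  # the template's default (trace) variable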