2 changes: 1 addition & 1 deletion .gitignore
@@ -1,5 +1,5 @@
# Ignore example dataset storage generated in tutorial
pod_data/
**/pod_data/

# Autogenerated version file
_version.py
6 changes: 6 additions & 0 deletions pytest.ini
@@ -0,0 +1,6 @@
[pytest]
testpaths = tests
python_files = test_*.py
python_classes = Test*
python_functions = test_*
addopts = -v --cov=src --cov-report=term-missing
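For orientation, a minimal test module that these discovery settings would pick up might look like the sketch below; the file name and assertion are illustrative, not part of this PR.

# tests/test_example.py -- hypothetical module matched by testpaths/python_files above
def test_placeholder():
    # Collected because the file matches test_*.py and the function matches test_*.
    assert 1 + 1 == 2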
5 changes: 4 additions & 1 deletion requirements.txt
@@ -1,2 +1,5 @@
numpy
xxhash
xxhash
# TODO: separate dev and prod requirements
pytest>=7.4.0
pytest-cov>=4.1.0
4 changes: 3 additions & 1 deletion src/orcabridge/__init__.py
@@ -30,9 +30,11 @@
"DirDataStore",
"SafeDirDataStore",
"DEFAULT_TRACKER",
"SyncStreamFromLists",
]

from .mapper import MapTags, MapPackets, Join, tag, packet
from .pod import FunctionPod, function_pod
from .source import GlobSource
from .store import DirDataStore, SafeDirDataStore
from .store import DirDataStore, SafeDirDataStore
from .stream import SyncStreamFromLists as SimpleStream
161 changes: 120 additions & 41 deletions src/orcabridge/base.py
@@ -1,5 +1,5 @@
from orcabridge.hashing import HashableMixin
from .types import Tag, Packet
from orcabridge.types import Tag, Packet
from typing import (
Optional,
Tuple,
@@ -10,7 +10,7 @@
Iterator,
)
from collections.abc import Collection
from typing import Any, List, Tuple
import threading


class Operation(HashableMixin):
@@ -40,9 +40,12 @@ def keys(self, *streams: "SyncStream") -> Tuple[List[str], List[str]]:
@property
def label(self) -> str:
"""
Overwrite this method to attain a custom label logic for the operation.
Returns a human-readable label for this operation.
Default implementation returns the provided label, or the class name if no label was provided.
"""
return self._label
if self._label:
return self._label
return self.__class__.__name__

@label.setter
def label(self, value: str) -> None:
@@ -58,17 +61,17 @@ def identity_structure(self, *streams: "SyncStream") -> Any:
def __call__(self, *streams: "SyncStream", **kwargs) -> "SyncStream":
# trigger call on source if passed as stream

streams = [stream() if isinstance(stream, Source) else stream for stream in streams]
streams = [
stream() if isinstance(stream, Source) else stream
for stream in streams
]
output_stream = self.forward(*streams, **kwargs)
# create an invocation instance
invocation = Invocation(self, streams)
# label the output_stream with the invocation information
output_stream.invocation = invocation

# delay import to avoid circular import
from .tracker import Tracker

# reg
# register the invocation with active trackers
active_trackers = Tracker.get_active_trackers()
for tracker in active_trackers:
tracker.record(invocation)
@@ -78,16 +81,64 @@ def __call__(self, *streams: "SyncStream", **kwargs) -> "SyncStream":
def __repr__(self):
return self.__class__.__name__

def __str__(self):
if self._label is not None:
return f"{self.__class__.__name__}({self._label})"
return self.__class__.__name__

def forward(self, *streams: "SyncStream") -> "SyncStream": ...

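To illustrate the label fallback and __str__ behavior above, a subclass might be used as in the sketch below. This assumes Operation.__init__ accepts an optional label argument that sets self._label; the constructor is not shown in this diff.

# Hypothetical Operation subclass; forward() is elided for brevity.
class Rename(Operation):
    def forward(self, *streams):
        ...

print(Rename().label)             # -> "Rename": falls back to the class name
print(str(Rename(label="tidy")))  # -> "Rename(tidy)" via __str__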

class Tracker:
"""
A tracker is a class that can track the invocations of operations. Only "active" trackers
participate in tracking, and their `record` method gets called on each invocation of an operation.
Multiple trackers can be active at any time.
"""

_local = threading.local()

@classmethod
def get_active_trackers(cls) -> List["Tracker"]:
if hasattr(cls._local, "active_trackers"):
return cls._local.active_trackers
return []

def __init__(self):
self.active = False

def activate(self) -> None:
"""
Activate the tracker. This is a no-op if the tracker is already active.
"""
if not self.active:
if not hasattr(self._local, "active_trackers"):
self._local.active_trackers = []
self._local.active_trackers.append(self)
self.active = True

def deactivate(self) -> None:
# Remove this tracker from active trackers
if hasattr(self._local, "active_trackers") and self.active:
self._local.active_trackers.remove(self)
self.active = False

def __enter__(self):
self.activate()
return self

def __exit__(self, exc_type, exc_val, exc_tb):
self.deactivate()

def record(self, invocation: "Invocation") -> None: ...

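As a usage sketch of the thread-local tracker registry added here (the RecordingTracker subclass is hypothetical, not part of this PR):

# Hypothetical subclass that simply stores invocations while active.
class RecordingTracker(Tracker):
    def __init__(self):
        super().__init__()
        self.invocations = []

    def record(self, invocation):
        self.invocations.append(invocation)

# Entering the context calls activate(), which appends this tracker to the
# thread-local list that Operation.__call__ consults; leaving calls deactivate().
with RecordingTracker() as tracker:
    pass  # operations invoked here would be recorded via tracker.record(...)

print(len(tracker.invocations))  # -> 0 in this empty example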

class Invocation(HashableMixin):
"""
This class represents an invocation of an operation on a collection of streams.
It contains the operation, the invocation ID, and the streams that were used
in the invocation.
The invocation ID is a unique identifier for the invocation and is used to
track the invocation in the tracker.
It contains the operation and the streams that were used in the invocation.
Note that the collection of streams may be empty, in which case the invocation
likely corresponds to a source operation.
"""

def __init__(
@@ -108,8 +159,8 @@ def keys(self) -> Tuple[Collection[str], Collection[str]]:
return self.operation.keys(*self.streams)

def identity_structure(self) -> int:
# default implementation is streams order sensitive. If an operation does
# not depend on the order of the streams, it should override this method
# Identity of an invocation is entirely dependent on
# the operation's identity structure upon invocation
return self.operation.identity_structure(*self.streams)

def __eq__(self, other: Any) -> bool:
@@ -136,48 +187,49 @@ class Stream(HashableMixin):
This may be None if the stream is not generated by an operation.
"""

def __init__(self, **kwargs) -> None:
def __init__(self, label: Optional[str] = None, **kwargs) -> None:
super().__init__(**kwargs)
self._invocation: Optional[Invocation] = None
self._label = label

def identity_structure(self) -> Any:
"""
Identity structure of a stream is deferred to the identity structure
of the associated invocation, if present.
A bare stream without invocation has no well-defined identity structure.
"""
if self.invocation is not None:
return self.invocation.identity_structure()
return super().identity_structure()

@property
def label(self) -> str:
"""
Returns a human-readable label for this stream.
If no label is provided and the stream is generated by an operation,
the label of the operation is used.
Otherwise, the class name is used as the label.
"""
if self._label is None:
if self.invocation is not None:
# use the invocation operation label
return self.invocation.operation.label
else:
return self.__class__.__name__
return self._label

@property
def invocation(self) -> Optional[Invocation]:
return self._invocation

@invocation.setter
def invocation(self, value: Invocation) -> None:
if not isinstance(value, Invocation):
raise TypeError("invocation field must be an instance of Invocation")
raise TypeError(
"invocation field must be an instance of Invocation"
)
self._invocation = value

def __iter__(self) -> Iterator[Tuple[Tag, Packet]]:
raise NotImplementedError("Subclasses must implement __iter__ method")

def flow(self) -> Collection[Tuple[Tag, Packet]]:
"""
Flow everything through the stream, returning the entire collection of
(Tag, Packet) as a collection. This will trigger any upstream computation of the stream.
"""
return list(self)


class SyncStream(Stream):
"""
A stream that will complete in a fixed amount of time. It is suitable for synchronous operations that
will have to wait for the stream to finish before proceeding.
"""

def content_hash(self) -> str:
if self.invocation is not None: # and hasattr(self.invocation, "invocation_id"):
# use the invocation ID as the hash
return self.invocation.content_hash()
return super().content_hash()

def keys(self) -> Tuple[Collection[str], Collection[str]]:
"""
Returns the keys of the stream.
@@ -194,9 +246,28 @@ def keys(self) -> Tuple[Collection[str], Collection[str]]:
if tag_keys is not None and packet_keys is not None:
return tag_keys, packet_keys
# otherwise, use the keys from the first packet in the stream
# note that this may be computationally expensive
tag, packet = next(iter(self))
return list(tag.keys()), list(packet.keys())

def __iter__(self) -> Iterator[Tuple[Tag, Packet]]:
raise NotImplementedError("Subclasses must implement __iter__ method")

def flow(self) -> Collection[Tuple[Tag, Packet]]:
"""
Flow everything through the stream, returning the entire collection of
(Tag, Packet) as a collection. This will trigger any upstream computation of the stream.
"""
return list(self)

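A minimal concrete stream built on the SyncStream class defined just below might look like the sketch that follows; it assumes Tag and Packet behave as plain dicts here, and the ListStream class is purely illustrative (the SyncStreamFromLists exported in __init__.py above presumably covers this use case).

# Hypothetical in-memory stream yielding pre-built (tag, packet) pairs.
class ListStream(SyncStream):
    def __init__(self, pairs, **kwargs):
        super().__init__(**kwargs)
        self._pairs = pairs

    def __iter__(self):
        yield from self._pairs

s = ListStream([({"sample": "a"}, {"path": "data/a.txt"})], label="example")
print(s.keys())  # -> (['sample'], ['path']), inferred from the first pair
print(s.flow())  # -> the full list of (tag, packet) pairs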

class SyncStream(Stream):
"""
A stream that will complete in a fixed amount of time.
It is suitable for synchronous operations that
will have to wait for the stream to finish before proceeding.
"""

def head(self, n: int = 5) -> None:
"""
Print the first n elements of the stream.
@@ -223,9 +294,9 @@ def __rshift__(self, transformer: Any) -> "SyncStream":
The mapping is applied to each packet in the stream and the resulting packets
are returned in a new stream.
"""
# TODO: remove just in time import
from .mapper import MapPackets

# TODO: extend to generic mapping
if isinstance(transformer, dict):
return MapPackets(transformer)(self)
elif isinstance(transformer, Callable):
@@ -235,13 +306,21 @@ def __mul__(self, other: "SyncStream") -> "SyncStream":
"""
Returns a new stream that is the result of joining with the other stream.
"""
# TODO: remove just in time import
from .mapper import Join

if not isinstance(other, SyncStream):
raise TypeError("other must be a SyncStream")
return Join()(self, other)

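Continuing the hypothetical ListStream sketch above, the operator sugar might be exercised as below; the exact MapPackets and Join semantics are not shown in this diff, so the comments only restate what __rshift__ and __mul__ dispatch to.

a = ListStream([({"sample": "a"}, {"path": "data/a.txt"})])
b = ListStream([({"sample": "a"}, {"score": "data/a_score.txt"})])

mapped = a >> {"path": "file"}  # dict transformer -> MapPackets({...})(a)
joined = a * b                  # SyncStream operand -> Join()(a, b)

for tag, packet in joined:
    print(tag, packet)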

class Mapper(Operation):
"""
A Mapper is an operation that does NOT generate new file content.
It is used to control the flow of data in the pipeline without modifying existing data or creating new data files.
"""


class Source(Operation, SyncStream):
"""
A base class for all sources in the system. A source can be seen as a special