diff --git a/core/amber/src/main/python/core/architecture/managers/context.py b/core/amber/src/main/python/core/architecture/managers/context.py index 4236ccb2b3a..23d6023e84f 100644 --- a/core/amber/src/main/python/core/architecture/managers/context.py +++ b/core/amber/src/main/python/core/architecture/managers/context.py @@ -2,6 +2,7 @@ from .console_message_manager import ConsoleMessageManager from .debug_manager import DebugManager from .exception_manager import ExceptionManager +from .marker_processing_manager import MarkerProcessingManager from .tuple_processing_manager import TupleProcessingManager from .executor_manager import ExecutorManager from .pause_manager import PauseManager @@ -26,6 +27,7 @@ def __init__(self, worker_id, input_queue): self.input_queue: InternalQueue = input_queue self.executor_manager = ExecutorManager() self.tuple_processing_manager = TupleProcessingManager() + self.marker_processing_manager = MarkerProcessingManager() self.exception_manager = ExceptionManager() self.state_manager = StateManager( { diff --git a/core/amber/src/main/python/core/architecture/managers/marker_processing_manager.py b/core/amber/src/main/python/core/architecture/managers/marker_processing_manager.py new file mode 100644 index 00000000000..6a865544360 --- /dev/null +++ b/core/amber/src/main/python/core/architecture/managers/marker_processing_manager.py @@ -0,0 +1,16 @@ +from typing import Optional +from core.models.marker import State, Marker + + +class MarkerProcessingManager: + def __init__(self): + self.current_input_marker: Optional[Marker] = None + self.current_output_state: Optional[State] = None + + def get_input_marker(self) -> Optional[Marker]: + ret, self.current_input_marker = self.current_input_marker, None + return ret + + def get_output_state(self) -> Optional[State]: + ret, self.current_output_state = self.current_output_state, None + return ret diff --git a/core/amber/src/main/python/core/architecture/managers/tuple_processing_manager.py 
b/core/amber/src/main/python/core/architecture/managers/tuple_processing_manager.py index e321037eb93..c217d5fe372 100644 --- a/core/amber/src/main/python/core/architecture/managers/tuple_processing_manager.py +++ b/core/amber/src/main/python/core/architecture/managers/tuple_processing_manager.py @@ -1,21 +1,29 @@ from threading import Event, Condition -from typing import Optional, Union, Tuple, Iterator +from typing import Optional, Tuple, Iterator -from core.models import InputExhausted from proto.edu.uci.ics.amber.engine.common import PortIdentity class TupleProcessingManager: def __init__(self): - self.current_input_tuple: Optional[Union[Tuple, InputExhausted]] = None + self.current_input_tuple: Optional[Tuple] = None self.current_input_port_id: Optional[PortIdentity] = None - self.current_input_tuple_iter: Optional[ - Iterator[Union[Tuple, InputExhausted]] - ] = None + self.current_input_tuple_iter: Optional[Iterator[Tuple]] = None self.current_output_tuple: Optional[Tuple] = None self.context_switch_condition: Condition = Condition() self.finished_current: Event = Event() + def get_input_tuple(self) -> Optional[Tuple]: + ret, self.current_input_tuple = self.current_input_tuple, None + return ret + def get_output_tuple(self) -> Optional[Tuple]: ret, self.current_output_tuple = self.current_output_tuple, None return ret + + def get_input_port_id(self) -> int: + port_id = self.current_input_port_id + # no upstream, special case for source executor. 
+ if port_id is None: + return 0 + return port_id.id diff --git a/core/amber/src/main/python/core/architecture/packaging/input_manager.py b/core/amber/src/main/python/core/architecture/packaging/input_manager.py index d43491a9f4c..519ab448628 100644 --- a/core/amber/src/main/python/core/architecture/packaging/input_manager.py +++ b/core/amber/src/main/python/core/architecture/packaging/input_manager.py @@ -1,8 +1,15 @@ from typing import Iterator, Optional, Union, Dict, List - -from core.models import Tuple, ArrowTableTupleProvider, Schema, InputExhausted -from core.models.internal_marker import EndOfAll, InternalMarker, SenderChange -from core.models.marker import EndOfUpstream +from pyarrow.lib import Table +from core.models import Tuple, ArrowTableTupleProvider, Schema +from core.models.internal_marker import ( + InternalMarker, + StartOfOutputPorts, + EndOfOutputPorts, + SenderChange, + EndOfInputPort, + StartOfInputPort, +) +from core.models.marker import EndOfInputChannel, State, StartOfInputChannel, Marker from core.models.payload import DataFrame, DataPayload, MarkerFrame from proto.edu.uci.ics.amber.engine.common import ( ActorVirtualIdentity, @@ -50,6 +57,7 @@ def __init__(self): self._ports: Dict[PortIdentity, WorkerPort] = dict() self._channels: Dict[ChannelIdentity, Channel] = dict() self._current_channel_id: Optional[ChannelIdentity] = None + self.started = False def add_input_port(self, port_id: PortIdentity, schema: Schema) -> None: if port_id.id is None: @@ -78,16 +86,21 @@ def register_input( def process_data_payload( self, from_: ActorVirtualIdentity, payload: DataPayload - ) -> Iterator[Union[Tuple, InputExhausted, InternalMarker]]: + ) -> Iterator[Union[Tuple, InternalMarker]]: # special case used to yield for source op if from_ == InputManager.SOURCE_STARTER: - yield InputExhausted() - yield EndOfAll() + yield EndOfInputChannel() + yield EndOfOutputPorts() return - current_channel_id = None - for channel_id, channel in self._channels.items(): 
- if channel_id.from_worker_id == from_: - current_channel_id = channel_id + + current_channel_id = next( + ( + channel_id + for channel_id, channel in self._channels.items() + if channel_id.from_worker_id == from_ + ), + None, + ) if ( self._current_channel_id is None @@ -97,17 +110,30 @@ def process_data_payload( yield SenderChange(current_channel_id) if isinstance(payload, DataFrame): - for field_accessor in ArrowTableTupleProvider(payload.frame): - yield Tuple( - {name: field_accessor for name in payload.frame.column_names}, - schema=self._ports[ - self._channels[self._current_channel_id].port_id - ].get_schema(), - ) + yield from self._process_data(payload.frame) + elif isinstance(payload, MarkerFrame): + yield from self._process_marker(payload.frame) + else: + raise NotImplementedError() - elif isinstance(payload, MarkerFrame) and isinstance( - payload.frame, EndOfUpstream - ): + def _process_data(self, table: Table) -> Iterator[Tuple]: + schema = self._ports[ + self._channels[self._current_channel_id].port_id + ].get_schema() + for field_accessor in ArrowTableTupleProvider(table): + yield Tuple( + {name: field_accessor for name in table.column_names}, schema=schema + ) + + def _process_marker(self, marker: Marker) -> Iterator[InternalMarker]: + if isinstance(marker, State): + yield marker + if isinstance(marker, StartOfInputChannel): + if not self.started: + yield StartOfOutputPorts() + self.started = True + yield StartOfInputPort() + if isinstance(marker, EndOfInputChannel): channel = self._channels[self._current_channel_id] channel.complete() port_id = channel.port_id @@ -119,14 +145,11 @@ def process_data_payload( ) if port_completed: - yield InputExhausted() + yield EndOfInputPort() all_ports_completed = all( map(lambda port: port.is_completed(), self._ports.values()) ) if all_ports_completed: - yield EndOfAll() - - else: - raise NotImplementedError() + yield EndOfOutputPorts() diff --git 
a/core/amber/src/main/python/core/architecture/packaging/output_manager.py b/core/amber/src/main/python/core/architecture/packaging/output_manager.py index a3e9fb4f30a..e7592e0ab45 100644 --- a/core/amber/src/main/python/core/architecture/packaging/output_manager.py +++ b/core/amber/src/main/python/core/architecture/packaging/output_manager.py @@ -2,27 +2,26 @@ from collections import OrderedDict from itertools import chain from loguru import logger -from typing import Iterable, Iterator - from pyarrow import Table +from typing import Iterable, Iterator from core.architecture.packaging.input_manager import WorkerPort, Channel +from core.architecture.sendsemantics.broad_cast_partitioner import ( + BroadcastPartitioner, +) from core.architecture.sendsemantics.hash_based_shuffle_partitioner import ( HashBasedShufflePartitioner, ) +from core.architecture.sendsemantics.one_to_one_partitioner import OneToOnePartitioner +from core.architecture.sendsemantics.partitioner import Partitioner from core.architecture.sendsemantics.range_based_shuffle_partitioner import ( RangeBasedShufflePartitioner, ) -from core.architecture.sendsemantics.one_to_one_partitioner import OneToOnePartitioner -from core.architecture.sendsemantics.partitioner import Partitioner from core.architecture.sendsemantics.round_robin_partitioner import ( RoundRobinPartitioner, ) -from core.architecture.sendsemantics.broad_cast_partitioner import ( - BroadcastPartitioner, -) from core.models import Tuple, Schema, MarkerFrame -from core.models.marker import EndOfUpstream +from core.models.marker import Marker from core.models.payload import DataPayload, DataFrame from core.util import get_one_of from proto.edu.uci.ics.amber.engine.architecture.sendsemantics import ( @@ -99,19 +98,8 @@ def tuple_to_batch( ) ) - def tuple_to_frame(self, tuples: typing.List[Tuple]) -> DataFrame: - return DataFrame( - frame=Table.from_pydict( - { - name: [t[name] for t in tuples] - for name in 
self.get_port().get_schema().get_attr_names() - }, - schema=self.get_port().get_schema().as_arrow_schema(), - ) - ) - - def emit_end_of_upstream( - self, + def emit_marker( + self, marker: Marker ) -> Iterable[typing.Tuple[ActorVirtualIdentity, DataPayload]]: return chain( *( @@ -119,13 +107,24 @@ def emit_end_of_upstream( ( receiver, ( - MarkerFrame(tuples) - if isinstance(tuples, EndOfUpstream) - else self.tuple_to_frame(tuples) + MarkerFrame(payload) + if isinstance(payload, Marker) + else self.tuple_to_frame(payload) ), ) - for receiver, tuples in partitioner.no_more() + for receiver, payload in partitioner.flush(marker) ) for partitioner in self._partitioners.values() ) ) + + def tuple_to_frame(self, tuples: typing.List[Tuple]) -> DataFrame: + return DataFrame( + frame=Table.from_pydict( + { + name: [t[name] for t in tuples] + for name in self.get_port().get_schema().get_attr_names() + }, + schema=self.get_port().get_schema().as_arrow_schema(), + ) + ) diff --git a/core/amber/src/main/python/core/architecture/sendsemantics/broad_cast_partitioner.py b/core/amber/src/main/python/core/architecture/sendsemantics/broad_cast_partitioner.py index 03afbdc1e99..407172975f0 100644 --- a/core/amber/src/main/python/core/architecture/sendsemantics/broad_cast_partitioner.py +++ b/core/amber/src/main/python/core/architecture/sendsemantics/broad_cast_partitioner.py @@ -5,7 +5,7 @@ from core.architecture.sendsemantics.partitioner import Partitioner from core.models import Tuple -from core.models.marker import EndOfUpstream +from core.models.marker import Marker from core.util import set_one_of from proto.edu.uci.ics.amber.engine.architecture.sendsemantics import ( Partitioning, @@ -34,12 +34,10 @@ def add_tuple_to_batch( self.reset() @overrides - def no_more( - self, + def flush( + self, marker: Marker ) -> Iterator[ - typing.Tuple[ - ActorVirtualIdentity, typing.Union[EndOfUpstream, typing.List[Tuple]] - ] + typing.Tuple[ActorVirtualIdentity, typing.Union[Marker, 
typing.List[Tuple]]] ]: if len(self.batch) > 0: for receiver in self.receivers: @@ -47,7 +45,7 @@ def no_more( self.reset() for receiver in self.receivers: - yield receiver, EndOfUpstream() + yield receiver, marker @overrides def reset(self) -> None: diff --git a/core/amber/src/main/python/core/architecture/sendsemantics/hash_based_shuffle_partitioner.py b/core/amber/src/main/python/core/architecture/sendsemantics/hash_based_shuffle_partitioner.py index f33c21fa8b2..f4e0942768c 100644 --- a/core/amber/src/main/python/core/architecture/sendsemantics/hash_based_shuffle_partitioner.py +++ b/core/amber/src/main/python/core/architecture/sendsemantics/hash_based_shuffle_partitioner.py @@ -3,10 +3,9 @@ from loguru import logger from overrides import overrides - from core.architecture.sendsemantics.partitioner import Partitioner from core.models import Tuple -from core.models.marker import EndOfUpstream +from core.models.marker import Marker from core.util import set_one_of from proto.edu.uci.ics.amber.engine.architecture.sendsemantics import ( HashBasedShufflePartitioning, @@ -43,14 +42,12 @@ def add_tuple_to_batch( self.receivers[hash_code] = (receiver, list()) @overrides - def no_more( - self, + def flush( + self, marker: Marker ) -> Iterator[ - typing.Tuple[ - ActorVirtualIdentity, typing.Union[EndOfUpstream, typing.List[Tuple]] - ] + typing.Tuple[ActorVirtualIdentity, typing.Union[Marker, typing.List[Tuple]]] ]: for receiver, batch in self.receivers: if len(batch) > 0: yield receiver, batch - yield receiver, EndOfUpstream() + yield receiver, marker diff --git a/core/amber/src/main/python/core/architecture/sendsemantics/one_to_one_partitioner.py b/core/amber/src/main/python/core/architecture/sendsemantics/one_to_one_partitioner.py index ed69841870f..1758363c0cb 100644 --- a/core/amber/src/main/python/core/architecture/sendsemantics/one_to_one_partitioner.py +++ b/core/amber/src/main/python/core/architecture/sendsemantics/one_to_one_partitioner.py @@ -2,10 +2,9 @@ from 
typing import Iterator from overrides import overrides - from core.architecture.sendsemantics.partitioner import Partitioner from core.models import Tuple -from core.models.marker import EndOfUpstream +from core.models.marker import Marker from core.util import set_one_of from proto.edu.uci.ics.amber.engine.architecture.sendsemantics import ( OneToOnePartitioning, @@ -34,17 +33,15 @@ def add_tuple_to_batch( self.reset() @overrides - def no_more( - self, + def flush( + self, marker: Marker ) -> Iterator[ - typing.Tuple[ - ActorVirtualIdentity, typing.Union[EndOfUpstream, typing.List[Tuple]] - ] + typing.Tuple[ActorVirtualIdentity, typing.Union[Marker, typing.List[Tuple]]] ]: if len(self.batch) > 0: yield self.receiver, self.batch self.reset() - yield self.receiver, EndOfUpstream() + yield self.receiver, marker @overrides def reset(self) -> None: diff --git a/core/amber/src/main/python/core/architecture/sendsemantics/partitioner.py b/core/amber/src/main/python/core/architecture/sendsemantics/partitioner.py index 5c6007af222..e2ff2df34c7 100644 --- a/core/amber/src/main/python/core/architecture/sendsemantics/partitioner.py +++ b/core/amber/src/main/python/core/architecture/sendsemantics/partitioner.py @@ -5,7 +5,7 @@ from betterproto import Message from core.models import Tuple -from core.models.marker import EndOfUpstream +from core.models.marker import Marker from core.util import get_one_of from proto.edu.uci.ics.amber.engine.architecture.sendsemantics import Partitioning from proto.edu.uci.ics.amber.engine.common import ActorVirtualIdentity @@ -20,12 +20,10 @@ def add_tuple_to_batch( ) -> Iterator[typing.Tuple[ActorVirtualIdentity, typing.List[Tuple]]]: pass - def no_more( - self, + def flush( + self, marker: Marker ) -> Iterator[ - typing.Tuple[ - ActorVirtualIdentity, typing.Union[EndOfUpstream, typing.List[Tuple]] - ] + typing.Tuple[ActorVirtualIdentity, typing.Union[Marker, typing.List[Tuple]]] ]: pass diff --git 
a/core/amber/src/main/python/core/architecture/sendsemantics/range_based_shuffle_partitioner.py b/core/amber/src/main/python/core/architecture/sendsemantics/range_based_shuffle_partitioner.py index 66879868b12..31d0ccc6f87 100644 --- a/core/amber/src/main/python/core/architecture/sendsemantics/range_based_shuffle_partitioner.py +++ b/core/amber/src/main/python/core/architecture/sendsemantics/range_based_shuffle_partitioner.py @@ -6,7 +6,7 @@ from core.architecture.sendsemantics.partitioner import Partitioner from core.models import Tuple -from core.models.marker import EndOfUpstream +from core.models.marker import Marker from core.util import set_one_of from proto.edu.uci.ics.amber.engine.architecture.sendsemantics import ( RangeBasedShufflePartitioning, @@ -56,14 +56,12 @@ def add_tuple_to_batch( self.receivers[receiver_index] = (receiver, list()) @overrides - def no_more( - self, + def flush( + self, marker: Marker ) -> Iterator[ - typing.Tuple[ - ActorVirtualIdentity, typing.Union[EndOfUpstream, typing.List[Tuple]] - ] + typing.Tuple[ActorVirtualIdentity, typing.Union[Marker, typing.List[Tuple]]] ]: for receiver, batch in self.receivers: if len(batch) > 0: yield receiver, batch - yield receiver, EndOfUpstream() + yield receiver, marker diff --git a/core/amber/src/main/python/core/architecture/sendsemantics/round_robin_partitioner.py b/core/amber/src/main/python/core/architecture/sendsemantics/round_robin_partitioner.py index 3ba8fa72664..47011051f4e 100644 --- a/core/amber/src/main/python/core/architecture/sendsemantics/round_robin_partitioner.py +++ b/core/amber/src/main/python/core/architecture/sendsemantics/round_robin_partitioner.py @@ -5,7 +5,7 @@ from core.architecture.sendsemantics.partitioner import Partitioner from core.models import Tuple -from core.models.marker import EndOfUpstream +from core.models.marker import Marker from core.util import set_one_of from proto.edu.uci.ics.amber.engine.architecture.sendsemantics import ( Partitioning, @@ -36,14 
+36,13 @@ def add_tuple_to_batch( self.round_robin_index = (self.round_robin_index + 1) % len(self.receivers) @overrides - def no_more( - self, + def flush( + self, marker: Marker ) -> Iterator[ - typing.Tuple[ - ActorVirtualIdentity, typing.Union[EndOfUpstream, typing.List[Tuple]] - ] + typing.Tuple[ActorVirtualIdentity, typing.Union[Marker, typing.List[Tuple]]] ]: for receiver, batch in self.receivers: if len(batch) > 0: yield receiver, batch - yield receiver, EndOfUpstream() + batch.clear() + yield receiver, marker diff --git a/core/amber/src/main/python/core/models/__init__.py b/core/amber/src/main/python/core/models/__init__.py index 0320003a47d..1d93f514105 100644 --- a/core/amber/src/main/python/core/models/__init__.py +++ b/core/amber/src/main/python/core/models/__init__.py @@ -2,11 +2,12 @@ from typing import NamedTuple from .internal_queue import InternalQueue -from .internal_marker import EndOfAll, InternalMarker, SenderChange, InputExhausted +from .internal_marker import InternalMarker, SenderChange from .tuple import Tuple, TupleLike, ArrowTableTupleProvider from .table import Table, TableLike from .batch import Batch, BatchLike from .schema import AttributeType, Field, Schema +from .marker import State from .operator import ( Operator, TableOperator, @@ -25,10 +26,8 @@ class ExceptionInfo(NamedTuple): __all__ = [ "InternalQueue", - "EndOfAll", "InternalMarker", "SenderChange", - "InputExhausted", "Tuple", "TupleLike", "ArrowTableTupleProvider", @@ -48,4 +47,5 @@ class ExceptionInfo(NamedTuple): "AttributeType", "Field", "Schema", + "State", ] diff --git a/core/amber/src/main/python/core/models/internal_marker.py b/core/amber/src/main/python/core/models/internal_marker.py index e10f7ce536e..78ed5c60513 100644 --- a/core/amber/src/main/python/core/models/internal_marker.py +++ b/core/amber/src/main/python/core/models/internal_marker.py @@ -1,11 +1,10 @@ from dataclasses import dataclass - - +from core.models.marker import Marker from 
proto.edu.uci.ics.amber.engine.common import ChannelIdentity @dataclass -class InternalMarker: +class InternalMarker(Marker): """ A special Data Message, only being generated in un-packaging a batch into Tuples. Markers retain the order information and served as a indicator of data state. @@ -20,10 +19,20 @@ class SenderChange(InternalMarker): @dataclass -class EndOfAll(InternalMarker): +class StartOfInputPort(InternalMarker): + pass + + +@dataclass +class EndOfInputPort(InternalMarker): + pass + + +@dataclass +class StartOfOutputPorts(InternalMarker): pass @dataclass -class InputExhausted(InternalMarker): +class EndOfOutputPorts(InternalMarker): pass diff --git a/core/amber/src/main/python/core/models/marker.py b/core/amber/src/main/python/core/models/marker.py index 759190d5ed2..b53e44e9322 100644 --- a/core/amber/src/main/python/core/models/marker.py +++ b/core/amber/src/main/python/core/models/marker.py @@ -1,4 +1,10 @@ from dataclasses import dataclass +from pandas import DataFrame +from pyarrow import Table +from typing import Optional + +from .schema import Schema, AttributeType +from .schema.attribute_type import FROM_PYOBJECT_MAPPING @dataclass @@ -7,5 +13,61 @@ class Marker: @dataclass -class EndOfUpstream(Marker): +class StartOfInputChannel(Marker): + pass + + +@dataclass +class EndOfInputChannel(Marker): pass + + +@dataclass +class State(Marker): + def __init__( + self, table: Optional[Table] = None, pass_to_all_downstream: bool = False + ): + self.schema = Schema() + self.passToAllDownstream = pass_to_all_downstream + if table is not None: + self.__dict__.update(table.to_pandas().iloc[0].to_dict()) + self.schema = Schema(table.schema) + + def add( + self, key: str, value: any, value_type: Optional[AttributeType] = None + ) -> None: + self.__dict__[key] = value + if value_type is not None: + self.schema.add(key, value_type) + elif key != "schema": + self.schema.add(key, FROM_PYOBJECT_MAPPING[type(value)]) + + def get(self, key: str) -> any: + return 
self.__dict__[key] + + def to_table(self) -> Table: + return Table.from_pandas( + df=DataFrame([self.__dict__]), + schema=self.schema.as_arrow_schema(), + ) + + def __setattr__(self, key: str, value: any) -> None: + self.add(key, value) + + def __setitem__(self, key: str, value: any) -> None: + self.add(key, value) + + def __getitem__(self, key: str) -> any: + return self.get(key) + + def __str__(self) -> str: + content = ", ".join( + [ + repr(key) + ": " + repr(value) + for key, value in self.__dict__.items() + if key != "schema" + ] + ) + return f"State[{content}]" + + __repr__ = __str__ diff --git a/core/amber/src/main/python/core/models/operator.py b/core/amber/src/main/python/core/models/operator.py index 5716454858b..2e420d37f8b 100644 --- a/core/amber/src/main/python/core/models/operator.py +++ b/core/amber/src/main/python/core/models/operator.py @@ -7,6 +7,7 @@ from . import Table, TableLike, Tuple, TupleLike, Batch, BatchLike +from .marker import State from .table import all_output_to_tuple @@ -46,6 +47,36 @@ def close(self) -> None: """ pass + def process_state(self, state: State, port: int) -> Optional[State]: + """ + Process an input State from the given link. + The default implementation is to pass the State to all downstream operators + if the State has pass_to_all_downstream set to True. + :param state: State, a State from an input port to be processed. + :param port: int, input port index of the current exhausted port. + :return: State, producing one State object + """ + if state.passToAllDownstream: + return state + + def produce_state_on_start(self, port: int) -> State: + """ + Produce a State when the given link started. + + :param port: int, input port index of the current initialized port. + :return: State, producing one State object + """ + pass + + def produce_state_on_finish(self, port: int) -> State: + """ + Produce a State after the input port is exhausted. + + :param port: int, input port index of the current exhausted port. 
+ :return: State, producing one State object + """ + pass + class TupleOperatorV2(Operator): """ diff --git a/core/amber/src/main/python/core/models/schema/attribute_type.py b/core/amber/src/main/python/core/models/schema/attribute_type.py index ee7c8afebcc..e62508890d7 100644 --- a/core/amber/src/main/python/core/models/schema/attribute_type.py +++ b/core/amber/src/main/python/core/models/schema/attribute_type.py @@ -66,3 +66,12 @@ class AttributeType(Enum): AttributeType.BINARY: bytes, AttributeType.TIMESTAMP: datetime.datetime, } + +FROM_PYOBJECT_MAPPING = { + str: AttributeType.STRING, + int: AttributeType.INT, + float: AttributeType.DOUBLE, + bool: AttributeType.BOOL, + bytes: AttributeType.BINARY, + datetime.datetime: AttributeType.TIMESTAMP, +} diff --git a/core/amber/src/main/python/core/runnables/data_processor.py b/core/amber/src/main/python/core/runnables/data_processor.py index 758c75ead94..c322f0c7e24 100644 --- a/core/amber/src/main/python/core/runnables/data_processor.py +++ b/core/amber/src/main/python/core/runnables/data_processor.py @@ -4,9 +4,11 @@ from threading import Event from loguru import logger - +from typing import Iterator, Optional from core.architecture.managers import Context -from core.models import Tuple, ExceptionInfo +from core.models import ExceptionInfo, State, TupleLike +from core.models.internal_marker import StartOfInputPort, EndOfInputPort +from core.models.marker import Marker from core.models.table import all_output_to_tuple from core.util import Stoppable from core.util.console_message.replace_print import replace_print @@ -24,45 +26,69 @@ def __init__(self, context: Context): self._context = context def run(self) -> None: + """ + Start the data processing loop. Wait for context switch conditions to be met, + then continuously process markers or tuples until stopped. 
+ """ with self._context.tuple_processing_manager.context_switch_condition: self._context.tuple_processing_manager.context_switch_condition.wait() self._running.set() self._switch_context() while self._running.is_set(): - self.process_tuple() + marker = self._context.marker_processing_manager.get_input_marker() + tuple_ = self._context.tuple_processing_manager.current_input_tuple + if marker is not None: + self.process_marker(marker) + elif tuple_ is not None: + self.process_tuple() + else: + raise RuntimeError("No marker or tuple to process.") + self._switch_context() + + def process_marker(self, marker: Marker) -> None: + """ + Process an input marker by invoking appropriate state + or tuple generation based on the marker type. + """ + try: + executor = self._context.executor_manager.executor + port_id = self._context.tuple_processing_manager.get_input_port_id() + with replace_print( + self._context.worker_id, + self._context.console_message_manager.print_buf, + ): + if isinstance(marker, StartOfInputPort): + self._set_output_state(executor.produce_state_on_start(port_id)) + elif isinstance(marker, State): + self._set_output_state(executor.process_state(marker, port_id)) + elif isinstance(marker, EndOfInputPort): + self._set_output_state(executor.produce_state_on_finish(port_id)) + self._set_output_tuple(executor.on_finish(port_id)) + + except Exception as err: + logger.exception(err) + exc_info = sys.exc_info() + self._context.exception_manager.set_exception_info(exc_info) + self._report_exception(exc_info) + + finally: self._switch_context() def process_tuple(self) -> None: + """ + Process an input tuple by invoking the executor's tuple processing method. 
+ """ finished_current = self._context.tuple_processing_manager.finished_current while not finished_current.is_set(): try: executor = self._context.executor_manager.executor - tuple_ = self._context.tuple_processing_manager.current_input_tuple - port_id = self._context.tuple_processing_manager.current_input_port_id - port: int - if port_id is None: - # no upstream, special case for source executor. - port = 0 - else: - port = port_id.id - - output_iterator = ( - executor.process_tuple(tuple_, port) - if isinstance(tuple_, Tuple) - else executor.on_finish(port) - ) + port_id = self._context.tuple_processing_manager.get_input_port_id() + tuple_ = self._context.tuple_processing_manager.get_input_tuple() with replace_print( self._context.worker_id, self._context.console_message_manager.print_buf, ): - for output in output_iterator: - # output could be a None, a TupleLike, or a TableLike. - for output_tuple in all_output_to_tuple(output): - self._set_output_tuple(output_tuple) - self._switch_context() - - # current tuple finished successfully - finished_current.set() + self._set_output_tuple(executor.process_tuple(tuple_, port_id)) except Exception as err: logger.exception(err) @@ -73,10 +99,28 @@ def process_tuple(self) -> None: finally: self._switch_context() - def _set_output_tuple(self, output_tuple): - if output_tuple is not None: - output_tuple.finalize(self._context.output_manager.get_port().get_schema()) - self._context.tuple_processing_manager.current_output_tuple = output_tuple + def _set_output_tuple(self, output_iterator: Iterator[Optional[TupleLike]]) -> None: + """ + Set the output tuple after processing by the executor. + """ + for output in output_iterator: + # output could be a None, a TupleLike, or a TableLike. 
+ for output_tuple in all_output_to_tuple(output): + if output_tuple is not None: + output_tuple.finalize( + self._context.output_manager.get_port().get_schema() + ) + self._context.tuple_processing_manager.current_output_tuple = ( + output_tuple + ) + self._switch_context() + self._context.tuple_processing_manager.finished_current.set() + + def _set_output_state(self, output_state: State) -> None: + """ + Set the output state after processing by the executor. + """ + self._context.marker_processing_manager.current_output_state = output_state def _switch_context(self) -> None: """ diff --git a/core/amber/src/main/python/core/runnables/main_loop.py b/core/amber/src/main/python/core/runnables/main_loop.py index 3e2cf02e337..fc8b17faad2 100644 --- a/core/amber/src/main/python/core/runnables/main_loop.py +++ b/core/amber/src/main/python/core/runnables/main_loop.py @@ -1,7 +1,7 @@ import threading import time import typing -from typing import Iterator, Optional, Union +from typing import Iterator, Optional from loguru import logger from overrides import overrides @@ -9,16 +9,21 @@ from core.architecture.managers.context import Context from core.architecture.managers.pause_manager import PauseType -from core.architecture.packaging.input_manager import EndOfAll +from core.architecture.packaging.input_manager import EndOfOutputPorts from core.architecture.rpc.async_rpc_client import AsyncRPCClient from core.architecture.rpc.async_rpc_server import AsyncRPCServer from core.models import ( - InputExhausted, InternalQueue, SenderChange, Tuple, ) +from core.models.internal_marker import ( + StartOfOutputPorts, + EndOfInputPort, + StartOfInputPort, +) from core.models.internal_queue import DataElement, ControlElement +from core.models.marker import State, EndOfInputChannel, StartOfInputChannel from core.runnables.data_processor import DataProcessor from core.util import StoppableQueueBlockingRunnable, get_one_of, set_one_of from core.util.customized_queue.queue_base import 
QueueElement @@ -149,8 +154,8 @@ def process_control_payload( def process_input_tuple(self) -> None: """ - Process the current input tuple with the current input link. Send all result - Tuples to downstream workers. + Process the current input tuple with the current input link. + Send all result Tuples or State to downstream workers. This is being invoked for each Tuple/Marker that are unpacked from the DataElement. @@ -171,6 +176,14 @@ def process_input_tuple(self) -> None: ): self._output_queue.put(DataElement(tag=to, payload=batch)) + def process_input_state(self) -> None: + self._switch_context() + output_state = self.context.marker_processing_manager.get_output_state() + self._switch_context() + if output_state is not None: + for to, batch in self.context.output_manager.emit_marker(output_state): + self._output_queue.put(DataElement(tag=to, payload=batch)) + def process_tuple_with_udf(self) -> Iterator[Optional[Tuple]]: """ Process the Tuple/InputExhausted with the current link. @@ -195,13 +208,28 @@ def _process_control_element(self, control_element: ControlElement) -> None: """ self.process_control_payload(control_element.tag, control_element.payload) - def _process_tuple(self, tuple_: Union[Tuple, InputExhausted]) -> None: + def _process_tuple(self, tuple_: Tuple) -> None: self.context.tuple_processing_manager.current_input_tuple = tuple_ self.process_input_tuple() self._check_and_process_control() - def _process_input_exhausted(self, input_exhausted: InputExhausted): - self._process_tuple(input_exhausted) + def _process_state(self, state_: State) -> None: + self.context.marker_processing_manager.current_input_marker = state_ + self.process_input_state() + self._check_and_process_control() + + def _process_start_of_input_port( + self, start_of_input_port: StartOfInputPort + ) -> None: + self.context.marker_processing_manager.current_input_marker = ( + start_of_input_port + ) + self.process_input_state() + + def _process_end_of_input_port(self, 
end_of_input_port: EndOfInputPort) -> None: + self.context.marker_processing_manager.current_input_marker = end_of_input_port + self.process_input_state() + self.process_input_tuple() if self.context.tuple_processing_manager.current_input_port_id is not None: control_command = set_one_of( ControlCommandV2, @@ -225,16 +253,28 @@ def _process_sender_change_marker(self, sender_change_marker: SenderChange) -> N self.context.input_manager.get_port_id(sender_change_marker.channel_id) ) - def _process_end_of_all_marker(self, _: EndOfAll) -> None: + def _process_start_of_output_ports(self, _: StartOfOutputPorts) -> None: + """ + Upon receipt of an StartOfAllMarker, + which indicates the start of any input links, + send the StartOfInputChannel to all downstream workers. + + :param _: StartOfAny Internal Marker + """ + for to, batch in self.context.output_manager.emit_marker(StartOfInputChannel()): + self._output_queue.put(DataElement(tag=to, payload=batch)) + self._check_and_process_control() + + def _process_end_of_output_ports(self, _: EndOfOutputPorts) -> None: """ Upon receipt of an EndOfAllMarker, which indicates the end of all input links, send the last data batches to all downstream workers. It will also invoke complete() of this DataProcessor. 
- :param _: EndOfAllMarker + :param _: EndOfOutputPorts """ - for to, batch in self.context.output_manager.emit_end_of_upstream(): + for to, batch in self.context.output_manager.emit_marker(EndOfInputChannel()): self._output_queue.put(DataElement(tag=to, payload=batch)) self._check_and_process_control() control_command = set_one_of( @@ -265,7 +305,7 @@ def _process_data_element(self, data_element: DataElement) -> None: if self.context.tuple_processing_manager.current_input_tuple_iter is None: return - # here the self.context.processing_manager.current_input_tuple_iter + # here the self.context.processing_manager.current_input_iter # could be modified during iteration, thus we are using the while := # way to iterate through the iterator, instead of the for-each-loop # syntax sugar. @@ -279,12 +319,18 @@ def _process_data_element(self, data_element: DataElement) -> None: element, Tuple, self._process_tuple, - InputExhausted, - self._process_input_exhausted, + StartOfInputPort, + self._process_start_of_input_port, + EndOfInputPort, + self._process_end_of_input_port, SenderChange, self._process_sender_change_marker, - EndOfAll, - self._process_end_of_all_marker, + StartOfOutputPorts, + self._process_start_of_output_ports, + EndOfOutputPorts, + self._process_end_of_output_ports, + State, + self._process_state, ) except Exception as err: logger.exception(err) diff --git a/core/amber/src/main/python/core/runnables/network_receiver.py b/core/amber/src/main/python/core/runnables/network_receiver.py index b66af3bc9ef..22d0911b9f7 100644 --- a/core/amber/src/main/python/core/runnables/network_receiver.py +++ b/core/amber/src/main/python/core/runnables/network_receiver.py @@ -2,6 +2,7 @@ from overrides import overrides from pyarrow.lib import Table from typing import Optional +from pampy import match from core.architecture.handlers.actorcommand.actor_handler_base import ( ActorCommandHandler, @@ -17,7 +18,7 @@ MarkerFrame, ) from core.models.internal_queue import DataElement, 
ControlElement, InternalQueue -from core.models.marker import EndOfUpstream +from core.models.marker import EndOfInputChannel, State, StartOfInputChannel from core.proxy import ProxyServer from core.util import Stoppable, get_one_of from core.util.runnable.runnable import Runnable @@ -63,18 +64,19 @@ def data_handler(command: bytes, table: Table) -> int: :return: sender credits """ data_header = PythonDataHeader().parse(command) - payload_type = data_header.payload_type - if payload_type == "data": - payload = DataFrame(table) - elif payload_type == "EndOfUpstream": - payload = MarkerFrame(EndOfUpstream()) - shared_queue.put( - DataElement( - tag=data_header.tag, - payload=payload, - ) + payload = match( + data_header.payload_type, + "Data", + lambda _: DataFrame(table), + "State", + lambda _: MarkerFrame(State(table)), + "StartOfInputChannel", + MarkerFrame(StartOfInputChannel()), + "EndOfInputChannel", + MarkerFrame(EndOfInputChannel()), ) + shared_queue.put(DataElement(tag=data_header.tag, payload=payload)) return shared_queue.in_mem_size() self._proxy_server.register_data_handler(data_handler) diff --git a/core/amber/src/main/python/core/runnables/network_sender.py b/core/amber/src/main/python/core/runnables/network_sender.py index 11b3816a23f..031f2783902 100644 --- a/core/amber/src/main/python/core/runnables/network_sender.py +++ b/core/amber/src/main/python/core/runnables/network_sender.py @@ -3,7 +3,8 @@ from loguru import logger from overrides import overrides -from core.models import DataPayload, InternalQueue, DataFrame, MarkerFrame +from core.models import DataPayload, InternalQueue, DataFrame, MarkerFrame, State + from core.models.internal_queue import InternalQueueElement, DataElement, ControlElement from core.proxy import ProxyClient from core.util import StoppableQueueBlockingRunnable @@ -49,20 +50,25 @@ def _send_data(self, to: ActorVirtualIdentity, data_payload: DataPayload) -> Non :param to: The target actor's ActorVirtualIdentity :param 
data_payload: The data payload to be sent, can be either DataFrame or - EndOfUpstream + EndOfInputChannel """ if isinstance(data_payload, DataFrame): - data_header = PythonDataHeader(tag=to, payload_type="data") - self._proxy_client.send_data( - bytes(data_header), data_payload.frame - ) # returns credits + data_header = PythonDataHeader(tag=to, payload_type="Data") + self._proxy_client.send_data(bytes(data_header), data_payload.frame) elif isinstance(data_payload, MarkerFrame): data_header = PythonDataHeader( tag=to, payload_type=data_payload.frame.__class__.__name__ ) - self._proxy_client.send_data(bytes(data_header), None) # returns credits + table = ( + data_payload.frame.to_table() + if isinstance(data_payload.frame, State) + else None + ) + self._proxy_client.send_data(bytes(data_header), table) + else: + raise TypeError(f"Unexpected payload {data_payload}") @logger.catch(reraise=True) def _send_control( diff --git a/core/amber/src/main/python/core/runnables/test_main_loop.py b/core/amber/src/main/python/core/runnables/test_main_loop.py index 57d73058233..1623205c689 100644 --- a/core/amber/src/main/python/core/runnables/test_main_loop.py +++ b/core/amber/src/main/python/core/runnables/test_main_loop.py @@ -13,7 +13,7 @@ Tuple, ) from core.models.internal_queue import DataElement, ControlElement -from core.models.marker import EndOfUpstream +from core.models.marker import EndOfInputChannel from core.runnables import MainLoop from core.util import set_one_of from proto.edu.uci.ics.amber.engine.architecture.sendsemantics import ( @@ -135,7 +135,9 @@ def mock_batch_data_elements(self, mock_batch, mock_sender_actor): @pytest.fixture def mock_end_of_upstream(self, mock_tuple, mock_sender_actor): - return DataElement(tag=mock_sender_actor, payload=MarkerFrame(EndOfUpstream())) + return DataElement( + tag=mock_sender_actor, payload=MarkerFrame(EndOfInputChannel()) + ) @pytest.fixture def input_queue(self): @@ -533,7 +535,7 @@ def 
test_main_loop_thread_can_process_messages( ), ) - # can process EndOfUpstream + # can process EndOfInputChannel input_queue.put(mock_end_of_upstream) # the input port should complete @@ -580,7 +582,7 @@ def test_main_loop_thread_can_process_messages( ) assert output_queue.get() == DataElement( - tag=mock_receiver_actor, payload=MarkerFrame(EndOfUpstream()) + tag=mock_receiver_actor, payload=MarkerFrame(EndOfInputChannel()) ) # can process ReturnInvocation diff --git a/core/amber/src/main/python/core/runnables/test_network_receiver.py b/core/amber/src/main/python/core/runnables/test_network_receiver.py index 5728e35f8d5..ec682237935 100644 --- a/core/amber/src/main/python/core/runnables/test_network_receiver.py +++ b/core/amber/src/main/python/core/runnables/test_network_receiver.py @@ -4,7 +4,7 @@ from pyarrow import Table from core.models.internal_queue import InternalQueue, ControlElement, DataElement -from core.models.marker import EndOfUpstream +from core.models.marker import EndOfInputChannel from core.models.payload import MarkerFrame, DataFrame from core.proxy import ProxyClient from core.runnables.network_receiver import NetworkReceiver @@ -121,11 +121,11 @@ def test_network_receiver_can_receive_data_messages_end_of_upstream( network_sender_thread.start() worker_id = ActorVirtualIdentity(name="test") input_queue.put( - DataElement(tag=worker_id, payload=MarkerFrame(EndOfUpstream())) + DataElement(tag=worker_id, payload=MarkerFrame(EndOfInputChannel())) ) element: DataElement = output_queue.get() assert isinstance(element.payload, MarkerFrame) - assert element.payload.frame == EndOfUpstream() + assert element.payload.frame == EndOfInputChannel() assert element.tag == worker_id @pytest.mark.timeout(2) diff --git a/core/amber/src/main/python/pyamber/__init__.py b/core/amber/src/main/python/pyamber/__init__.py index 0b68aa53e96..c170e8f1ba3 100644 --- a/core/amber/src/main/python/pyamber/__init__.py +++ b/core/amber/src/main/python/pyamber/__init__.py @@ -1,5 
+1,4 @@ from core.models import ( - InputExhausted, Tuple, TupleLike, Table, @@ -10,10 +9,10 @@ BatchOperator, SourceOperator, TupleOperatorV2, + State, ) __all__ = [ - "InputExhausted", "Tuple", "TupleLike", "Table", @@ -24,4 +23,5 @@ "BatchOperator", "TupleOperatorV2", "SourceOperator", + "State", ] diff --git a/core/amber/src/main/python/pytexera/__init__.py b/core/amber/src/main/python/pytexera/__init__.py index 22dc37aa7fa..28ed737411c 100644 --- a/core/amber/src/main/python/pytexera/__init__.py +++ b/core/amber/src/main/python/pytexera/__init__.py @@ -11,7 +11,7 @@ ) __all__ = [ - "InputExhausted", + "State", "Tuple", "TupleLike", "UDFOperatorV2", diff --git a/core/amber/src/main/scala/edu/uci/ics/amber/engine/architecture/pythonworker/PythonProxyClient.scala b/core/amber/src/main/scala/edu/uci/ics/amber/engine/architecture/pythonworker/PythonProxyClient.scala index e4816ffaf97..a486a377430 100644 --- a/core/amber/src/main/scala/edu/uci/ics/amber/engine/architecture/pythonworker/PythonProxyClient.scala +++ b/core/amber/src/main/scala/edu/uci/ics/amber/engine/architecture/pythonworker/PythonProxyClient.scala @@ -17,6 +17,7 @@ import edu.uci.ics.amber.engine.common.ambermessage.InvocationConvertUtils.{ import edu.uci.ics.amber.engine.common.ambermessage.{PythonControlMessage, _} import edu.uci.ics.amber.engine.common.rpc.AsyncRPCClient.{ControlInvocation, ReturnInvocation} import edu.uci.ics.amber.engine.common.virtualidentity.ActorVirtualIdentity +import edu.uci.ics.texera.workflow.common.State import edu.uci.ics.texera.workflow.common.tuple.Tuple import edu.uci.ics.texera.workflow.common.tuple.schema.Schema import org.apache.arrow.flight._ @@ -101,10 +102,13 @@ class PythonProxyClient(portNumberPromise: Promise[Int], val actorId: ActorVirtu def sendData(dataPayload: DataPayload, from: ActorVirtualIdentity): Unit = { dataPayload match { - case DataFrame(frame) => - writeArrowStream(mutable.Queue(frame: _*), from, "data") + case DataFrame(frame) => 
writeArrowStream(mutable.Queue(frame: _*), from, "Data") case MarkerFrame(marker) => - writeArrowStream(mutable.Queue.empty, from, marker.getClass.getSimpleName) + marker match { + case state: State => + writeArrowStream(mutable.Queue(state.toTuple), from, marker.getClass.getSimpleName) + case _ => writeArrowStream(mutable.Queue.empty, from, marker.getClass.getSimpleName) + } } } diff --git a/core/amber/src/main/scala/edu/uci/ics/amber/engine/architecture/pythonworker/PythonProxyServer.scala b/core/amber/src/main/scala/edu/uci/ics/amber/engine/architecture/pythonworker/PythonProxyServer.scala index b5d62fb84e8..6ac9291283d 100644 --- a/core/amber/src/main/scala/edu/uci/ics/amber/engine/architecture/pythonworker/PythonProxyServer.scala +++ b/core/amber/src/main/scala/edu/uci/ics/amber/engine/architecture/pythonworker/PythonProxyServer.scala @@ -20,7 +20,7 @@ import java.net.ServerSocket import java.util.concurrent.atomic.AtomicInteger import scala.collection.mutable import com.twitter.util.Promise -import edu.uci.ics.texera.workflow.common.EndOfUpstream +import edu.uci.ics.texera.workflow.common.{EndOfInputChannel, StartOfInputChannel, State} import java.nio.charset.Charset @@ -103,20 +103,23 @@ private class AmberProducer( // closing the stream will release the dictionaries flightStream.takeDictionaryOwnership - if (dataHeader.payloadType == EndOfUpstream().getClass.getSimpleName) { - assert(root.getRowCount == 0) - outputPort.sendTo(to, MarkerFrame(EndOfUpstream())) - } else { - // normal data batches - val queue = mutable.Queue[Tuple]() - for (i <- 0 until root.getRowCount) - queue.enqueue(ArrowUtils.getTexeraTuple(i, root)) - outputPort.sendTo(to, DataFrame(queue.toArray)) - + dataHeader.payloadType match { + case "StartOfInputChannel" => + assert(root.getRowCount == 0) + outputPort.sendTo(to, MarkerFrame(StartOfInputChannel())) + case "EndOfInputChannel" => + assert(root.getRowCount == 0) + outputPort.sendTo(to, MarkerFrame(EndOfInputChannel())) + case "State" 
=> + assert(root.getRowCount == 1) + outputPort.sendTo(to, MarkerFrame(State(Some(ArrowUtils.getTexeraTuple(0, root))))) + case _ => // normal data batches + val queue = mutable.Queue[Tuple]() + for (i <- 0 until root.getRowCount) + queue.enqueue(ArrowUtils.getTexeraTuple(i, root)) + outputPort.sendTo(to, DataFrame(queue.toArray)) } - } - } class PythonProxyServer( diff --git a/core/amber/src/main/scala/edu/uci/ics/amber/engine/architecture/worker/DataProcessor.scala b/core/amber/src/main/scala/edu/uci/ics/amber/engine/architecture/worker/DataProcessor.scala index a61bfe11992..92c134480e2 100644 --- a/core/amber/src/main/scala/edu/uci/ics/amber/engine/architecture/worker/DataProcessor.scala +++ b/core/amber/src/main/scala/edu/uci/ics/amber/engine/architecture/worker/DataProcessor.scala @@ -33,7 +33,7 @@ import edu.uci.ics.amber.engine.common.virtualidentity.util.{CONTROLLER, SELF} import edu.uci.ics.amber.engine.common.virtualidentity.{ActorVirtualIdentity, ChannelIdentity} import edu.uci.ics.amber.engine.common.workflow.PortIdentity import edu.uci.ics.amber.error.ErrorUtils.{mkConsoleMessage, safely} -import edu.uci.ics.texera.workflow.common.EndOfUpstream +import edu.uci.ics.texera.workflow.common.{EndOfInputChannel, StartOfInputChannel, State} import edu.uci.ics.texera.workflow.common.operators.OperatorExecutor import edu.uci.ics.texera.workflow.common.tuple.Tuple @@ -93,12 +93,45 @@ class DataProcessor( } } + private[this] def processInputState(state: State, port: Int): Unit = { + try { + val outputState = executor.processState(state, port) + if (outputState.isDefined) { + outputManager.emitMarker(outputState.get) + } + } catch safely { + case e => + handleExecutorException(e) + } + } + + /** + * process start of an input port with Executor.produceStateOnStart(). + * this function is only called by the DP thread. 
+ */ + private[this] def processStartOfInputChannel(portId: Int): Unit = { + try { + outputManager.emitMarker(StartOfInputChannel()) + val outputState = executor.produceStateOnStart(portId) + if (outputState.isDefined) { + outputManager.emitMarker(outputState.get) + } + } catch safely { + case e => + handleExecutorException(e) + } + } + /** - * process end of an input port with Executor.onFinish(). + * process end of an input port with Executor.produceStateOnFinish(). * this function is only called by the DP thread. */ - private[this] def processEndOfUpstream(portId: Int): Unit = { + private[this] def processEndOfInputChannel(portId: Int): Unit = { try { + val outputState = executor.produceStateOnFinish(portId) + if (outputState.isDefined) { + outputManager.emitMarker(outputState.get) + } outputManager.outputIterator.setTupleOutput( executor.onFinishMultiPort(portId) ) @@ -134,7 +167,7 @@ class DataProcessor( outputTuple match { case FinalizeExecutor() => - outputManager.emitMarker(EndOfUpstream()) + outputManager.emitMarker(EndOfInputChannel()) // Send Completed signal to worker actor. 
executor.close() adaptiveBatchingMonitor.stopAdaptiveBatching() @@ -191,11 +224,15 @@ class DataProcessor( processInputTuple(inputManager.getNextTuple) case MarkerFrame(marker) => marker match { - case EndOfUpstream() => + case state: State => + processInputState(state, portId.id) + case StartOfInputChannel() => + processStartOfInputChannel(portId.id) + case EndOfInputChannel() => this.inputManager.getPort(portId).channels(channelId) = true if (inputManager.isPortCompleted(portId)) { inputManager.initBatch(channelId, Array.empty) - processEndOfUpstream(portId.id) + processEndOfInputChannel(portId.id) outputManager.outputIterator.appendSpecialTupleToEnd( FinalizePort(portId, input = true) ) diff --git a/core/amber/src/main/scala/edu/uci/ics/amber/engine/architecture/worker/promisehandlers/StartHandler.scala b/core/amber/src/main/scala/edu/uci/ics/amber/engine/architecture/worker/promisehandlers/StartHandler.scala index 9a5298040ee..431705c7c69 100644 --- a/core/amber/src/main/scala/edu/uci/ics/amber/engine/architecture/worker/promisehandlers/StartHandler.scala +++ b/core/amber/src/main/scala/edu/uci/ics/amber/engine/architecture/worker/promisehandlers/StartHandler.scala @@ -11,7 +11,7 @@ import edu.uci.ics.amber.engine.common.rpc.AsyncRPCServer.ControlCommand import edu.uci.ics.amber.engine.common.virtualidentity.ChannelIdentity import edu.uci.ics.amber.engine.common.virtualidentity.util.SOURCE_STARTER_ACTOR import edu.uci.ics.amber.engine.common.workflow.PortIdentity -import edu.uci.ics.texera.workflow.common.EndOfUpstream +import edu.uci.ics.texera.workflow.common.{EndOfInputChannel, StartOfInputChannel} object StartHandler { final case class StartWorker() extends ControlCommand[WorkerState] @@ -33,7 +33,11 @@ trait StartHandler { .setPortId(dummyInputPortId) dp.processDataPayload( ChannelIdentity(SOURCE_STARTER_ACTOR, dp.actorId, isControl = false), - MarkerFrame(EndOfUpstream()) + MarkerFrame(StartOfInputChannel()) + ) + dp.processDataPayload( + 
ChannelIdentity(SOURCE_STARTER_ACTOR, dp.actorId, isControl = false), + MarkerFrame(EndOfInputChannel()) ) dp.stateManager.getCurrentState } else { diff --git a/core/amber/src/main/scala/edu/uci/ics/texera/workflow/common/Marker.scala b/core/amber/src/main/scala/edu/uci/ics/texera/workflow/common/Marker.scala index 10fe4220bf2..41ef2fa6707 100644 --- a/core/amber/src/main/scala/edu/uci/ics/texera/workflow/common/Marker.scala +++ b/core/amber/src/main/scala/edu/uci/ics/texera/workflow/common/Marker.scala @@ -1,5 +1,47 @@ package edu.uci.ics.texera.workflow.common +import edu.uci.ics.texera.workflow.common.tuple.Tuple +import edu.uci.ics.texera.workflow.common.tuple.schema.{AttributeType, Schema, Attribute} +import scala.collection.mutable + sealed trait Marker -final case class EndOfUpstream() extends Marker +final case class StartOfInputChannel() extends Marker +final case class EndOfInputChannel() extends Marker + +final case class State(tuple: Option[Tuple] = None, passToAllDownstream: Boolean = false) + extends Marker { + val data: mutable.Map[String, (AttributeType, Any)] = mutable.LinkedHashMap() + add("passToAllDownstream", passToAllDownstream, AttributeType.BOOLEAN) + if (tuple.isDefined) { + tuple.get.getSchema.getAttributes.foreach { attribute => + add(attribute.getName, tuple.get.getField(attribute.getName), attribute.getType) + } + } + + def add(key: String, value: Any, valueType: AttributeType): Unit = + data.put(key, (valueType, value)) + + def get(key: String): Any = data(key)._2 + + def isPassToAllDownstream: Boolean = get("passToAllDownstream").asInstanceOf[Boolean] + + def apply(key: String): Any = get(key) + + def toTuple: Tuple = + Tuple + .builder( + Schema + .builder() + .add(data.map { + case (name, (attrType, _)) => + new Attribute(name, attrType) + }) + .build() + ) + .addSequentially(data.values.map(_._2).toArray) + .build() + + override def toString: String = + data.map { case (key, (_, value)) => s"$key: $value" }.mkString(", ") +} diff 
--git a/core/amber/src/main/scala/edu/uci/ics/texera/workflow/common/operators/OperatorExecutor.scala b/core/amber/src/main/scala/edu/uci/ics/texera/workflow/common/operators/OperatorExecutor.scala index f27b263f34a..327e3a1adb9 100644 --- a/core/amber/src/main/scala/edu/uci/ics/texera/workflow/common/operators/OperatorExecutor.scala +++ b/core/amber/src/main/scala/edu/uci/ics/texera/workflow/common/operators/OperatorExecutor.scala @@ -2,12 +2,23 @@ package edu.uci.ics.texera.workflow.common.operators import edu.uci.ics.amber.engine.common.tuple.amber.TupleLike import edu.uci.ics.amber.engine.common.workflow.PortIdentity +import edu.uci.ics.texera.workflow.common.State import edu.uci.ics.texera.workflow.common.tuple.Tuple trait OperatorExecutor { def open(): Unit = {} + def produceStateOnStart(port: Int): Option[State] = None + + def processState(state: State, port: Int): Option[State] = { + if (state.isPassToAllDownstream) { + Some(state) + } else { + None + } + } + def processTupleMultiPort( tuple: Tuple, port: Int @@ -17,6 +28,8 @@ trait OperatorExecutor { def processTuple(tuple: Tuple, port: Int): Iterator[TupleLike] + def produceStateOnFinish(port: Int): Option[State] = None + def onFinishMultiPort(port: Int): Iterator[(TupleLike, Option[PortIdentity])] = { onFinish(port).map(t => (t, None)) } diff --git a/core/amber/src/test/scala/edu/uci/ics/amber/engine/architecture/messaginglayer/OutputManagerSpec.scala b/core/amber/src/test/scala/edu/uci/ics/amber/engine/architecture/messaginglayer/OutputManagerSpec.scala index 0ff4b539319..5cfdfbf71d3 100644 --- a/core/amber/src/test/scala/edu/uci/ics/amber/engine/architecture/messaginglayer/OutputManagerSpec.scala +++ b/core/amber/src/test/scala/edu/uci/ics/amber/engine/architecture/messaginglayer/OutputManagerSpec.scala @@ -11,7 +11,7 @@ import edu.uci.ics.amber.engine.common.virtualidentity.{ PhysicalOpIdentity } import edu.uci.ics.amber.engine.common.workflow.{PhysicalLink, PortIdentity} -import 
edu.uci.ics.texera.workflow.common.EndOfUpstream +import edu.uci.ics.texera.workflow.common.EndOfInputChannel import edu.uci.ics.texera.workflow.common.tuple.schema.{AttributeType, Schema} import org.scalamock.scalatest.MockFactory import org.scalatest.flatspec.AnyFlatSpec @@ -67,7 +67,7 @@ class OutputManagerSpec extends AnyFlatSpec with MockFactory { mkDataMessage(fakeID, identifier, 2, DataFrame(tuples.slice(20, 21))) ) (mockHandler.apply _).expects( - mkDataMessage(fakeID, identifier, 3, MarkerFrame(EndOfUpstream())) + mkDataMessage(fakeID, identifier, 3, MarkerFrame(EndOfInputChannel())) ) } val fakeLink = PhysicalLink(physicalOpId(), mockPortId, physicalOpId(), mockPortId) @@ -81,7 +81,7 @@ class OutputManagerSpec extends AnyFlatSpec with MockFactory { tuples.foreach { t => outputManager.passTupleToDownstream(TupleLike(t.getFields), None) } - outputManager.emitMarker(EndOfUpstream()) + outputManager.emitMarker(EndOfInputChannel()) } } diff --git a/core/amber/src/test/scala/edu/uci/ics/amber/engine/architecture/worker/DataProcessorSpec.scala b/core/amber/src/test/scala/edu/uci/ics/amber/engine/architecture/worker/DataProcessorSpec.scala index bbbc30308ce..3205a3397be 100644 --- a/core/amber/src/test/scala/edu/uci/ics/amber/engine/architecture/worker/DataProcessorSpec.scala +++ b/core/amber/src/test/scala/edu/uci/ics/amber/engine/architecture/worker/DataProcessorSpec.scala @@ -19,7 +19,7 @@ import edu.uci.ics.amber.engine.common.virtualidentity.{ PhysicalOpIdentity } import edu.uci.ics.amber.engine.common.workflow.PortIdentity -import edu.uci.ics.texera.workflow.common.EndOfUpstream +import edu.uci.ics.texera.workflow.common.EndOfInputChannel import edu.uci.ics.texera.workflow.common.WorkflowContext.DEFAULT_WORKFLOW_ID import edu.uci.ics.texera.workflow.common.operators.OperatorExecutor import edu.uci.ics.texera.workflow.common.tuple.Tuple @@ -78,6 +78,13 @@ class DataProcessorSpec extends AnyFlatSpec with MockFactory with BeforeAndAfter ) .expects(x, 0) } + ( 
+ ( + input: Int + ) => executor.produceStateOnFinish(input) + ) + .expects(0) + .returning(None) ( ( input: Int @@ -108,7 +115,7 @@ class DataProcessorSpec extends AnyFlatSpec with MockFactory with BeforeAndAfter } dp.processDataPayload( ChannelIdentity(senderWorkerId, testWorkerId, isControl = false), - MarkerFrame(EndOfUpstream()) + MarkerFrame(EndOfInputChannel()) ) while (dp.inputManager.hasUnfinishedInput || dp.outputManager.hasUnfinishedOutput) { dp.continueDataProcessing() @@ -130,6 +137,13 @@ class DataProcessorSpec extends AnyFlatSpec with MockFactory with BeforeAndAfter ) .expects(x, 0) } + ( + ( + input: Int + ) => executor.produceStateOnFinish(input) + ) + .expects(0) + .returning(None) ( ( input: Int @@ -162,7 +176,7 @@ class DataProcessorSpec extends AnyFlatSpec with MockFactory with BeforeAndAfter (executor.close _).expects().once() dp.processDataPayload( ChannelIdentity(senderWorkerId, testWorkerId, isControl = false), - MarkerFrame(EndOfUpstream()) + MarkerFrame(EndOfInputChannel()) ) while (dp.inputManager.hasUnfinishedInput || dp.outputManager.hasUnfinishedOutput) { dp.continueDataProcessing()