From e669fe4e99af01ee0eea1203dd64d462d774e266 Mon Sep 17 00:00:00 2001 From: Sidhant Kohli Date: Tue, 17 Jun 2025 15:34:39 -0700 Subject: [PATCH 1/8] feat: add async source transformer Signed-off-by: Sidhant Kohli --- pynumaflow/sourcetransformer/_dtypes.py | 7 + pynumaflow/sourcetransformer/async_server.py | 157 +++++++++++ tests/sourcetransform/test_async.py | 273 +++++++++++++++++++ 3 files changed, 437 insertions(+) create mode 100644 pynumaflow/sourcetransformer/async_server.py create mode 100644 tests/sourcetransform/test_async.py diff --git a/pynumaflow/sourcetransformer/_dtypes.py b/pynumaflow/sourcetransformer/_dtypes.py index 13b2bce1..bc0ec7b5 100644 --- a/pynumaflow/sourcetransformer/_dtypes.py +++ b/pynumaflow/sourcetransformer/_dtypes.py @@ -3,6 +3,7 @@ from dataclasses import dataclass from datetime import datetime from typing import TypeVar, Callable, Union, Optional +from collections.abc import Awaitable from warnings import warn from pynumaflow._constants import DROP @@ -210,3 +211,9 @@ def handler(self, keys: list[str], datum: Datum) -> Messages: # SourceTransformCallable is the type of the handler function for the # Source Transformer UDFunction. 
SourceTransformCallable = Union[SourceTransformHandler, SourceTransformer] + + +# SourceTransformAsyncCallable is a callable which can be used as a handler +# for the Asynchronous Transformer UDF +SourceTransformHandlerAsyncHandlerCallable = Callable[[list[str], Datum], Awaitable[Messages]] +SourceTransformAsyncCallable = Union[SourceTransformer, SourceTransformHandlerAsyncHandlerCallable] diff --git a/pynumaflow/sourcetransformer/async_server.py b/pynumaflow/sourcetransformer/async_server.py new file mode 100644 index 00000000..0dc8add9 --- /dev/null +++ b/pynumaflow/sourcetransformer/async_server.py @@ -0,0 +1,157 @@ +import aiorun +import grpc + +from pynumaflow._constants import ( + NUM_THREADS_DEFAULT, + MAX_MESSAGE_SIZE, + MAX_NUM_THREADS, + SOURCE_TRANSFORMER_SOCK_PATH, + SOURCE_TRANSFORMER_SERVER_INFO_FILE_PATH, +) +from pynumaflow.info.types import ( + ServerInfo, + MINIMUM_NUMAFLOW_VERSION, + ContainerType, +) +from pynumaflow.proto.sourcetransformer import transform_pb2_grpc +from pynumaflow.shared.server import ( + NumaflowServer, + start_async_server, +) +from pynumaflow.sourcetransformer._dtypes import SourceTransformAsyncCallable +from pynumaflow.sourcetransformer.servicer._async_servicer import SourceTransformAsyncServicer + + +class SourceTransformAsyncServer(NumaflowServer): + """ + Create a new grpc Source Transformer Server instance. + A new servicer instance is created and attached to the server. + The server instance is returned. 
+ Args: + source_transform_instance: The source transformer instance to be used for + Source Transformer UDF + sock_path: The UNIX socket path to be used for the server + max_message_size: The max message size in bytes the server can receive and send + max_threads: The max number of threads to be spawned; + defaults to 4 and max capped at 16 + + Example Invocation: + + import datetime + import logging + + from pynumaflow.sourcetransformer import Messages, Message, Datum, SourceTransformServer + # This is a simple User Defined Function example which receives a message, + # applies the following + # data transformation, and returns the message. + # If the message event time is before year 2022, drop the message with event time unchanged. + # If it's within year 2022, update the tag to "within_year_2022" and + # update the message event time to Jan 1st 2022. + # Otherwise, (exclusively after year 2022), update the tag to + # "after_year_2022" and update the + # message event time to Jan 1st 2023. 
+ + january_first_2022 = datetime.datetime.fromtimestamp(1640995200) + january_first_2023 = datetime.datetime.fromtimestamp(1672531200) + + + async def my_handler(keys: list[str], datum: Datum) -> Messages: + val = datum.value + event_time = datum.event_time + messages = Messages() + + if event_time < january_first_2022: + logging.info("Got event time:%s, it is before 2022, so dropping", event_time) + messages.append(Message.to_drop(event_time)) + elif event_time < january_first_2023: + logging.info( + "Got event time:%s, it is within year 2022, so forwarding to within_year_2022", + event_time, + ) + messages.append( + Message(value=val, event_time=january_first_2022, + tags=["within_year_2022"]) + ) + else: + logging.info( + "Got event time:%s, it is after year 2022, so forwarding to + after_year_2022", event_time + ) + messages.append(Message(value=val, event_time=january_first_2023, + tags=["after_year_2022"])) + + return messages + + + if __name__ == "__main__": + grpc_server = SourceTransformAsyncServer(my_handler) + grpc_server.start() + """ + + def __init__( + self, + source_transform_instance: SourceTransformAsyncCallable, + sock_path=SOURCE_TRANSFORMER_SOCK_PATH, + max_message_size=MAX_MESSAGE_SIZE, + max_threads=NUM_THREADS_DEFAULT, + server_info_file=SOURCE_TRANSFORMER_SERVER_INFO_FILE_PATH, + ): + """ + Create a new grpc Asynchronous Source Transformer Server instance. + A new servicer instance is created and attached to the server. + The server instance is returned. 
+ Args: + source_transform_instance: The handler to be used for the Source Transformer UDF + sock_path: The UNIX socket path to be used for the server + max_message_size: The max message size in bytes the server can receive and send + max_threads: The max number of threads to be spawned; + defaults to 4 and max capped at 16 + """ + self.sock_path = f"unix://{sock_path}" + self.max_threads = min(max_threads, MAX_NUM_THREADS) + self.max_message_size = max_message_size + self.server_info_file = server_info_file + + self.source_transform_instance = source_transform_instance + + self._server_options = [ + ("grpc.max_send_message_length", self.max_message_size), + ("grpc.max_receive_message_length", self.max_message_size), + ] + self.servicer = SourceTransformAsyncServicer(handler=source_transform_instance) + + def start(self) -> None: + """ + Starter function for the Async server class, need a separate caller + so that all the async coroutines can be started from a single context + """ + aiorun.run(self.aexec(), use_uvloop=True) + + async def aexec(self) -> None: + """ + Starts the Async gRPC server on the given UNIX socket with + given max threads. 
+ """ + + # As the server is async, we need to create a new server instance in the + # same thread as the event loop so that all the async calls are made in the + # same context + + server_new = grpc.aio.server(options=self._server_options) + server_new.add_insecure_port(self.sock_path) + transform_pb2_grpc.add_SourceTransformServicer_to_server(self.servicer, server_new) + + serv_info = ServerInfo.get_default_server_info() + serv_info.minimum_numaflow_version = MINIMUM_NUMAFLOW_VERSION[ + ContainerType.Sourcetransformer + ] + + # Start the async server + await start_async_server( + server_async=server_new, + sock_path=self.sock_path, + max_threads=self.max_threads, + cleanup_coroutines=list(), + server_info_file=self.server_info_file, + server_info=serv_info, + ) diff --git a/tests/sourcetransform/test_async.py b/tests/sourcetransform/test_async.py new file mode 100644 index 00000000..fd571904 --- /dev/null +++ b/tests/sourcetransform/test_async.py @@ -0,0 +1,273 @@ +import asyncio +import logging +import threading +import unittest +from unittest.mock import patch +from google.protobuf import timestamp_pb2 as _timestamp_pb2 + +import grpc +from google.protobuf import empty_pb2 as _empty_pb2 +from grpc.aio._server import Server + +from pynumaflow import setup_logging +from pynumaflow._constants import MAX_MESSAGE_SIZE +from pynumaflow.proto.sourcetransformer import transform_pb2_grpc +from pynumaflow.sourcetransformer import Datum, Messages, Message +from pynumaflow.sourcetransformer.async_server import SourceTransformAsyncServer +from tests.sourcetransform.utils import get_test_datums +from tests.testing_utils import ( + mock_terminate_on_stop, + mock_new_event_time, +) + +LOGGER = setup_logging(__name__) + +# if set to true, transform handler will raise a `ValueError` exception. 
+raise_error_from_st = False + + +async def async_transform_handler(keys: list[str], datum: Datum) -> Messages: + if raise_error_from_st: + raise ValueError("Exception thrown from transform") + val = datum.value + msg = "payload:{} event_time:{} ".format( + val.decode("utf-8"), + datum.event_time, + ) + val = bytes(msg, encoding="utf-8") + messages = Messages() + messages.append(Message(val, mock_new_event_time(), keys=keys)) + return messages + + +def request_generator(req): + yield from req + + +_s: Server = None +_channel = grpc.insecure_channel("unix:///tmp/async_st.sock") +_loop = None + + +def startup_callable(loop): + asyncio.set_event_loop(loop) + loop.run_forever() + + +def new_async_st(): + server = SourceTransformAsyncServer(source_transform_instance=async_transform_handler) + udfs = server.servicer + return udfs + + +async def start_server(udfs): + _server_options = [ + ("grpc.max_send_message_length", MAX_MESSAGE_SIZE), + ("grpc.max_receive_message_length", MAX_MESSAGE_SIZE), + ] + server = grpc.aio.server(options=_server_options) + transform_pb2_grpc.add_SourceTransformServicer_to_server(udfs, server) + listen_addr = "unix:///tmp/async_st.sock" + server.add_insecure_port(listen_addr) + logging.info("Starting server on %s", listen_addr) + global _s + _s = server + await server.start() + await server.wait_for_termination() + + +# We are mocking the terminate function from the psutil to not exit the program during testing +@patch("psutil.Process.kill", mock_terminate_on_stop) +class TestAsyncTransformer(unittest.TestCase): + @classmethod + def setUpClass(cls) -> None: + global _loop + loop = asyncio.new_event_loop() + _loop = loop + _thread = threading.Thread(target=startup_callable, args=(loop,), daemon=True) + _thread.start() + udfs = new_async_st() + asyncio.run_coroutine_threadsafe(start_server(udfs), loop=loop) + while True: + try: + with grpc.insecure_channel("unix:///tmp/async_st.sock") as channel: + f = grpc.channel_ready_future(channel) + 
f.result(timeout=10) + if f.done(): + break + except grpc.FutureTimeoutError as e: + LOGGER.error("error trying to connect to grpc server") + LOGGER.error(e) + + @classmethod + def tearDownClass(cls) -> None: + try: + _loop.stop() + LOGGER.info("stopped the event loop") + except Exception as e: + LOGGER.error(e) + + def test_run_server(self) -> None: + with grpc.insecure_channel("unix:///tmp/async_st.sock") as channel: + stub = transform_pb2_grpc.SourceTransformStub(channel) + request = get_test_datums() + generator_response = None + try: + generator_response = stub.SourceTransformFn( + request_iterator=request_generator(request) + ) + except grpc.RpcError as e: + logging.error(e) + + responses = [] + # capture the output from the ReadFn generator and assert. + for r in generator_response: + responses.append(r) + + # 1 handshake + 3 data responses + self.assertEqual(4, len(responses)) + + self.assertTrue(responses[0].handshake.sot) + + idx = 1 + while idx < len(responses): + _id = "test-id-" + str(idx) + self.assertEqual(_id, responses[idx].id) + self.assertEqual( + bytes( + "payload:test_mock_message " "event_time:2022-09-12 16:00:00 ", + encoding="utf-8", + ), + responses[idx].results[0].value, + ) + self.assertEqual(1, len(responses[idx].results)) + idx += 1 + + LOGGER.info("Successfully validated the server") + + def test_async_source_transformer(self) -> None: + stub = transform_pb2_grpc.SourceTransformStub(_channel) + request = get_test_datums() + generator_response = None + try: + generator_response = stub.SourceTransformFn(request_iterator=request_generator(request)) + except grpc.RpcError as e: + logging.error(e) + + responses = [] + # capture the output from the ReadFn generator and assert. 
+ for r in generator_response: + responses.append(r) + + # 1 handshake + 3 data responses + self.assertEqual(4, len(responses)) + + self.assertTrue(responses[0].handshake.sot) + + idx = 1 + while idx < len(responses): + _id = "test-id-" + str(idx) + self.assertEqual(_id, responses[idx].id) + self.assertEqual( + bytes( + "payload:test_mock_message " "event_time:2022-09-12 16:00:00 ", + encoding="utf-8", + ), + responses[idx].results[0].value, + ) + self.assertEqual(1, len(responses[idx].results)) + idx += 1 + + # Verify new event time gets assigned. + updated_event_time_timestamp = _timestamp_pb2.Timestamp() + updated_event_time_timestamp.FromDatetime(dt=mock_new_event_time()) + self.assertEqual( + updated_event_time_timestamp, + responses[1].results[0].event_time, + ) + # self.assertEqual(code, grpc.StatusCode.OK) + + def test_async_source_transformer_grpc_error_no_handshake(self) -> None: + stub = transform_pb2_grpc.SourceTransformStub(_channel) + request = get_test_datums(handshake=False) + grpc_exception = None + + responses = [] + try: + generator_response = stub.SourceTransformFn(request_iterator=request_generator(request)) + # capture the output from the ReadFn generator and assert. + for r in generator_response: + responses.append(r) + except grpc.RpcError as e: + logging.error(e) + grpc_exception = e + self.assertTrue("SourceTransformFn: expected handshake message" in e.__str__()) + + self.assertEqual(0, len(responses)) + self.assertIsNotNone(grpc_exception) + + def test_async_source_transformer_grpc_error(self) -> None: + stub = transform_pb2_grpc.SourceTransformStub(_channel) + request = get_test_datums() + grpc_exception = None + + responses = [] + try: + global raise_error_from_st + raise_error_from_st = True + generator_response = stub.SourceTransformFn(request_iterator=request_generator(request)) + # capture the output from the ReadFn generator and assert. 
+ for r in generator_response: + responses.append(r) + except grpc.RpcError as e: + logging.error(e) + grpc_exception = e + self.assertEqual(grpc.StatusCode.INTERNAL, e.code()) + self.assertTrue("Exception thrown from transform" in e.__str__()) + finally: + raise_error_from_st = False + # 1 handshake + self.assertEqual(1, len(responses)) + self.assertIsNotNone(grpc_exception) + + def test_is_ready(self) -> None: + with grpc.insecure_channel("unix:///tmp/async_st.sock") as channel: + stub = transform_pb2_grpc.SourceTransformStub(channel) + + request = _empty_pb2.Empty() + response = None + try: + response = stub.IsReady(request=request) + except grpc.RpcError as e: + logging.error(e) + + self.assertTrue(response.ready) + + def test_invalid_input(self): + with self.assertRaises(TypeError): + SourceTransformAsyncServer() + + def __stub(self): + return transform_pb2_grpc.SourceTransformStub(_channel) + + def test_max_threads(self): + # max cap at 16 + server = SourceTransformAsyncServer( + source_transform_instance=async_transform_handler, max_threads=32 + ) + self.assertEqual(server.max_threads, 16) + + # use argument provided + server = SourceTransformAsyncServer( + source_transform_instance=async_transform_handler, max_threads=5 + ) + self.assertEqual(server.max_threads, 5) + + # defaults to 4 + server = SourceTransformAsyncServer(source_transform_instance=async_transform_handler) + self.assertEqual(server.max_threads, 4) + + +if __name__ == "__main__": + logging.basicConfig(level=logging.DEBUG) + unittest.main() From 9b04382568aa420caaf97f165721111b4944a946 Mon Sep 17 00:00:00 2001 From: Sidhant Kohli Date: Tue, 17 Jun 2025 15:36:37 -0700 Subject: [PATCH 2/8] feat: add async source transformer Signed-off-by: Sidhant Kohli --- .../servicer/_async_servicer.py | 155 ++++++++++++++++++ 1 file changed, 155 insertions(+) create mode 100644 pynumaflow/sourcetransformer/servicer/_async_servicer.py diff --git a/pynumaflow/sourcetransformer/servicer/_async_servicer.py 
b/pynumaflow/sourcetransformer/servicer/_async_servicer.py new file mode 100644 index 00000000..3dfd8383 --- /dev/null +++ b/pynumaflow/sourcetransformer/servicer/_async_servicer.py @@ -0,0 +1,155 @@ +import asyncio +from collections.abc import AsyncIterable + +from google.protobuf import empty_pb2 as _empty_pb2 +from google.protobuf import timestamp_pb2 as _timestamp_pb2 + +from pynumaflow._constants import _LOGGER, STREAM_EOF, ERR_UDF_EXCEPTION_STRING +from pynumaflow.proto.sourcetransformer import transform_pb2, transform_pb2_grpc +from pynumaflow.shared.asynciter import NonBlockingIterator +from pynumaflow.shared.server import handle_async_error +from pynumaflow.sourcetransformer import Datum +from pynumaflow.sourcetransformer._dtypes import SourceTransformAsyncCallable +from pynumaflow.types import NumaflowServicerContext + + +def _create_read_handshake_response() -> transform_pb2.SourceTransformResponse: + """ + Create a handshake response for the SourceTransform function. + + Returns: + SourceTransformResponse: A SourceTransformResponse object indicating a successful handshake. + """ + return transform_pb2.SourceTransformResponse( + handshake=transform_pb2.Handshake(sot=True), + ) + + +class SourceTransformAsyncServicer(transform_pb2_grpc.SourceTransformServicer): + """ + This class is used to create a new grpc Batch Map Servicer instance. + It implements the MapServicer interface from the proto + transform_pb2_grpc.py file. + Provides the functionality for the required rpc methods. + """ + + def __init__( + self, + handler: SourceTransformAsyncCallable, + ): + self.background_tasks = set() + self.__transform_handler: SourceTransformAsyncCallable = handler + + async def SourceTransformFn( + self, + request_iterator: AsyncIterable[transform_pb2.SourceTransformRequest], + context: NumaflowServicerContext, + ) -> AsyncIterable[transform_pb2.SourceTransformResponse]: + """ + Applies a batch map function to a MapRequest stream in a batching mode. 
+ The pascal case function name comes from the proto transform_pb2_grpc.py file. + """ + try: + # The first message to be received should be a valid handshake + req = await request_iterator.__anext__() + # check if it is a valid handshake req + if not (req.handshake and req.handshake.sot): + raise Exception("SourceTransformFn: expected handshake message") + yield transform_pb2.SourceTransformResponse( + handshake=transform_pb2.Handshake(sot=True), + ) + + # result queue to stream messages from the user code back to the client + global_result_queue = NonBlockingIterator() + + # reader task to process the input task and invoke the required tasks + producer = asyncio.create_task( + self._process_inputs(request_iterator, global_result_queue) + ) + + # keep reading on result queue and send messages back + consumer = global_result_queue.read_iterator() + async for msg in consumer: + # If the message is an exception, we raise the exception + if isinstance(msg, BaseException): + await handle_async_error(context, msg, ERR_UDF_EXCEPTION_STRING) + return + # Send window response back to the client + else: + yield msg + # wait for the producer task to complete + await producer + except BaseException as e: + _LOGGER.critical("SourceTransformFnError, re-raising the error", exc_info=True) + await handle_async_error(context, e, ERR_UDF_EXCEPTION_STRING) + return + + async def _process_inputs( + self, + request_iterator: AsyncIterable[transform_pb2.SourceTransformRequest], + result_queue: NonBlockingIterator, + ): + """ + Utility function for processing incoming SourceTransformRequest + """ + try: + # for each incoming request, create a background task to execute the + # UDF code + async for req in request_iterator: + msg_task = asyncio.create_task(self._invoke_transform(req, result_queue)) + # save a reference to a set to store active tasks + self.background_tasks.add(msg_task) + msg_task.add_done_callback(self.background_tasks.discard) + + # wait for all tasks to complete + for 
task in self.background_tasks: + await task + + # send an EOF to result queue to indicate that all tasks have completed + await result_queue.put(STREAM_EOF) + + except BaseException: + _LOGGER.critical("SourceTransformFnError Error, re-raising the error", exc_info=True) + + async def _invoke_transform( + self, request: transform_pb2.SourceTransformRequest, result_queue: NonBlockingIterator + ): + """ + Invokes the user defined function. + """ + try: + datum = Datum( + keys=list(request.request.keys), + value=request.request.value, + event_time=request.request.event_time.ToDatetime(), + watermark=request.request.watermark.ToDatetime(), + headers=dict(request.request.headers), + ) + msgs = await self.__transform_handler(list(request.request.keys), datum) + results = [] + for msg in msgs: + event_time_timestamp = _timestamp_pb2.Timestamp() + event_time_timestamp.FromDatetime(dt=msg.event_time) + results.append( + transform_pb2.SourceTransformResponse.Result( + keys=list(msg.keys), + value=msg.value, + tags=msg.tags, + event_time=event_time_timestamp, + ) + ) + await result_queue.put( + transform_pb2.SourceTransformResponse(results=results, id=request.request.id) + ) + except BaseException as err: + _LOGGER.critical("SourceTransformFnError handler error", exc_info=True) + await result_queue.put(err) + + async def IsReady( + self, request: _empty_pb2.Empty, context: NumaflowServicerContext + ) -> transform_pb2.ReadyResponse: + """ + IsReady is the heartbeat endpoint for gRPC. + The pascal case function name comes from the proto transform_pb2_grpc.py file. 
+ """ + return transform_pb2.ReadyResponse(ready=True) From 00386a7a989d062e7cf646b86c201ca5757fb76d Mon Sep 17 00:00:00 2001 From: Sidhant Kohli Date: Wed, 18 Jun 2025 13:19:10 -0700 Subject: [PATCH 3/8] add example Signed-off-by: Sidhant Kohli --- .../async_event_time_filter/Dockerfile | 55 +++++++++++++++++++ .../async_event_time_filter/Makefile | 22 ++++++++ .../async_event_time_filter/entry.sh | 4 ++ .../async_event_time_filter/example.py | 48 ++++++++++++++++ .../async_event_time_filter/pyproject.toml | 15 +++++ .../servicer/_async_servicer.py | 5 +- 6 files changed, 146 insertions(+), 3 deletions(-) create mode 100644 examples/sourcetransform/async_event_time_filter/Dockerfile create mode 100644 examples/sourcetransform/async_event_time_filter/Makefile create mode 100644 examples/sourcetransform/async_event_time_filter/entry.sh create mode 100644 examples/sourcetransform/async_event_time_filter/example.py create mode 100644 examples/sourcetransform/async_event_time_filter/pyproject.toml diff --git a/examples/sourcetransform/async_event_time_filter/Dockerfile b/examples/sourcetransform/async_event_time_filter/Dockerfile new file mode 100644 index 00000000..78f24d83 --- /dev/null +++ b/examples/sourcetransform/async_event_time_filter/Dockerfile @@ -0,0 +1,55 @@ +#################################################################################################### +# builder: install needed dependencies +#################################################################################################### + +FROM python:3.10-slim-bullseye AS builder + +ENV PYTHONFAULTHANDLER=1 \ + PYTHONUNBUFFERED=1 \ + PYTHONHASHSEED=random \ + PIP_NO_CACHE_DIR=on \ + PIP_DISABLE_PIP_VERSION_CHECK=on \ + PIP_DEFAULT_TIMEOUT=100 \ + POETRY_VERSION=1.2.2 \ + POETRY_HOME="/opt/poetry" \ + POETRY_VIRTUALENVS_IN_PROJECT=true \ + POETRY_NO_INTERACTION=1 \ + PYSETUP_PATH="/opt/pysetup" + +ENV EXAMPLE_PATH="$PYSETUP_PATH/examples/sourcetransform/async_event_time_filter" +ENV 
VENV_PATH="$EXAMPLE_PATH/.venv" +ENV PATH="$POETRY_HOME/bin:$VENV_PATH/bin:$PATH" + +RUN apt-get update \ + && apt-get install --no-install-recommends -y \ + curl \ + wget \ + # deps for building python deps + build-essential \ + && apt-get install -y git \ + && apt-get clean && rm -rf /var/lib/apt/lists/* \ + \ + # install dumb-init + && wget -O /dumb-init https://github.com/Yelp/dumb-init/releases/download/v1.2.5/dumb-init_1.2.5_x86_64 \ + && chmod +x /dumb-init \ + && curl -sSL https://install.python-poetry.org | python3 - + +#################################################################################################### +# udf: used for running the udf vertices +#################################################################################################### +FROM builder AS udf + +WORKDIR $PYSETUP_PATH +COPY ./ ./ + +WORKDIR $EXAMPLE_PATH +RUN poetry lock +RUN poetry install --no-cache --no-root && \ + rm -rf ~/.cache/pypoetry/ + +RUN chmod +x entry.sh + +ENTRYPOINT ["/dumb-init", "--"] +CMD ["sh", "-c", "$EXAMPLE_PATH/entry.sh"] + +EXPOSE 5000 diff --git a/examples/sourcetransform/async_event_time_filter/Makefile b/examples/sourcetransform/async_event_time_filter/Makefile new file mode 100644 index 00000000..5ad2dd38 --- /dev/null +++ b/examples/sourcetransform/async_event_time_filter/Makefile @@ -0,0 +1,22 @@ +TAG ?= stable +PUSH ?= false +IMAGE_REGISTRY = quay.io/numaio/numaflow-python/async-mapt-event-time-filter:${TAG} +DOCKER_FILE_PATH = examples/sourcetransform/async_event_time_filter/Dockerfile + +.PHONY: update +update: + poetry update -vv + +.PHONY: image-push +image-push: update + cd ../../../ && docker buildx build \ + -f ${DOCKER_FILE_PATH} \ + -t ${IMAGE_REGISTRY} \ + --platform linux/amd64,linux/arm64 . --push + +.PHONY: image +image: update + cd ../../../ && docker build \ + -f ${DOCKER_FILE_PATH} \ + -t ${IMAGE_REGISTRY} . 
+ @if [ "$(PUSH)" = "true" ]; then docker push ${IMAGE_REGISTRY}:${TAG}; fi diff --git a/examples/sourcetransform/async_event_time_filter/entry.sh b/examples/sourcetransform/async_event_time_filter/entry.sh new file mode 100644 index 00000000..073b05e3 --- /dev/null +++ b/examples/sourcetransform/async_event_time_filter/entry.sh @@ -0,0 +1,4 @@ +#!/bin/sh +set -eux + +python example.py diff --git a/examples/sourcetransform/async_event_time_filter/example.py b/examples/sourcetransform/async_event_time_filter/example.py new file mode 100644 index 00000000..5fc124b8 --- /dev/null +++ b/examples/sourcetransform/async_event_time_filter/example.py @@ -0,0 +1,48 @@ +import datetime +import logging + +from pynumaflow.sourcetransformer import Messages, Message, Datum +from pynumaflow.sourcetransformer.async_server import SourceTransformAsyncServer + +""" +This is a simple User Defined Function example which receives a message, applies the following +data transformation, and returns the message. +If the message event time is before year 2022, drop the message with event time unchanged. +If it's within year 2022, update the tag to "within_year_2022" and +update the message event time to Jan 1st 2022. +Otherwise, (exclusively after year 2022), update the tag to "after_year_2022" and update the +message event time to Jan 1st 2023. 
+""" + +january_first_2022 = datetime.datetime.fromtimestamp(1640995200) +january_first_2023 = datetime.datetime.fromtimestamp(1672531200) + + +async def my_handler(keys: list[str], datum: Datum) -> Messages: + val = datum.value + event_time = datum.event_time + messages = Messages() + + if event_time < january_first_2022: + logging.info("Got event time:%s, it is before 2022, so dropping", event_time) + messages.append(Message.to_drop(event_time)) + elif event_time < january_first_2023: + logging.info( + "Got event time:%s, it is within year 2022, so forwarding to within_year_2022", + event_time, + ) + messages.append( + Message(value=val, event_time=january_first_2022, tags=["within_year_2022"]) + ) + else: + logging.info( + "Got event time:%s, it is after year 2022, so forwarding to after_year_2022", event_time + ) + messages.append(Message(value=val, event_time=january_first_2023, tags=["after_year_2022"])) + + return messages + + +if __name__ == "__main__": + grpc_server = SourceTransformAsyncServer(my_handler) + grpc_server.start() diff --git a/examples/sourcetransform/async_event_time_filter/pyproject.toml b/examples/sourcetransform/async_event_time_filter/pyproject.toml new file mode 100644 index 00000000..7c5bf2b5 --- /dev/null +++ b/examples/sourcetransform/async_event_time_filter/pyproject.toml @@ -0,0 +1,15 @@ +[tool.poetry] +name = "async-mapt-event-time-filter" +version = "0.2.4" +description = "" +authors = ["Numaflow developers"] +readme = "README.md" +packages = [{include = "mapt_event_time_filter"}] + +[tool.poetry.dependencies] +python = ">=3.9, <3.12" +pynumaflow = { path = "../../../"} + +[build-system] +requires = ["poetry-core"] +build-backend = "poetry.core.masonry.api" diff --git a/pynumaflow/sourcetransformer/servicer/_async_servicer.py b/pynumaflow/sourcetransformer/servicer/_async_servicer.py index 3dfd8383..1c596cb1 100644 --- a/pynumaflow/sourcetransformer/servicer/_async_servicer.py +++ 
b/pynumaflow/sourcetransformer/servicer/_async_servicer.py @@ -101,9 +101,8 @@ async def _process_inputs( self.background_tasks.add(msg_task) msg_task.add_done_callback(self.background_tasks.discard) - # wait for all tasks to complete - for task in self.background_tasks: - await task + # Wait for all tasks to complete concurrently + await asyncio.gather(*self.background_tasks) # send an EOF to result queue to indicate that all tasks have completed await result_queue.put(STREAM_EOF) From 38e85444ff7ff29d29f634149fc30c5c090194a0 Mon Sep 17 00:00:00 2001 From: Sidhant Kohli Date: Wed, 18 Jun 2025 14:39:12 -0700 Subject: [PATCH 4/8] add example Signed-off-by: Sidhant Kohli --- pynumaflow/sourcetransformer/servicer/_async_servicer.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/pynumaflow/sourcetransformer/servicer/_async_servicer.py b/pynumaflow/sourcetransformer/servicer/_async_servicer.py index 1c596cb1..e8f53904 100644 --- a/pynumaflow/sourcetransformer/servicer/_async_servicer.py +++ b/pynumaflow/sourcetransformer/servicer/_async_servicer.py @@ -27,8 +27,8 @@ def _create_read_handshake_response() -> transform_pb2.SourceTransformResponse: class SourceTransformAsyncServicer(transform_pb2_grpc.SourceTransformServicer): """ - This class is used to create a new grpc Batch Map Servicer instance. - It implements the MapServicer interface from the proto + This class is used to create a new grpc SourceTransform Async Servicer instance. + It implements the SourceTransformServicer interface from the proto transform_pb2_grpc.py file. Provides the functionality for the required rpc methods. """ @@ -46,7 +46,7 @@ async def SourceTransformFn( context: NumaflowServicerContext, ) -> AsyncIterable[transform_pb2.SourceTransformResponse]: """ - Applies a batch map function to a MapRequest stream in a batching mode. + Applies a transform function to a SourceTransformRequest stream The pascal case function name comes from the proto transform_pb2_grpc.py file. 
""" try: From f2c6d334f9168daa2709ea90d85222daecada6db Mon Sep 17 00:00:00 2001 From: Sidhant Kohli Date: Wed, 18 Jun 2025 15:59:32 -0700 Subject: [PATCH 5/8] clean Signed-off-by: Sidhant Kohli --- .../sourcetransformer/servicer/_async_servicer.py | 12 ------------ 1 file changed, 12 deletions(-) diff --git a/pynumaflow/sourcetransformer/servicer/_async_servicer.py b/pynumaflow/sourcetransformer/servicer/_async_servicer.py index e8f53904..b2e70799 100644 --- a/pynumaflow/sourcetransformer/servicer/_async_servicer.py +++ b/pynumaflow/sourcetransformer/servicer/_async_servicer.py @@ -13,18 +13,6 @@ from pynumaflow.types import NumaflowServicerContext -def _create_read_handshake_response() -> transform_pb2.SourceTransformResponse: - """ - Create a handshake response for the SourceTransform function. - - Returns: - SourceTransformResponse: A SourceTransformResponse object indicating a successful handshake. - """ - return transform_pb2.SourceTransformResponse( - handshake=transform_pb2.Handshake(sot=True), - ) - - class SourceTransformAsyncServicer(transform_pb2_grpc.SourceTransformServicer): """ This class is used to create a new grpc SourceTransform Async Servicer instance. 
From 77953faa949fe8678c28cf14bbee2b9178f46426 Mon Sep 17 00:00:00 2001 From: Sidhant Kohli Date: Wed, 18 Jun 2025 16:02:56 -0700 Subject: [PATCH 6/8] clean Signed-off-by: Sidhant Kohli --- examples/sourcetransform/async_event_time_filter/example.py | 2 +- pynumaflow/sourcetransformer/__init__.py | 2 ++ 2 files changed, 3 insertions(+), 1 deletion(-) diff --git a/examples/sourcetransform/async_event_time_filter/example.py b/examples/sourcetransform/async_event_time_filter/example.py index 5fc124b8..659b7e24 100644 --- a/examples/sourcetransform/async_event_time_filter/example.py +++ b/examples/sourcetransform/async_event_time_filter/example.py @@ -2,7 +2,7 @@ import logging from pynumaflow.sourcetransformer import Messages, Message, Datum -from pynumaflow.sourcetransformer.async_server import SourceTransformAsyncServer +from pynumaflow.sourcetransformer import SourceTransformAsyncServer """ This is a simple User Defined Function example which receives a message, applies the following diff --git a/pynumaflow/sourcetransformer/__init__.py b/pynumaflow/sourcetransformer/__init__.py index 69f8018c..8eee3786 100644 --- a/pynumaflow/sourcetransformer/__init__.py +++ b/pynumaflow/sourcetransformer/__init__.py @@ -7,6 +7,7 @@ ) from pynumaflow.sourcetransformer.multiproc_server import SourceTransformMultiProcServer from pynumaflow.sourcetransformer.server import SourceTransformServer +from pynumaflow.sourcetransformer.async_server import SourceTransformAsyncServer __all__ = [ "Message", @@ -16,4 +17,5 @@ "SourceTransformServer", "SourceTransformer", "SourceTransformMultiProcServer", + "SourceTransformAsyncServer", ] From 6473d4f2f8f6f972604670a0b8baa0103ec0dd59 Mon Sep 17 00:00:00 2001 From: Sidhant Kohli Date: Tue, 24 Jun 2025 12:02:04 -0700 Subject: [PATCH 7/8] class test Signed-off-by: Sidhant Kohli --- tests/sourcetransform/test_async.py | 41 ++++++++++++++--------------- 1 file changed, 20 insertions(+), 21 deletions(-) diff --git a/tests/sourcetransform/test_async.py 
b/tests/sourcetransform/test_async.py index fd571904..73a6c63d 100644 --- a/tests/sourcetransform/test_async.py +++ b/tests/sourcetransform/test_async.py @@ -12,7 +12,7 @@ from pynumaflow import setup_logging from pynumaflow._constants import MAX_MESSAGE_SIZE from pynumaflow.proto.sourcetransformer import transform_pb2_grpc -from pynumaflow.sourcetransformer import Datum, Messages, Message +from pynumaflow.sourcetransformer import Datum, Messages, Message, SourceTransformer from pynumaflow.sourcetransformer.async_server import SourceTransformAsyncServer from tests.sourcetransform.utils import get_test_datums from tests.testing_utils import ( @@ -26,18 +26,19 @@ raise_error_from_st = False -async def async_transform_handler(keys: list[str], datum: Datum) -> Messages: - if raise_error_from_st: - raise ValueError("Exception thrown from transform") - val = datum.value - msg = "payload:{} event_time:{} ".format( - val.decode("utf-8"), - datum.event_time, - ) - val = bytes(msg, encoding="utf-8") - messages = Messages() - messages.append(Message(val, mock_new_event_time(), keys=keys)) - return messages +class TestAsyncSourceTrn(SourceTransformer): + async def handler(self, keys: list[str], datum: Datum) -> Messages: + if raise_error_from_st: + raise ValueError("Exception thrown from transform") + val = datum.value + msg = "payload:{} event_time:{} ".format( + val.decode("utf-8"), + datum.event_time, + ) + val = bytes(msg, encoding="utf-8") + messages = Messages() + messages.append(Message(val, mock_new_event_time(), keys=keys)) + return messages def request_generator(req): @@ -55,7 +56,8 @@ def startup_callable(loop): def new_async_st(): - server = SourceTransformAsyncServer(source_transform_instance=async_transform_handler) + handle = TestAsyncSourceTrn() + server = SourceTransformAsyncServer(source_transform_instance=handle) udfs = server.servicer return udfs @@ -251,20 +253,17 @@ def __stub(self): return transform_pb2_grpc.SourceTransformStub(_channel) def 
test_max_threads(self): + handle = TestAsyncSourceTrn() # max cap at 16 - server = SourceTransformAsyncServer( - source_transform_instance=async_transform_handler, max_threads=32 - ) + server = SourceTransformAsyncServer(source_transform_instance=handle, max_threads=32) self.assertEqual(server.max_threads, 16) # use argument provided - server = SourceTransformAsyncServer( - source_transform_instance=async_transform_handler, max_threads=5 - ) + server = SourceTransformAsyncServer(source_transform_instance=handle, max_threads=5) self.assertEqual(server.max_threads, 5) # defaults to 4 - server = SourceTransformAsyncServer(source_transform_instance=async_transform_handler) + server = SourceTransformAsyncServer(source_transform_instance=handle) self.assertEqual(server.max_threads, 4) From 0b320e1769c09697c62114e829405b6cfe38e81e Mon Sep 17 00:00:00 2001 From: Sidhant Kohli Date: Tue, 24 Jun 2025 14:54:17 -0700 Subject: [PATCH 8/8] class test Signed-off-by: Sidhant Kohli --- tests/sourcetransform/test_async.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/tests/sourcetransform/test_async.py b/tests/sourcetransform/test_async.py index 73a6c63d..05f7f29d 100644 --- a/tests/sourcetransform/test_async.py +++ b/tests/sourcetransform/test_async.py @@ -26,7 +26,7 @@ raise_error_from_st = False -class TestAsyncSourceTrn(SourceTransformer): +class SimpleAsyncSourceTrn(SourceTransformer): async def handler(self, keys: list[str], datum: Datum) -> Messages: if raise_error_from_st: raise ValueError("Exception thrown from transform") @@ -56,7 +56,7 @@ def startup_callable(loop): def new_async_st(): - handle = TestAsyncSourceTrn() + handle = SimpleAsyncSourceTrn() server = SourceTransformAsyncServer(source_transform_instance=handle) udfs = server.servicer return udfs @@ -253,7 +253,7 @@ def __stub(self): return transform_pb2_grpc.SourceTransformStub(_channel) def test_max_threads(self): - handle = TestAsyncSourceTrn() + handle = SimpleAsyncSourceTrn() # max 
cap at 16 server = SourceTransformAsyncServer(source_transform_instance=handle, max_threads=32) self.assertEqual(server.max_threads, 16)