Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
73 changes: 70 additions & 3 deletions poetry.lock

Large diffs are not rendered by default.

10 changes: 10 additions & 0 deletions pynumaflow/_constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,16 @@

SIDE_INPUT_DIR_PATH = "/var/numaflow/side-inputs"

# UDF execution error prefixes
ERR_SOURCE_EXCEPTION = "UDF_EXECUTION_ERROR(source)"
ERR_TRANSFORMER_EXCEPTION = "UDF_EXECUTION_ERROR(transformer)"
ERR_SINK_EXCEPTION = "UDF_EXECUTION_ERROR(sink)"
ERR_MAP_STREAM_EXCEPTION = "UDF_EXECUTION_ERROR(mapstream)"
ERR_MAP_EXCEPTION = "UDF_EXECUTION_ERROR(map)"
ERR_BATCH_MAP_EXCEPTION = "UDF_EXECUTION_ERROR(batchmap)"
ERR_REDUCE_EXCEPTION = "UDF_EXECUTION_ERROR(reduce)"
ERR_SIDE_INPUT_RETRIEVAL_EXCEPTION = "UDF_EXECUTION_ERROR(sideinput)"

# Socket configs
MAP_SOCK_PATH = "/var/run/numaflow/map.sock"
MAP_STREAM_SOCK_PATH = "/var/run/numaflow/mapstream.sock"
Expand Down
10 changes: 3 additions & 7 deletions pynumaflow/batchmapper/servicer/async_servicer.py
Original file line number Diff line number Diff line change
@@ -1,16 +1,15 @@
import asyncio
from collections.abc import AsyncIterable

import grpc
from google.protobuf import empty_pb2 as _empty_pb2

from pynumaflow.batchmapper import Datum
from pynumaflow.batchmapper._dtypes import BatchMapCallable, BatchMapError
from pynumaflow.proto.mapper import map_pb2, map_pb2_grpc
from pynumaflow.shared.asynciter import NonBlockingIterator
from pynumaflow.shared.server import exit_on_error
from pynumaflow.shared.server import handle_async_error
from pynumaflow.types import NumaflowServicerContext
from pynumaflow._constants import _LOGGER, STREAM_EOF
from pynumaflow._constants import _LOGGER, STREAM_EOF, ERR_BATCH_MAP_EXCEPTION


class AsyncBatchMapServicer(map_pb2_grpc.MapServicer):
Expand Down Expand Up @@ -99,10 +98,7 @@ async def MapFn(

except BaseException as err:
_LOGGER.critical("UDFError, re-raising the error", exc_info=True)
await asyncio.gather(
context.abort(grpc.StatusCode.UNKNOWN, details=repr(err)), return_exceptions=True
)
exit_on_error(context, repr(err))
await handle_async_error(context, err, ERR_BATCH_MAP_EXCEPTION)
return

async def IsReady(
Expand Down
15 changes: 7 additions & 8 deletions pynumaflow/mapper/_servicer/_async_servicer.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,10 @@
from google.protobuf import empty_pb2 as _empty_pb2
from pynumaflow.shared.asynciter import NonBlockingIterator

from pynumaflow._constants import _LOGGER, STREAM_EOF
from pynumaflow._constants import _LOGGER, STREAM_EOF, ERR_MAP_EXCEPTION
from pynumaflow.mapper._dtypes import MapAsyncCallable, Datum, MapError
from pynumaflow.proto.mapper import map_pb2, map_pb2_grpc
from pynumaflow.shared.server import exit_on_error, handle_async_error
from pynumaflow.shared.server import handle_async_error
from pynumaflow.types import NumaflowServicerContext


Expand Down Expand Up @@ -56,7 +56,7 @@ async def MapFn(
async for msg in consumer:
# If the message is an exception, we raise the exception
if isinstance(msg, BaseException):
await handle_async_error(context, msg)
await handle_async_error(context, msg, ERR_MAP_EXCEPTION)
return
# Send window response back to the client
else:
Expand All @@ -65,7 +65,7 @@ async def MapFn(
await producer
except BaseException as e:
_LOGGER.critical("UDFError, re-raising the error", exc_info=True)
exit_on_error(context, repr(e))
await handle_async_error(context, e, ERR_MAP_EXCEPTION)
return

async def _process_inputs(
Expand All @@ -92,9 +92,8 @@ async def _process_inputs(
# send an EOF to result queue to indicate that all tasks have completed
await result_queue.put(STREAM_EOF)

except BaseException as e:
await result_queue.put(e)
return
except BaseException:
_LOGGER.critical("MapFn Error, re-raising the error", exc_info=True)

async def _invoke_map(self, req: map_pb2.MapRequest, result_queue: NonBlockingIterator):
"""
Expand All @@ -116,7 +115,7 @@ async def _invoke_map(self, req: map_pb2.MapRequest, result_queue: NonBlockingIt
)
await result_queue.put(map_pb2.MapResponse(results=datums, id=req.id))
except BaseException as err:
_LOGGER.critical("UDFError, re-raising the error", exc_info=True)
_LOGGER.critical("MapFn handler error", exc_info=True)
await result_queue.put(err)

async def IsReady(
Expand Down
11 changes: 6 additions & 5 deletions pynumaflow/mapper/_servicer/_sync_servicer.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
from google.protobuf import empty_pb2 as _empty_pb2
from pynumaflow.shared.server import exit_on_error

from pynumaflow._constants import NUM_THREADS_DEFAULT, STREAM_EOF, _LOGGER
from pynumaflow._constants import NUM_THREADS_DEFAULT, STREAM_EOF, _LOGGER, ERR_MAP_EXCEPTION
from pynumaflow.mapper._dtypes import MapSyncCallable, Datum, MapError
from pynumaflow.proto.mapper import map_pb2, map_pb2_grpc
from pynumaflow.shared.synciter import SyncIterator
Expand Down Expand Up @@ -57,7 +57,9 @@
# if error handler accordingly
if isinstance(res, BaseException):
# Terminate the current server process due to exception
exit_on_error(context, repr(res), parent=self.multiproc)
exit_on_error(
context, f"{ERR_MAP_EXCEPTION}: {repr(res)}", parent=self.multiproc
)
return
# return the result
yield res
Expand All @@ -69,7 +71,7 @@
except BaseException as err:
_LOGGER.critical("UDFError, re-raising the error", exc_info=True)
# Terminate the current server process due to exception
exit_on_error(context, repr(err), parent=self.multiproc)
exit_on_error(context, f"{ERR_MAP_EXCEPTION}: {repr(err)}", parent=self.multiproc)
return

def _process_requests(
Expand All @@ -87,9 +89,8 @@
self.executor.shutdown(wait=True)
# Indicate to the result queue that no more messages left to process
result_queue.put(STREAM_EOF)
except BaseException as e:
except BaseException:

Check warning on line 92 in pynumaflow/mapper/_servicer/_sync_servicer.py

View check run for this annotation

Codecov / codecov/patch

pynumaflow/mapper/_servicer/_sync_servicer.py#L92

Added line #L92 was not covered by tests
_LOGGER.critical("MapFn Error, re-raising the error", exc_info=True)
result_queue.put(e)

def _invoke_map(
self,
Expand Down
8 changes: 4 additions & 4 deletions pynumaflow/mapstreamer/servicer/async_servicer.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,9 @@
from pynumaflow.mapstreamer import Datum
from pynumaflow.mapstreamer._dtypes import MapStreamCallable, MapStreamError
from pynumaflow.proto.mapper import map_pb2_grpc, map_pb2
from pynumaflow.shared.server import exit_on_error
from pynumaflow.shared.server import handle_async_error
from pynumaflow.types import NumaflowServicerContext
from pynumaflow._constants import _LOGGER
from pynumaflow._constants import _LOGGER, ERR_MAP_STREAM_EXCEPTION


class AsyncMapStreamServicer(map_pb2_grpc.MapServicer):
Expand Down Expand Up @@ -59,7 +59,7 @@ async def MapFn(
yield map_pb2.MapResponse(status=map_pb2.TransmissionStatus(eot=True), id=req.id)
except BaseException as err:
_LOGGER.critical("UDFError, re-raising the error", exc_info=True)
exit_on_error(context, repr(err))
await handle_async_error(context, err, ERR_MAP_STREAM_EXCEPTION)
return

async def __invoke_map_stream(self, keys: list[str], req: Datum):
Expand All @@ -68,7 +68,7 @@ async def __invoke_map_stream(self, keys: list[str], req: Datum):
async for msg in self.__map_stream_handler(keys, req):
yield map_pb2.MapResponse.Result(keys=msg.keys, value=msg.value, tags=msg.tags)
except BaseException as err:
_LOGGER.critical("UDFError, re-raising the error", exc_info=True)
_LOGGER.critical("MapFn handler error", exc_info=True)
raise err

async def IsReady(
Expand Down
16 changes: 4 additions & 12 deletions pynumaflow/reducer/servicer/async_servicer.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,9 @@
import asyncio
from collections.abc import AsyncIterable
from typing import Union

import grpc
from google.protobuf import empty_pb2 as _empty_pb2

from pynumaflow._constants import _LOGGER
from pynumaflow._constants import _LOGGER, ERR_REDUCE_EXCEPTION
from pynumaflow.proto.reducer import reduce_pb2, reduce_pb2_grpc
from pynumaflow.reducer._dtypes import (
Datum,
Expand All @@ -15,7 +13,7 @@
WindowOperation,
)
from pynumaflow.reducer.servicer.task_manager import TaskManager
from pynumaflow.shared.server import exit_on_error
from pynumaflow.shared.server import handle_async_error
from pynumaflow.types import NumaflowServicerContext


Expand Down Expand Up @@ -107,10 +105,7 @@ async def ReduceFn(
_LOGGER.critical("Reduce Error", exc_info=True)
# Send a context abort signal for the rpc, this is required for numa container to get
# the correct grpc error
await asyncio.gather(
context.abort(grpc.StatusCode.UNKNOWN, details=repr(e)), return_exceptions=True
)
exit_on_error(err=repr(e), parent=False, context=context, update_context=False)
await handle_async_error(context, e, ERR_REDUCE_EXCEPTION)

# send EOF to all the tasks once the request iterator is exhausted
# This will signal the tasks to stop reading the data on their
Expand Down Expand Up @@ -141,10 +136,7 @@ async def ReduceFn(
_LOGGER.critical("Reduce Error", exc_info=True)
# Send a context abort signal for the rpc, this is required for numa container to get
# the correct grpc error
await asyncio.gather(
context.abort(grpc.StatusCode.UNKNOWN, details=repr(e)), return_exceptions=True
)
exit_on_error(err=repr(e), parent=False, context=context, update_context=False)
await handle_async_error(context, e, ERR_REDUCE_EXCEPTION)

async def IsReady(
self, request: _empty_pb2.Empty, context: NumaflowServicerContext
Expand Down
7 changes: 4 additions & 3 deletions pynumaflow/reducestreamer/servicer/async_servicer.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

from google.protobuf import empty_pb2 as _empty_pb2

from pynumaflow._constants import ERR_REDUCE_EXCEPTION
from pynumaflow.proto.reducer import reduce_pb2, reduce_pb2_grpc
from pynumaflow.reducestreamer._dtypes import (
Datum,
Expand Down Expand Up @@ -94,20 +95,20 @@
async for msg in consumer:
# If the message is an exception, we raise the exception
if isinstance(msg, BaseException):
await handle_async_error(context, msg)
await handle_async_error(context, msg, ERR_REDUCE_EXCEPTION)
return
# Send window EOF response or Window result response
# back to the client
else:
yield msg
except BaseException as e:
await handle_async_error(context, e)
await handle_async_error(context, e, ERR_REDUCE_EXCEPTION)

Check warning on line 105 in pynumaflow/reducestreamer/servicer/async_servicer.py

View check run for this annotation

Codecov / codecov/patch

pynumaflow/reducestreamer/servicer/async_servicer.py#L105

Added line #L105 was not covered by tests
return
# Wait for the process_input_stream task to finish for a clean exit
try:
await producer
except BaseException as e:
await handle_async_error(context, e)
await handle_async_error(context, e, ERR_REDUCE_EXCEPTION)

Check warning on line 111 in pynumaflow/reducestreamer/servicer/async_servicer.py

View check run for this annotation

Codecov / codecov/patch

pynumaflow/reducestreamer/servicer/async_servicer.py#L111

Added line #L111 was not covered by tests
return

async def IsReady(
Expand Down
46 changes: 38 additions & 8 deletions pynumaflow/shared/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,10 @@
import os
import socket
import traceback

from google.protobuf import any_pb2
from google.rpc import code_pb2, status_pb2, error_details_pb2
from grpc_status import rpc_status
from abc import ABCMeta, abstractmethod
from collections.abc import Iterator
from concurrent.futures import ThreadPoolExecutor
Expand Down Expand Up @@ -240,6 +244,21 @@ def check_instance(instance, callable_type) -> bool:
return False


def get_grpc_status(err: str):
"""
Create a grpc status object with the error details.
"""
details = any_pb2.Any()
details.Pack(
error_details_pb2.DebugInfo(
detail="\n".join(traceback.format_stack()),
)
)

status = status_pb2.Status(code=code_pb2.INTERNAL, message=err, details=[details])
return rpc_status.to_status(status)


def exit_on_error(
context: NumaflowServicerContext, err: str, parent: bool = False, update_context=True
):
Expand All @@ -255,8 +274,12 @@ def exit_on_error(
the context with the error codes
"""
if update_context:
context.set_code(grpc.StatusCode.UNKNOWN)
# Create a status object with the error details
grpc_status = get_grpc_status(err)

context.set_code(grpc.StatusCode.INTERNAL)
context.set_details(err)
context.set_trailing_metadata(grpc_status.trailing_metadata)

p = psutil.Process(os.getpid())
# If the parent flag is true, we exit from the parent process
Expand All @@ -267,15 +290,19 @@ def exit_on_error(
p.kill()


def update_context_err(context: NumaflowServicerContext, e: BaseException):
def update_context_err(context: NumaflowServicerContext, e: BaseException, err_msg: str):
"""
Update the context with the error and log the exception.
"""
trace = get_exception_traceback_str(e)
_LOGGER.critical(trace)
_LOGGER.critical(e.__str__())
context.set_code(grpc.StatusCode.UNKNOWN)
context.set_details(e.__str__())

grpc_status = get_grpc_status(err_msg)

context.set_code(grpc.StatusCode.INTERNAL)
context.set_details(err_msg)
context.set_trailing_metadata(grpc_status.trailing_metadata)


def get_exception_traceback_str(exc) -> str:
Expand All @@ -284,12 +311,15 @@ def get_exception_traceback_str(exc) -> str:
return file.getvalue().rstrip()


async def handle_async_error(context: NumaflowServicerContext, exception: BaseException):
async def handle_async_error(
context: NumaflowServicerContext, exception: BaseException, exception_type: str
):
"""
Handle exceptions for async servers by updating the context and exiting.
"""
update_context_err(context, exception)
err_msg = f"{exception_type}: {repr(exception)}"
update_context_err(context, exception, err_msg)
await asyncio.gather(
context.abort(grpc.StatusCode.UNKNOWN, details=repr(exception)), return_exceptions=True
context.abort(grpc.StatusCode.INTERNAL, details=err_msg), return_exceptions=True
)
exit_on_error(err=repr(exception), parent=False, context=context, update_context=False)
exit_on_error(err=err_msg, parent=False, context=context, update_context=False)
5 changes: 3 additions & 2 deletions pynumaflow/sideinput/servicer/servicer.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

from pynumaflow._constants import (
_LOGGER,
ERR_SIDE_INPUT_RETRIEVAL_EXCEPTION,
)
from pynumaflow.proto.sideinput import sideinput_pb2_grpc, sideinput_pb2
from pynumaflow.shared.server import exit_on_error
Expand All @@ -27,9 +28,9 @@ def RetrieveSideInput(
try:
rspn = self.__retrieve_handler()
except BaseException as err:
err_msg = f"RetrieveSideInputErr: {repr(err)}"
err_msg = f"{ERR_SIDE_INPUT_RETRIEVAL_EXCEPTION}: {repr(err)}"
_LOGGER.critical(err_msg, exc_info=True)
exit_on_error(context, repr(err))
exit_on_error(context, err_msg)
return

return sideinput_pb2.SideInputResponse(value=rspn.value, no_broadcast=rspn.no_broadcast)
Expand Down
7 changes: 3 additions & 4 deletions pynumaflow/sinker/servicer/async_servicer.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
from google.protobuf import empty_pb2 as _empty_pb2
from pynumaflow.shared.asynciter import NonBlockingIterator

from pynumaflow.shared.server import exit_on_error
from pynumaflow.shared.server import handle_async_error
from pynumaflow.sinker._dtypes import Datum, SinkAsyncCallable
from pynumaflow.proto.sinker import sink_pb2_grpc, sink_pb2
from pynumaflow.sinker.servicer.utils import (
Expand All @@ -13,7 +13,7 @@
build_sink_resp_results,
)
from pynumaflow.types import NumaflowServicerContext
from pynumaflow._constants import _LOGGER, STREAM_EOF
from pynumaflow._constants import _LOGGER, STREAM_EOF, ERR_SINK_EXCEPTION


class AsyncSinkServicer(sink_pb2_grpc.SinkServicer):
Expand Down Expand Up @@ -85,7 +85,7 @@ async def SinkFn(
# if there is an exception, we will mark all the responses as a failure
err_msg = f"UDSinkError: {repr(err)}"
_LOGGER.critical(err_msg, exc_info=True)
exit_on_error(context, err_msg)
await handle_async_error(context, err, ERR_SINK_EXCEPTION)
return

async def __invoke_sink(
Expand All @@ -98,7 +98,6 @@ async def __invoke_sink(
except BaseException as err:
err_msg = f"UDSinkError: {repr(err)}"
_LOGGER.critical(err_msg, exc_info=True)
exit_on_error(context, err_msg)
raise err

async def IsReady(
Expand Down
Loading