Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
58 commits
Select commit Hold shift + click to select a range
022f846
tmp: disable existing tracers
deanq Oct 28, 2024
150f534
tmp: auto-instrumentations for OTEL
deanq Oct 28, 2024
ecda2d4
tmp: our collector can't support gRPC behind ALB
deanq Oct 29, 2024
f03a03e
tmp: trace the entire JobScaler functions
deanq Oct 31, 2024
1b4d9e3
tmp: too much junk traces from the loop
deanq Oct 31, 2024
a2759af
tmp: still too much traces
deanq Oct 31, 2024
b3fae8f
tmp: correction
deanq Oct 31, 2024
6f0efac
tmp: only trace http requests from http_client.py
deanq Nov 3, 2024
4a96d0f
tmp: trace to connect job queues, progress, handling, and reporting
deanq Nov 3, 2024
83dc31b
tmp: remove unused instrumentations
deanq Nov 3, 2024
25762a6
tmp: handle_job is child to queue_job
deanq Nov 3, 2024
97dbb78
tmp: disable http_client tracing temporarily
deanq Nov 3, 2024
f0f6997
tmp: job.get("trace").get_span_context()
deanq Nov 3, 2024
fec5a53
tmp: correction to tracer span context
deanq Nov 3, 2024
b3bced1
tmp: custom runpod namespace for process tags
deanq Nov 5, 2024
d19ac10
tmp: cleanup by black format
deanq Nov 5, 2024
c21cfe5
tmp: otel tracing sls-core hooks
deanq Nov 5, 2024
9bac622
tmp: using decorators where appropriate
deanq Nov 5, 2024
4939bb1
tmp: cleanup otel resource definition
deanq Nov 5, 2024
d14decd
tmp: trace pings
deanq Nov 5, 2024
b82c6a8
tmp: refactored worker_state to trace request_id list in ping
deanq Nov 5, 2024
6d4d0e2
tmp: add_event for each request_id in the send_ping
deanq Nov 6, 2024
764dd6a
tmp: removed get_job_list
deanq Nov 6, 2024
b9687c9
tmp: context propagation
deanq Nov 6, 2024
96fc079
Merge branch 'main' into otel
deanq Nov 21, 2024
0e666b5
tmp: missed this merge
deanq Nov 21, 2024
e53d635
tmp: missed this merge
deanq Nov 21, 2024
c774bfd
tmp: noop for disabled otel
deanq Nov 21, 2024
f0cb739
Merge branch 'main' into otel
deanq Dec 7, 2024
7c9ed57
tmp: proper spankind
deanq Dec 8, 2024
83b7e08
Merge branch 'main' into otel
deanq Dec 10, 2024
b030cec
tmp: force sampling from this parent span
deanq Dec 10, 2024
a800261
tmp: revert
deanq Dec 10, 2024
b8377b8
tmp: capture job_output
deanq Dec 10, 2024
f10693f
tmp: check for "error" in a dict
deanq Dec 10, 2024
1aed002
tmp: capture and report error
deanq Dec 10, 2024
619d2c1
tmp: record Stream output
deanq Dec 10, 2024
bd6d7de
tmp: avoid confusion with rp_job.handle_job
deanq Dec 10, 2024
85e4b4d
tmp: perform_job
deanq Dec 10, 2024
db3b660
tmp: capture proper error
deanq Dec 10, 2024
962df33
tmp: need to capture "error" in output better
deanq Dec 10, 2024
8da936a
tmp: record_exception fix
deanq Dec 10, 2024
940b355
tmp: explicit
deanq Dec 10, 2024
7e024b2
tmp: omg
deanq Dec 10, 2024
ccd01fd
tmp: fix
deanq Dec 10, 2024
32eebaa
tmp: shift error as job_result
deanq Dec 10, 2024
1e9777f
tmp: seriously?
deanq Dec 10, 2024
e432baf
tmp: 1
deanq Dec 10, 2024
3dbf5f1
tmp: trace the transmit job_data
deanq Dec 10, 2024
86a357d
tmp: stringify the error object
deanq Dec 10, 2024
67e8648
Merge branch 'main' into otel
deanq Dec 11, 2024
bfd6a0b
tmp: forced tracing by RUNPOD_LOG_LEVEL=TRACE
deanq Dec 13, 2024
1ef0f35
tmp: otel.start() to activate
deanq Dec 17, 2024
ddff0a2
tmp: print sampling strategy
deanq Dec 17, 2024
1489939
tmp: pytest-env
deanq Dec 18, 2024
749a2c7
tmp: fix `Attempting to instrument while already instrumented`
deanq Dec 19, 2024
ed057a3
tmp: otel scoped to serverless only
deanq Dec 19, 2024
507c2dc
Merge branch 'main' into otel
deanq Apr 30, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ test = [
"faker",
"pytest-asyncio",
"pytest-cov",
"pytest-env",
"pytest-timeout",
"pytest-watch",
"pytest",
Expand Down
3 changes: 3 additions & 0 deletions pytest.ini
Original file line number Diff line number Diff line change
Expand Up @@ -2,3 +2,6 @@
addopts = --durations=10 --cov-config=.coveragerc --timeout=120 --timeout_method=thread --cov=runpod --cov-report=xml --cov-report=term-missing --cov-fail-under=90 -W error -p no:cacheprovider -p no:unraisableexception
python_files = tests.py test_*.py *_test.py
norecursedirs = venv *.egg-info .git build
env =
D:ENV=test
D:RUNPOD_LOG_LEVEL=ERROR
6 changes: 6 additions & 0 deletions requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -17,3 +17,9 @@ tomlkit >= 0.12.2
tqdm-loggable >= 0.1.4
urllib3 >= 1.26.6
watchdog >= 3.0.0

setuptools==65.6.3
opentelemetry-sdk
opentelemetry-exporter-otlp
opentelemetry-instrumentation-aiohttp-client
opentelemetry-instrumentation-requests
23 changes: 14 additions & 9 deletions runpod/http_client.py
Original file line number Diff line number Diff line change
@@ -1,15 +1,19 @@
"""
HTTP Client abstractions
HTTP Client abstractions with OpenTelemetry tracing support.
"""

import os

import requests
from aiohttp import ClientSession, ClientTimeout, TCPConnector, ClientResponseError
from opentelemetry import trace
from opentelemetry.instrumentation.aiohttp_client import create_trace_config
from opentelemetry.instrumentation.requests import RequestsInstrumentor

from .cli.groups.config.functions import get_credentials
from .user_agent import USER_AGENT

tracer = trace.get_tracer(__name__)


class TooManyRequests(ClientResponseError):
pass
Expand All @@ -32,22 +36,23 @@ def get_auth_header():
}


def AsyncClientSession(*args, **kwargs): # pylint: disable=invalid-name
def AsyncClientSession(*args, **kwargs):
"""
Deprecation from aiohttp.ClientSession forbids inheritance.
This is now a factory method
Factory method for an async client session with OpenTelemetry tracing.
"""
return ClientSession(
connector=TCPConnector(limit=0),
headers=get_auth_header(),
timeout=ClientTimeout(600, ceil_threshold=400),
trace_configs=[create_trace_config()],
*args,
**kwargs,
)


class SyncClientSession(requests.Session):
"""
Inherits requests.Session to override `request()` method for tracing
"""
pass
def __init__(self):
super().__init__()
self.headers.update(get_auth_header())

RequestsInstrumentor().instrument()
17 changes: 15 additions & 2 deletions runpod/serverless/modules/rp_http.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@

from aiohttp import ClientError
from aiohttp_retry import FibonacciRetry, RetryClient
from opentelemetry import trace

from runpod.http_client import ClientSession
from runpod.serverless.modules.rp_logger import RunPodLogger
Expand All @@ -24,12 +25,17 @@
JOB_STREAM_URL = JOB_STREAM_URL_TEMPLATE.replace("$RUNPOD_POD_ID", WORKER_ID)

log = RunPodLogger()
tracer = trace.get_tracer(__name__)


@tracer.start_as_current_span("transmit", kind=trace.SpanKind.CLIENT)
async def _transmit(client_session: ClientSession, url, job_data):
"""
Wrapper for transmitting results via POST.
"""
span = trace.get_current_span()
span.set_attribute("job_data", job_data)

retry_options = FibonacciRetry(attempts=3)
retry_client = RetryClient(
client_session=client_session, retry_options=retry_options
Expand All @@ -48,15 +54,18 @@ async def _transmit(client_session: ClientSession, url, job_data):
await client_response.text()


@tracer.start_as_current_span("handle_result", kind=trace.SpanKind.CLIENT)
async def _handle_result(
session: ClientSession, job_data, job, url_template, log_message, is_stream=False
):
"""
A helper function to handle the result, either for sending or streaming.
"""
try:
session.headers["X-Request-ID"] = job["id"]
span = trace.get_current_span()
span.set_attribute("request_id", job.get("id"))
span.set_attribute("is_stream", is_stream)

try:
serialized_job_data = json.dumps(job_data, ensure_ascii=False)

is_stream = "true" if is_stream else "false"
Expand All @@ -66,9 +75,11 @@ async def _handle_result(
log.debug(f"{log_message}", job["id"])

except ClientError as err:
span.record_exception(err)
log.error(f"Failed to return job results. | {err}", job["id"])

except (TypeError, RuntimeError) as err:
span.record_exception(err)
log.error(f"Error while returning job result. | {err}", job["id"])

finally:
Expand All @@ -80,6 +91,7 @@ async def _handle_result(
log.info("Finished.", job["id"])


@tracer.start_as_current_span("send_result")
async def send_result(session, job_data, job, is_stream=False):
"""
Return the job results.
Expand All @@ -89,6 +101,7 @@ async def send_result(session, job_data, job, is_stream=False):
)


@tracer.start_as_current_span("stream_result")
async def stream_result(session, job_data, job):
"""
Return the stream job results.
Expand Down
28 changes: 26 additions & 2 deletions runpod/serverless/modules/rp_job.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import json
import os
import traceback
from opentelemetry import trace
from typing import Any, AsyncGenerator, Callable, Dict, Optional, Union, List

import aiohttp
Expand All @@ -24,6 +25,7 @@

log = RunPodLogger()
job_progress = JobsProgress()
tracer = trace.get_tracer(__name__)


def _job_get_url(batch_size: int = 1):
Expand Down Expand Up @@ -117,14 +119,26 @@ async def get_job(
return jobs


async def handle_job(session: ClientSession, config: Dict[str, Any], job) -> dict:
@tracer.start_as_current_span("handle_error")
def _handle_error(err_output: any, job: dict) -> bool:
span = trace.get_current_span()

span.set_status(trace.Status(trace.StatusCode.ERROR, str(err_output)))
log.debug(f"Handled error: {err_output}", job["id"])


@tracer.start_as_current_span("handle_job")
async def handle_job(session: ClientSession, config: Dict[str, Any], job: dict) -> dict:
span = trace.get_current_span()
span.set_attribute("request_id", job.get("id"))

if is_generator(config["handler"]):
is_stream = True
generator_output = run_job_generator(config["handler"], job)
log.debug("Handler is a generator, streaming results.", job["id"])

job_result = {"output": []}
async for stream_output in generator_output:
# temp
log.debug(f"Stream output: {stream_output}", job["id"])

if type(stream_output.get("output")) == dict:
Expand Down Expand Up @@ -164,6 +178,7 @@ async def handle_job(session: ClientSession, config: Dict[str, Any], job) -> dic
await send_result(session, job_result, job, is_stream=is_stream)


@tracer.start_as_current_span("run_job")
async def run_job(handler: Callable, job: Dict[str, Any]) -> Dict[str, Any]:
"""
Run the job using the handler.
Expand All @@ -175,6 +190,9 @@ async def run_job(handler: Callable, job: Dict[str, Any]) -> Dict[str, Any]:
Returns:
Dict[str, Any]: The result of running the job.
"""
span = trace.get_current_span()
span.set_attribute("request_id", job.get("id"))

log.info("Started.", job["id"])
run_result = {}

Expand Down Expand Up @@ -210,6 +228,7 @@ async def run_job(handler: Callable, job: Dict[str, Any]) -> Dict[str, Any]:
check_return_size(run_result) # Checks the size of the return body.

except Exception as err:
span.record_exception(err)
error_info = {
"error_type": str(type(err)),
"error_message": str(err),
Expand All @@ -229,13 +248,17 @@ async def run_job(handler: Callable, job: Dict[str, Any]) -> Dict[str, Any]:
return run_result


@tracer.start_as_current_span("run_job_generator")
async def run_job_generator(
handler: Callable, job: Dict[str, Any]
) -> AsyncGenerator[Dict[str, Union[str, Any]], None]:
"""
Run generator job used to stream output.
Yields output partials from the generator.
"""
span = trace.get_current_span()
span.set_attribute("request_id", job.get("id"))

is_async_gen = inspect.isasyncgenfunction(handler)
log.debug(
"Using Async Generator" if is_async_gen else "Using Standard Generator",
Expand All @@ -255,6 +278,7 @@ async def run_job_generator(
yield {"output": output_partial}

except Exception as err:
span.record_exception(err)
log.error(err, job["id"])
yield {"error": f"handler: {str(err)} \ntraceback: {traceback.format_exc()}"}
finally:
Expand Down
15 changes: 12 additions & 3 deletions runpod/serverless/modules/rp_ping.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import time

import requests
from opentelemetry import trace
from urllib3.util.retry import Retry

from runpod.http_client import SyncClientSession
Expand All @@ -16,7 +17,8 @@
from runpod.version import __version__ as runpod_version

log = RunPodLogger()
jobs = JobsProgress() # Contains the list of jobs that are currently running.
job_progress = JobsProgress() # Contains the list of jobs that are currently running.
tracer = trace.get_tracer(__name__)


class Heartbeat:
Expand Down Expand Up @@ -83,12 +85,18 @@ def ping_loop(self, test=False):
if test:
return

@tracer.start_as_current_span("send_ping", kind=trace.SpanKind.CLIENT)
def _send_ping(self):
"""
Sends a heartbeat to the Runpod server.
"""
job_ids = jobs.get_job_list()
ping_params = {"job_id": job_ids, "runpod_version": runpod_version}
span = trace.get_current_span()
job_ids = []
for job in job_progress:
span.add_event("ping", {"request_id": job.id})
job_ids.append(job.id)

ping_params = {"job_id": ",".join(job_ids), "runpod_version": runpod_version}

try:
result = self._session.get(
Expand All @@ -100,4 +108,5 @@ def _send_ping(self):
)

except requests.RequestException as err:
span.record_exception(err)
log.error(f"Ping Request Error: {err}, attempting to restart ping.")
Loading
Loading