From 3f3adf7e329cd2fc7b1b8dbdded4772c0101d07d Mon Sep 17 00:00:00 2001 From: Superskyyy Date: Mon, 15 May 2023 02:33:14 -0400 Subject: [PATCH 1/2] Initial implementation client/grpc_aio.py (#104) * Initial implementation client/grpc_aio.py * Basically finish grpc aio * Use put_nowait to replace run_coroutine_threadsafe() --------- Co-authored-by: FAWC438 Co-authored-by: Kevin Ling --- poetry.lock | 124 ++------ pyproject.toml | 2 + skywalking/agent/__init__.py | 296 +++++++++++++++++- skywalking/agent/protocol/__init__.py | 30 ++ skywalking/agent/protocol/grpc.py | 2 +- skywalking/agent/protocol/grpc_aio.py | 262 ++++++++++++++++ skywalking/agent/protocol/interceptors_aio.py | 74 +++++ skywalking/bootstrap/loader/sitecustomize.py | 3 +- skywalking/client/__init__.py | 111 +++++++ skywalking/client/grpc_aio.py | 123 ++++++++ skywalking/config.py | 17 +- 11 files changed, 948 insertions(+), 96 deletions(-) create mode 100644 skywalking/agent/protocol/grpc_aio.py create mode 100644 skywalking/agent/protocol/interceptors_aio.py create mode 100644 skywalking/client/grpc_aio.py diff --git a/poetry.lock b/poetry.lock index a86b6c47..533808ad 100644 --- a/poetry.lock +++ b/poetry.lock @@ -1,4 +1,4 @@ -# This file is automatically @generated by Poetry and should not be changed by hand. +# This file is automatically @generated by Poetry 1.4.2 and should not be changed by hand. [[package]] name = "aiofiles" @@ -262,7 +262,7 @@ test = ["flake8 (>=5.0.4,<5.1.0)", "uvloop (>=0.15.3)"] name = "attrs" version = "22.2.0" description = "Classes Without Boilerplate" -category = "main" +category = "dev" optional = false python-versions = ">=3.6" files = [ @@ -663,7 +663,7 @@ six = "*" name = "colorama" version = "0.4.6" description = "Cross-platform colored terminal text." 
-category = "main" +category = "dev" optional = false python-versions = "!=3.0.*,!=3.1.*,!=3.2.*,!=3.3.*,!=3.4.*,!=3.5.*,!=3.6.*,>=2.7" files = [ @@ -718,20 +718,6 @@ json = ["jsonschema", "pyrsistent", "pyrsistent (==0.16.1)", "requests"] protobuf = ["protobuf", "requests"] schema-registry = ["requests"] -[[package]] -name = "contextvars" -version = "2.4" -description = "PEP 567 Backport" -category = "main" -optional = false -python-versions = "*" -files = [ - {file = "contextvars-2.4.tar.gz", hash = "sha256:f38c908aaa59c14335eeea12abea5f443646216c4e29380d7bf34d2018e2c39e"}, -] - -[package.dependencies] -immutables = ">=0.9" - [[package]] name = "darglint" version = "1.8.1" @@ -748,7 +734,7 @@ files = [ name = "deprecation" version = "2.1.0" description = "A library to handle automated deprecations" -category = "main" +category = "dev" optional = false python-versions = "*" files = [ @@ -799,7 +785,7 @@ bcrypt = ["bcrypt"] name = "docker" version = "6.0.1" description = "A Python library for the Docker Engine API." 
-category = "main" +category = "dev" optional = false python-versions = ">=3.7" files = [ @@ -855,7 +841,7 @@ files = [ name = "exceptiongroup" version = "1.1.0" description = "Backport of PEP 654 (exception groups)" -category = "main" +category = "dev" optional = false python-versions = ">=3.7" files = [ @@ -1536,68 +1522,11 @@ files = [ {file = "idna-3.4.tar.gz", hash = "sha256:814f528e8dead7d329833b91c5faa87d60bf71824cd12a7530b5526063d02cb4"}, ] -[[package]] -name = "immutables" -version = "0.19" -description = "Immutable Collections" -category = "main" -optional = false -python-versions = ">=3.6" -files = [ - {file = "immutables-0.19-cp310-cp310-macosx_10_9_universal2.whl", hash = "sha256:fef6743f8c3098ae46d9a2a3606b04a91c62e216487d91e90ce5c7419da3f803"}, - {file = "immutables-0.19-cp310-cp310-macosx_10_9_x86_64.whl", hash = "sha256:cfb62119b7302a37cb4a1db44234dab9acda60ba93e3c28489969722e85237b7"}, - {file = "immutables-0.19-cp310-cp310-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:1d55b886e92ef5abfc4b066f404d956ca5789a2f8f738d448300fba40930a631"}, - {file = "immutables-0.19-cp310-cp310-manylinux_2_5_x86_64.manylinux1_x86_64.manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:40f1c3ab3ae690a55a2f61039705a110f0e23717d6d8a62a84600fc7cf5934dc"}, - {file = "immutables-0.19-cp310-cp310-musllinux_1_1_aarch64.whl", hash = "sha256:f3096afb376b9b3651a3b92affd1896b4dcefde209f412572f7e3924f6749a49"}, - {file = "immutables-0.19-cp310-cp310-musllinux_1_1_x86_64.whl", hash = "sha256:85bcb5a7c33100c1b2eeb8c71e5f80acab4c9dde074b2c2ca8e3dfb6830ce813"}, - {file = "immutables-0.19-cp310-cp310-win_amd64.whl", hash = "sha256:620c166e76030ca4772ea64e5190f8347a730a0af85b743820d351f211004397"}, - {file = "immutables-0.19-cp311-cp311-macosx_10_9_universal2.whl", hash = "sha256:c1774f298db9d460e50c40dfc9cfe7dd8a0de22c22f1de9a1f9a468daa1201dc"}, - {file = "immutables-0.19-cp311-cp311-macosx_10_9_x86_64.whl", hash = 
"sha256:24dbdc28779a2b75e06224609f4fc850ba61b7e1b74e32ec808c6430a535be2d"}, - {file = "immutables-0.19-cp311-cp311-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:9b8c0a4264e3ba2f025f4517ce67f0d0869106a625dbda08758cbf4dd6b6dd1f"}, - {file = "immutables-0.19-cp311-cp311-manylinux_2_5_x86_64.manylinux1_x86_64.manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:28d1ee66424c2db998d27ebe0a331c7e09627e54a402848b2897cb6ef4dc4d7e"}, - {file = "immutables-0.19-cp311-cp311-musllinux_1_1_aarch64.whl", hash = "sha256:6f857aec0e0455986fd1f41234c867c3daf5a89ff7f54d493d4eb3c233d36d3c"}, - {file = "immutables-0.19-cp311-cp311-musllinux_1_1_x86_64.whl", hash = "sha256:119c60a05cb35add45c1e592e23a5cbb9db03161bb89d1596b920d9341173982"}, - {file = "immutables-0.19-cp311-cp311-win_amd64.whl", hash = "sha256:3fbad255e404b4cbcf3477b384a1e400bd8f28cbbfc2df8d3885abe3bfc7b909"}, - {file = "immutables-0.19-cp36-cp36m-macosx_10_9_x86_64.whl", hash = "sha256:6660e185354a1cb59ecc130f2b85b50d666d4417be668ce6ba83d4be79f55d34"}, - {file = "immutables-0.19-cp36-cp36m-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:37de95c1d79707d95f50d0ab79e067bee52381afc967ff031ac4c822c14f43a8"}, - {file = "immutables-0.19-cp36-cp36m-manylinux_2_5_x86_64.manylinux1_x86_64.manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:ed61dbc963251bec7281cdb0c148176bbd70519d21fd05bce4c484632cdc3b2c"}, - {file = "immutables-0.19-cp36-cp36m-musllinux_1_1_aarch64.whl", hash = "sha256:7da9356a163993e01785a211b47c6a0038b48d1235b68479a0053c2c4c3cf666"}, - {file = "immutables-0.19-cp36-cp36m-musllinux_1_1_x86_64.whl", hash = "sha256:41d8cae52ea527f9c6dccdf1e1553106c482496acc140523034f91877ccbc103"}, - {file = "immutables-0.19-cp36-cp36m-win_amd64.whl", hash = "sha256:e95f0826f184920adb3cdf830f409f1c1d4e943e4dc50242538c4df9d51eea72"}, - {file = "immutables-0.19-cp37-cp37m-macosx_10_9_x86_64.whl", hash = 
"sha256:50608784e33c88da8c0e06e75f6725865cf2e345c8f3eeb83cb85111f737e986"}, - {file = "immutables-0.19-cp37-cp37m-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:1cbd4d9dc531ee24b2387141a5968e923bb6174d13695e730cde0887aadda557"}, - {file = "immutables-0.19-cp37-cp37m-manylinux_2_5_x86_64.manylinux1_x86_64.manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:eed8988dc4ebde8d527dbe4dea68cb9fe6d43bc56df60d6015130dc4abd2ab34"}, - {file = "immutables-0.19-cp37-cp37m-musllinux_1_1_aarch64.whl", hash = "sha256:c830c9afc6fcb4a7d6d74230d6290987e664418026a15488ad00d8a3dc5ec743"}, - {file = "immutables-0.19-cp37-cp37m-musllinux_1_1_x86_64.whl", hash = "sha256:7c6cce2e87cd5369234b199037631cfed08e43813a1fdd750807d14404de195b"}, - {file = "immutables-0.19-cp37-cp37m-win_amd64.whl", hash = "sha256:10774f73af07b1648fa02f45f6ff88b3391feda65d4f640159e6eeec10540ece"}, - {file = "immutables-0.19-cp38-cp38-macosx_10_9_universal2.whl", hash = "sha256:a208a945ea817b1455b5b0f9c33c097baf6443b50d749a3dc32ff445e41b81d2"}, - {file = "immutables-0.19-cp38-cp38-macosx_10_9_x86_64.whl", hash = "sha256:25a6225efb5e96fc95d84b2d280e35d8a82a1ae72a12857177d48cc289ac1e03"}, - {file = "immutables-0.19-cp38-cp38-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:5c0cf0d94b08e58896acf250cbc4682499c8a256fc6d0ee5c63d76a759a6a228"}, - {file = "immutables-0.19-cp38-cp38-manylinux_2_5_x86_64.manylinux1_x86_64.manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:64c74c5171f3a97b178b880746743a07b08e7d7f6055370bf04a94d50aea0643"}, - {file = "immutables-0.19-cp38-cp38-musllinux_1_1_aarch64.whl", hash = "sha256:8ababf72ed2a956b28f151d605a7bb1d4e1c59113f53bf2be4a586da3977b319"}, - {file = "immutables-0.19-cp38-cp38-musllinux_1_1_x86_64.whl", hash = "sha256:52a91917c65e6b9cfef7a2d2c3b0e00432a153aa8650785b7ee0897d80226278"}, - {file = "immutables-0.19-cp38-cp38-win_amd64.whl", hash = "sha256:bbe65c23779e12e0ecc3dec2c709ad22b7cc8b163895327bc173ae06a8b73425"}, - 
{file = "immutables-0.19-cp39-cp39-macosx_10_9_universal2.whl", hash = "sha256:480cc5d62efcac66f9737ae0820acd39d39e516e6fdbcf46cbdc26f11b429fd7"}, - {file = "immutables-0.19-cp39-cp39-macosx_10_9_x86_64.whl", hash = "sha256:2d88ff44e131508def4740964076c3da273baeeb406c1fe139f18373ea4196dd"}, - {file = "immutables-0.19-cp39-cp39-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:7fa3148393101b0c4571da523929ae90a5b4bfc933c270a11b802a34a921c608"}, - {file = "immutables-0.19-cp39-cp39-manylinux_2_5_x86_64.manylinux1_x86_64.manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:0575190a90c3fce6862ccdb09be3344741ff97a96e559893541886d372139f1c"}, - {file = "immutables-0.19-cp39-cp39-musllinux_1_1_aarch64.whl", hash = "sha256:3754b26ef18b5d1009ffdeafc17fbd877a79f0a126e1423069bd8ef51c54302d"}, - {file = "immutables-0.19-cp39-cp39-musllinux_1_1_x86_64.whl", hash = "sha256:648142e16d49f5207ae52ee1b28dfa148206471967b9c9eaa5a9592fd32d5cef"}, - {file = "immutables-0.19-cp39-cp39-win_amd64.whl", hash = "sha256:199db9070ffa1a037e6650ddd63159907a210e4998f932bdf50e70615629db0c"}, - {file = "immutables-0.19.tar.gz", hash = "sha256:df17942d60e8080835fcc5245aa6928ef4c1ed567570ec019185798195048dcf"}, -] - -[package.dependencies] -typing-extensions = {version = ">=3.7.4.3", markers = "python_version < \"3.8\""} - -[package.extras] -test = ["flake8 (>=5.0.4,<5.1.0)", "mypy (==0.971)", "pycodestyle (>=2.9.1,<2.10.0)", "pytest (>=6.2.4,<6.3.0)"] - [[package]] name = "importlib-metadata" version = "4.2.0" description = "Read metadata from Python packages" -category = "main" +category = "dev" optional = false python-versions = ">=3.6" files = [ @@ -1617,7 +1546,7 @@ testing = ["flufl.flake8", "importlib-resources (>=1.3)", "packaging", "pep517", name = "iniconfig" version = "2.0.0" description = "brain-dead simple config-ini parsing" -category = "main" +category = "dev" optional = false python-versions = ">=3.7" files = [ @@ -1655,6 +1584,21 @@ files = [ {file = 
"itsdangerous-2.1.2.tar.gz", hash = "sha256:5dbbc68b317e5e42f327f9021763545dc3fc3bfe22e6deb96aaf1fc38874156a"}, ] +[[package]] +name = "janus" +version = "1.0.0" +description = "Mixed sync-async queue to interoperate between asyncio tasks and classic threads" +category = "main" +optional = false +python-versions = ">=3.7" +files = [ + {file = "janus-1.0.0-py3-none-any.whl", hash = "sha256:2596ea5482711c1ee3ef2df6c290aaf370a13c55a007826e8f7c32d696d1d00a"}, + {file = "janus-1.0.0.tar.gz", hash = "sha256:df976f2cdcfb034b147a2d51edfc34ff6bfb12d4e2643d3ad0e10de058cb1612"}, +] + +[package.dependencies] +typing-extensions = ">=3.7.4.3" + [[package]] name = "jinja2" version = "3.1.2" @@ -2114,7 +2058,7 @@ test = ["appdirs (==1.4.4)", "covdefaults (>=2.2.2)", "pytest (>=7.2.1)", "pytes name = "pluggy" version = "1.0.0" description = "plugin and hook calling mechanisms for python" -category = "main" +category = "dev" optional = false python-versions = ">=3.6" files = [ @@ -2674,7 +2618,7 @@ testing = ["coverage", "pytest (>=5.4.2)", "pytest-cov", "webtest (>=1.3.1)", "z name = "pytest" version = "7.2.1" description = "pytest: simple powerful testing with Python" -category = "main" +category = "dev" optional = false python-versions = ">=3.7" files = [ @@ -2711,7 +2655,7 @@ files = [ name = "pywin32" version = "305" description = "Python for Window Extensions" -category = "main" +category = "dev" optional = false python-versions = "*" files = [ @@ -2735,7 +2679,7 @@ files = [ name = "pyyaml" version = "6.0" description = "YAML parser and emitter for Python" -category = "main" +category = "dev" optional = false python-versions = ">=3.6" files = [ @@ -2963,7 +2907,7 @@ full = ["httpx (>=0.22.0)", "itsdangerous", "jinja2", "python-multipart", "pyyam name = "testcontainers" version = "3.7.1" description = "Library provides lightweight, throwaway instances of common databases, Selenium web browsers, or anything else that can run in a Docker container" -category = "main" +category = 
"dev" optional = false python-versions = ">=3.7" files = [ @@ -3016,7 +2960,7 @@ tornado = ["tornado (>=4.0,<6.0)"] name = "tomli" version = "2.0.1" description = "A lil' TOML parser" -category = "main" +category = "dev" optional = false python-versions = ">=3.7" files = [ @@ -3290,7 +3234,7 @@ standard = ["PyYAML (>=5.1)", "colorama (>=0.4)", "httptools (>=0.2.0,<0.4.0)", name = "uvloop" version = "0.17.0" description = "Fast implementation of asyncio event loop on top of libuv" -category = "dev" +category = "main" optional = false python-versions = ">=3.7" files = [ @@ -3402,7 +3346,7 @@ testing = ["coverage", "pytest (>=3.1.0)", "pytest-cov", "pytest-xdist"] name = "websocket-client" version = "1.5.1" description = "WebSocket client for Python with low level API options" -category = "main" +category = "dev" optional = false python-versions = ">=3.7" files = [ @@ -3691,7 +3635,7 @@ typing-extensions = {version = ">=3.7.4", markers = "python_version < \"3.8\""} name = "zipp" version = "3.13.0" description = "Backport of pathlib-compatible object wrapper for zip files" -category = "main" +category = "dev" optional = false python-versions = ">=3.7" files = [ @@ -3796,11 +3740,11 @@ test = ["coverage (>=5.0.3)", "zope.event", "zope.testing"] testing = ["coverage (>=5.0.3)", "zope.event", "zope.testing"] [extras] -all = ["requests", "kafka-python"] +all = ["kafka-python", "requests"] http = ["requests"] kafka = ["kafka-python"] [metadata] lock-version = "2.0" python-versions = ">=3.7, <3.12" -content-hash = "f56ff873cbffa8ce0007fcfdf195b00c243f6b458b2e46d67a7a4abd920bcb24" +content-hash = "cf358b3d144e2d2dddd2df5bf8f8121a680a3946233d77d86e4a3c6bb6a4de66" diff --git a/pyproject.toml b/pyproject.toml index 5833e70c..9f856936 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -75,6 +75,8 @@ wrapt = '*' psutil = '*' requests = { version = ">=2.26.0", optional = true } kafka-python = { version = "*", optional = true } +janus = "^1.0.0" +uvloop = "^0.17.0" 
[tool.poetry.extras] all=[ diff --git a/skywalking/agent/__init__.py b/skywalking/agent/__init__.py index cad45fbb..fb90918c 100644 --- a/skywalking/agent/__init__.py +++ b/skywalking/agent/__init__.py @@ -19,15 +19,18 @@ import functools import os import sys +import asyncio from queue import Queue, Full from threading import Thread, Event from typing import TYPE_CHECKING, Optional +from uvloop import install as install_uvloop + from skywalking import config, plugins from skywalking import loggings from skywalking import meter from skywalking import profile -from skywalking.agent.protocol import Protocol +from skywalking.agent.protocol import Protocol, ProtocolAsync from skywalking.command import command_service from skywalking.loggings import logger from skywalking.profile.profile_task import ProfileTask @@ -39,6 +42,7 @@ if TYPE_CHECKING: from skywalking.trace.context import Segment +install_uvloop() def report_with_backoff(reporter_name, init_wait): """ @@ -66,6 +70,31 @@ def backoff_wrapper(self, *args, **kwargs): return backoff_decorator +def report_with_backoff_async(reporter_name, init_wait): + """ + An exponential async backoff for retrying reporters. 
+ """ + + def backoff_decorator(func): + @functools.wraps(func) + async def backoff_wrapper(self, *args, **kwargs): + wait = base = init_wait + while not self._finished.is_set(): + try: + flag = await func(self, *args, **kwargs) + # for segment/log reporter, if the queue not empty(return True), we should keep reporter working + # for other cases(return false or None), reset to base wait time on success + wait = 0 if flag else base + except Exception: # noqa + wait = min(60, wait * 2 or 1) # double wait time with each consecutive error up to a maximum + logger.exception(f'Exception in {reporter_name} service in pid {os.getpid()}, ' + f'retry in {wait} seconds') + await asyncio.sleep(wait) + logger.info('finished reporter coroutine') + + return backoff_wrapper + + return backoff_decorator class SkyWalkingAgent(Singleton): """ @@ -233,7 +262,7 @@ def start(self) -> None: config.finalize() # Must be finalized exactly once self.__started = True - logger.info(f'SkyWalking agent instance {config.agent_instance_name} starting in pid-{os.getpid()}.') + logger.info(f'SkyWalking sync agent instance {config.agent_instance_name} starting in pid-{os.getpid()}.') # Install log reporter core # TODO - Add support for printing traceID/ context in logs @@ -382,7 +411,268 @@ def notify_profile_finish(self, task: ProfileTask): logger.error(f'notify profile task finish to backend fail. {str(e)}') +class SkyWalkingAgentAsync(Singleton): + __started: bool = False # shared by all instances + + def __init__(self): + """ + ProtocolAsync is one of gRPC, HTTP and Kafka that + provides async clients to reporters to communicate with OAP backend. 
+ """ + self.started_pid = None + self.__protocol: Optional[ProtocolAsync] = None + self._finished: Optional[asyncio.Event] = None + + def __bootstrap(self): + if config.agent_protocol == 'grpc': + from skywalking.agent.protocol.grpc_aio import GrpcProtocolAsync + self.__protocol = GrpcProtocolAsync() + else: + # TODO: support other protocols + raise ValueError(f'Unsupported protocol {config.agent_protocol}') + logger.info(f'You are using {config.agent_protocol} protocol to communicate with OAP backend') + + # Initialize asyncio queues for segment, log, meter and profiling snapshots + self.__segment_queue: Optional[asyncio.Queue] = None + self.__log_queue: Optional[asyncio.Queue] = None + self.__meter_queue: Optional[asyncio.Queue] = None + self.__snapshot_queue: Optional[asyncio.Queue] = None + + self.event_loop_thread: Optional[Thread] = None + # # According to https://github.com/python/cpython/issues/91887 + # # creat_task() only keeps a weak reference to the asyncio.Task, + # # To avoid the task being garbage collected, we need to keep a strong reference to it. + # # Such as store it in a set. + # self.tasks_ref_set: set[asyncio.Task] = set() + self.loop = asyncio.get_event_loop() + self._finished = asyncio.Event() + + # Start reporter's asyncio coroutines and register queues + self.__init_coroutine() + + def __init_coroutine(self) -> None: + """ + This method initializes all the queues and asyncio tasks for the agent and reporters. + + Heartbeat task is started by default. + Segment reporter task and segment queue is created by default. + All other queues and tasks depends on user configuration. 
+ """ + + self.background_coroutines = set() + + __heartbeat_coroutine = self.__heartbeat() + self.background_coroutines.add(__heartbeat_coroutine) + + self.__segment_queue = asyncio.Queue(maxsize=config.agent_trace_reporter_max_buffer_size) + __segment_report_coroutine = self.__report_segment() + self.background_coroutines.add(__segment_report_coroutine) + + if config.agent_meter_reporter_active: + self.__meter_queue = asyncio.Queue(maxsize=config.agent_meter_reporter_max_buffer_size) + __meter_report_coroutine = self.__report_meter() + self.background_coroutines.add(__meter_report_coroutine) + + if config.agent_pvm_meter_reporter_active: + from skywalking.meter.pvm.cpu_usage import CPUUsageDataSource + from skywalking.meter.pvm.gc_data import GCDataSource + from skywalking.meter.pvm.mem_usage import MEMUsageDataSource + from skywalking.meter.pvm.thread_data import ThreadDataSource + + # still use thread to collect pvm meter + MEMUsageDataSource().register() + CPUUsageDataSource().register() + GCDataSource().register() + ThreadDataSource().register() + + if config.agent_log_reporter_active: + self.__log_queue = asyncio.Queue(maxsize=config.agent_log_reporter_max_buffer_size) + __log_report_coroutine = self.__report_log() + self.background_coroutines.add(__log_report_coroutine) + + if config.agent_profile_active: + # TODO: support profile + ... 
+ + def __start_event_loop(self, loop: asyncio.AbstractEventLoop) -> None: + # do not use asyncio.run() here, because we may want loop to be used in other places + # such as run_coroutine_threadsafe() + asyncio.set_event_loop(loop) + # run all coroutines + try: + loop.run_until_complete(asyncio.wait(self.background_coroutines, return_when=asyncio.ALL_COMPLETED)) + except asyncio.CancelledError: + # this is for handling KeyboardInterrupt(Ctrl + C) + # for better solution, use Python 3.11+ + logger.info('Force the cancellation of event_loop tasks') + finally: + loop.close() + + def start(self) -> None: + loggings.init() + + if sys.version_info < (3, 7): + # agent may or may not work for Python 3.6 and below + # since 3.6 is EOL, we will not officially support it + logger.warning('SkyWalking Python agent does not support Python 3.6 and below, ' + 'please upgrade to Python 3.7 or above.') + + if not self.__started: + # if not already started, start the agent + config.finalize() # Must be finalized exactly once + + self.__started = True + logger.info(f'SkyWalking async agent instance {config.agent_instance_name} starting in pid-{os.getpid()}.') + + # Install log reporter core + # TODO - Add support for printing traceID/ context in logs + if config.agent_log_reporter_active: + from skywalking import log + log.install() + # Here we install all other lib plugins on first time start (parent process) + plugins.install() + elif self.__started and os.getpid() == self.started_pid: + # if already started, and this is the same process, raise an error + raise RuntimeError('SkyWalking Python agent has already been started in this process, ' + 'did you call start more than once in your code + sw-python CLI? ' + 'If you already use sw-python CLI, you should remove the manual start(), vice versa.') + + self.started_pid = os.getpid() + + if config.agent_profile_active: + # TODO: support profile + ... 
+ # profile.init() + if config.agent_meter_reporter_active: + meter.init(force=True) + + self.__bootstrap() # gather all coroutines + + atexit.register(self.__fini) + + # can use asyncio.to_thread() in Python 3.9+ + self.event_loop_thread = Thread(name='event_loop_thread', target=self.__start_event_loop, args=(self.loop,), daemon=True) + self.event_loop_thread.start() + + async def __fini_async(self): + """ + This method is called when the agent is shutting down. + Clean up all the queues and stop all the asyncio tasks. + """ + self._finished.set() + queue_join_coroutine_list = [self.__segment_queue.join()] + + if config.agent_log_reporter_active: + queue_join_coroutine_list.append(self.__log_queue.join()) + + if config.agent_profile_active: + # TODO: support profile + ... + # queue_join_coroutine_list.append(self.__snapshot_queue.join()) + + if config.agent_meter_reporter_active: + queue_join_coroutine_list.append(self.__meter_queue.join()) + + await asyncio.gather(*queue_join_coroutine_list, return_exceptions=True) # clean queues + # cancel all tasks + all_tasks = asyncio.all_tasks(self.loop) + for task in all_tasks: + task.cancel() + + def __fini(self): + asyncio.run_coroutine_threadsafe(self.__fini_async(), self.loop) + if not self.loop.is_closed(): + self.loop.close() + self.event_loop_thread.join() + logger.info('Finished Python agent event_loop thread') + + def stop(self) -> None: + """ + Stops the agent and reset the started flag. 
+ """ + atexit.unregister(self.__fini) + self.__fini() + self.__started = False + + @report_with_backoff_async(reporter_name='heartbeat', init_wait=config.agent_collector_heartbeat_period) + async def __heartbeat(self) -> None: + await self.__protocol.heartbeat() + + @report_with_backoff_async(reporter_name='segment', init_wait=0.02) + async def __report_segment(self) -> bool: + """Returns True if the queue is not empty""" + queue_not_empty_flag = not self.__segment_queue.empty() + if queue_not_empty_flag: + await self.__protocol.report_segment(self.__segment_queue) + return queue_not_empty_flag + + @report_with_backoff_async(reporter_name='log', init_wait=0.02) + async def __report_log(self) -> bool: + """Returns True if the queue is not empty""" + queue_not_empty_flag = not self.__log_queue.empty() + if queue_not_empty_flag: + await self.__protocol.report_log(self.__log_queue) + return queue_not_empty_flag + + @report_with_backoff_async(reporter_name='meter', init_wait=config.agent_meter_reporter_period) + async def __report_meter(self) -> None: + if not self.__meter_queue.empty(): + await self.__protocol.report_meter(self.__meter_queue) + + def __send_profile_snapshot(self) -> None: + # TODO: support profile + ... + + def __query_profile_command(self) -> None: + # TODO: support profile + ... 
+ + @staticmethod + def __command_dispatch() -> None: + # command dispatch will stuck when there are no commands + command_service.dispatch() + + def is_segment_queue_full(self): + return self.__segment_queue.full() + + def archive_segment(self, segment: 'Segment'): + try: + self.__segment_queue.put_nowait(segment) + # asyncio.run_coroutine_threadsafe(self.__segment_queue.put(segment), self.loop) + except asyncio.QueueFull: + logger.warning('the queue is full, the segment will be abandoned') + + def archive_log(self, log_data: 'LogData'): + try: + self.__log_queue.put_nowait(log_data) + # asyncio.run_coroutine_threadsafe(self.__log_queue.put(log_data), self.loop) + except asyncio.QueueFull: + logger.warning('the queue is full, the log will be abandoned') + + def archive_meter(self, meter_data: 'MeterData'): + try: + self.__meter_queue.put_nowait(meter_data) + # asyncio.run_coroutine_threadsafe(self.__meter_queue.put(meter_data), self.loop) + except asyncio.QueueFull: + logger.warning('the queue is full, the meter will be abandoned') + + def add_profiling_snapshot_async(self, snapshot: TracingThreadSnapshot): + # TODO: support profile + ... + # try: + # self.__snapshot_queue.put_nowait(snapshot) + # except asyncio.QueueFull: + # logger.warning('the snapshot queue is full, the snapshot will be abandoned') + + def notify_profile_finish_async(self, task: ProfileTask): + # TODO: support profile + ... + # try: + # asyncio.run_coroutine_threadsafe(self.__protocol.notify_profile_task_finish(task), self.loop) + # except Exception as e: + # logger.error(f'notify profile task finish to backend fail. 
{e}') + # Export for user (backwards compatibility) # so users still use `from skywalking import agent` -agent = SkyWalkingAgent() +agent = SkyWalkingAgentAsync() if config.agent_asyncio_enhancement else SkyWalkingAgent() start = agent.start diff --git a/skywalking/agent/protocol/__init__.py b/skywalking/agent/protocol/__init__.py index bf6b4270..1cff6713 100644 --- a/skywalking/agent/protocol/__init__.py +++ b/skywalking/agent/protocol/__init__.py @@ -17,6 +17,7 @@ from abc import ABC, abstractmethod from queue import Queue +from asyncio import Queue as QueueAsync class Protocol(ABC): @@ -47,3 +48,32 @@ def query_profile_commands(self): @abstractmethod def notify_profile_task_finish(self, task): raise NotImplementedError() + +class ProtocolAsync(ABC): + @abstractmethod + async def heartbeat(self): + raise NotImplementedError() + + @abstractmethod + async def report_segment(self, queue: QueueAsync, block: bool = True): + raise NotImplementedError() + + @abstractmethod + async def report_log(self, queue: QueueAsync, block: bool = True): + raise NotImplementedError() + + @abstractmethod + async def report_meter(self, queue: QueueAsync, block: bool = True): + raise NotImplementedError() + + @abstractmethod + async def report_snapshot(self, queue: QueueAsync, block: bool = True): + raise NotImplementedError() + + @abstractmethod + async def query_profile_commands(self): + raise NotImplementedError() + + @abstractmethod + async def notify_profile_task_finish(self, task): + raise NotImplementedError() diff --git a/skywalking/agent/protocol/grpc.py b/skywalking/agent/protocol/grpc.py index 52b2a0c9..b70240ee 100644 --- a/skywalking/agent/protocol/grpc.py +++ b/skywalking/agent/protocol/grpc.py @@ -23,7 +23,7 @@ import grpc from skywalking import config -from skywalking.agent import Protocol +from skywalking.agent.protocol import Protocol from skywalking.agent.protocol.interceptors import header_adder_interceptor from skywalking.client.grpc import 
GrpcServiceManagementClient, GrpcTraceSegmentReportService, \ GrpcProfileTaskChannelService, GrpcLogDataReportService, GrpcMeterReportService diff --git a/skywalking/agent/protocol/grpc_aio.py b/skywalking/agent/protocol/grpc_aio.py new file mode 100644 index 00000000..973389a7 --- /dev/null +++ b/skywalking/agent/protocol/grpc_aio.py @@ -0,0 +1,262 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+#
+
+import logging
+import traceback
+from asyncio import Queue, QueueEmpty, wait_for
+from time import time
+
+import grpc
+
+from skywalking import config
+from skywalking.agent.protocol import ProtocolAsync
+from skywalking.agent.protocol.interceptors_aio import header_adder_interceptor_async
+from skywalking.client.grpc_aio import GrpcServiceManagementClientAsync, GrpcTraceSegmentReportServiceAsync, \
+    GrpcProfileTaskChannelServiceAsync, GrpcLogReportServiceAsync, GrpcMeterReportServiceAsync
+from skywalking.loggings import logger, logger_debug_enabled
+from skywalking.profile.profile_task import ProfileTask
+from skywalking.profile.snapshot import TracingThreadSnapshot
+from skywalking.protocol.common.Common_pb2 import KeyStringValuePair
+from skywalking.protocol.language_agent.Tracing_pb2 import SegmentObject, SpanObject, Log, SegmentReference
+from skywalking.protocol.logging.Logging_pb2 import LogData
+from skywalking.protocol.language_agent.Meter_pb2 import MeterData
+from skywalking.protocol.profile.Profile_pb2 import ThreadSnapshot, ThreadStack
+from skywalking.trace.segment import Segment
+
+
+class GrpcProtocolAsync(ProtocolAsync):
+    """
+    gRPC protocol implementation for asyncio, built on grpc.aio.
+    """
+    def __init__(self):
+        # Flipped to True by heartbeat() once the instance properties have been sent.
+        self.properties_sent = False
+
+        # grpc.aio.Channel does not have a subscribe() method to set a callback when the channel state changes;
+        # instead, it has a get_state() method to get the current state of the channel.
+        # Considering the channel state is only used for debugging, the cost of monitoring this value is too high.
+        # self.state = None
+
+        interceptors = [header_adder_interceptor_async('authentication', config.agent_authentication)] \
+            if config.agent_authentication else None
+
+        if config.agent_force_tls:
+            self.channel = grpc.aio.secure_channel(config.agent_collector_backend_services,
+                                                   grpc.ssl_channel_credentials(), interceptors=interceptors)
+        else:
+            self.channel = grpc.aio.insecure_channel(config.agent_collector_backend_services,
+                                                     interceptors=interceptors)
+
+        self.service_management = GrpcServiceManagementClientAsync(self.channel)
+        self.traces_reporter = GrpcTraceSegmentReportServiceAsync(self.channel)
+        self.log_reporter = GrpcLogReportServiceAsync(self.channel)
+        self.meter_reporter = GrpcMeterReportServiceAsync(self.channel)
+        self.profile_channel = GrpcProfileTaskChannelServiceAsync(self.channel)
+
+    async def query_profile_commands(self):
+        """Placeholder: not implemented for asyncio yet, the body is intentionally empty."""
+        ...
+
+    async def notify_profile_task_finish(self, task: ProfileTask):
+        """Placeholder: not implemented for asyncio yet, the body is intentionally empty."""
+        ...
+
+    async def heartbeat(self):
+        """
+        Send the instance properties if they have not been sent yet, then send a heartbeat.
+
+        Raises:
+            grpc.aio.AioRpcError: re-raised after calling on_error().
+        """
+        try:
+            if not self.properties_sent:
+                await self.service_management.send_instance_props()
+                self.properties_sent = True
+
+            await self.service_management.send_heart_beat()
+
+        except grpc.aio.AioRpcError:
+            self.on_error()
+            raise
+
+    def on_error(self):
+        """Print the active exception's traceback when the logger is enabled for DEBUG."""
+        if logger.isEnabledFor(logging.DEBUG):
+            traceback.print_exc()
+
+    async def report_segment(self, queue: Queue, block: bool = True):
+        """
+        Stream segments taken from ``queue`` to the trace segment report service.
+
+        The request generator awaits ``queue.get()`` in an endless loop, so this side never closes
+        the stream by itself.
+
+        Args:
+            queue: asyncio queue the segments are taken from.
+            block: currently unused by this implementation.
+
+        Raises:
+            grpc.aio.AioRpcError: re-raised after calling on_error().
+        """
+        async def generator():
+            while True:
+                segment: Segment = await queue.get()
+                # Marked as done right after dequeuing, i.e. before the segment is actually sent.
+                queue.task_done()
+
+                if logger_debug_enabled:
+                    logger.debug('reporting segment %s', segment)
+
+                s = SegmentObject(
+                    traceId=str(segment.related_traces[0]),
+                    traceSegmentId=str(segment.segment_id),
+                    service=config.agent_name,
+                    serviceInstance=config.agent_instance_name,
+                    spans=[SpanObject(
+                        spanId=span.sid,
+                        parentSpanId=span.pid,
+                        startTime=span.start_time,
+                        endTime=span.end_time,
+                        operationName=span.op,
+                        peer=span.peer,
+                        spanType=span.kind.name,
+                        spanLayer=span.layer.name,
+                        componentId=span.component.value,
+                        isError=span.error_occurred,
+                        logs=[Log(
+                            time=int(log.timestamp * 1000),
+                            data=[KeyStringValuePair(key=item.key, value=item.val) for item in log.items],
+                        ) for log in span.logs],
+                        tags=[KeyStringValuePair(
+                            key=tag.key,
+                            value=tag.val,
+                        ) for tag in span.iter_tags()],
+                        refs=[SegmentReference(
+                            refType=0 if ref.ref_type == 'CrossProcess' else 1,
+                            traceId=ref.trace_id,
+                            parentTraceSegmentId=ref.segment_id,
+                            parentSpanId=ref.span_id,
+                            parentService=ref.service,
+                            parentServiceInstance=ref.service_instance,
+                            parentEndpoint=ref.endpoint,
+                            networkAddressUsedAtPeer=ref.client_address,
+                        ) for ref in span.refs if ref.trace_id],
+                    ) for span in segment.spans],
+                )
+
+                yield s
+
+        try:
+            await self.traces_reporter.report(generator())
+        except grpc.aio.AioRpcError:
+            self.on_error()
+            raise  # reraise so that incremental reconnect wait can process
+
+    async def report_log(self, queue: Queue, block: bool = True):
+        """
+        Stream log records taken from ``queue`` to the log report service.
+
+        The request generator awaits ``queue.get()`` in an endless loop, so this side never closes
+        the stream by itself.
+
+        Args:
+            queue: asyncio queue the log records are taken from.
+            block: currently unused by this implementation.
+
+        Raises:
+            grpc.aio.AioRpcError: re-raised after calling on_error().
+        """
+        async def generator():
+            while True:
+                log_data: LogData = await queue.get()
+                queue.task_done()
+
+                if logger_debug_enabled:
+                    logger.debug('Reporting Log %s', log_data.timestamp)
+
+                yield log_data
+
+        try:
+            await self.log_reporter.report(generator())
+        except grpc.aio.AioRpcError:
+            self.on_error()
+            raise
+
+    async def report_meter(self, queue: Queue, block: bool = True):
+        """
+        Stream meter records taken from ``queue`` to the meter report service.
+
+        The request generator awaits ``queue.get()`` in an endless loop, so this side never closes
+        the stream by itself.
+
+        Args:
+            queue: asyncio queue the meter records are taken from.
+            block: currently unused by this implementation.
+
+        Raises:
+            grpc.aio.AioRpcError: re-raised after calling on_error().
+        """
+        async def generator():
+            while True:
+                meter_data: MeterData = await queue.get()
+                queue.task_done()
+
+                if logger_debug_enabled:
+                    logger.debug('Reporting Meter %s', meter_data.timestamp)
+
+                yield meter_data
+
+        try:
+            await self.meter_reporter.report(generator())
+        except grpc.aio.AioRpcError:
+            self.on_error()
+            raise
+
+    async def report_snapshot(self, queue: Queue, block: bool = True):
+        """
+        Stream profiling snapshots taken from ``queue`` to the profile task service.
+
+        The request generator awaits ``queue.get()`` in an endless loop, so this side never closes
+        the stream by itself.
+
+        Args:
+            queue: asyncio queue the snapshots are taken from.
+            block: currently unused by this implementation.
+
+        Raises:
+            grpc.aio.AioRpcError: re-raised after calling on_error().
+        """
+        async def generator():
+            while True:
+                snapshot: TracingThreadSnapshot = await queue.get()
+                queue.task_done()
+
+                transform_snapshot = ThreadSnapshot(
+                    taskId=str(snapshot.task_id),
+                    traceSegmentId=str(snapshot.trace_segment_id),
+                    time=int(snapshot.time),
+                    sequence=int(snapshot.sequence),
+                    stack=ThreadStack(codeSignatures=snapshot.stack_list)
+                )
+
+                yield transform_snapshot
+
+        try:
+            await self.profile_channel.report(generator())
+        except grpc.aio.AioRpcError:
+            self.on_error()
+            raise
diff --git a/skywalking/agent/protocol/interceptors_aio.py b/skywalking/agent/protocol/interceptors_aio.py
new file mode 100644
index 00000000..1ade5fb7
--- /dev/null
+++ b/skywalking/agent/protocol/interceptors_aio.py
@@ -0,0 +1,74 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+from collections import namedtuple
+
+import grpc
+
+
+class _ClientInterceptorAsync(
+    grpc.aio.UnaryUnaryClientInterceptor,
+    grpc.aio.UnaryStreamClientInterceptor,
+    grpc.aio.StreamUnaryClientInterceptor,
+    grpc.aio.StreamStreamClientInterceptor
+):
+    """Adapts one async interceptor function to all four grpc.aio client interceptor interfaces."""
+    def __init__(self, interceptor_async_function):
+        self._fn = interceptor_async_function
+
+    async def intercept_unary_unary(self, continuation, client_call_details, request):
+        details, requests, postprocess = await self._fn(client_call_details, iter((request,)), False, False)
+        response = await continuation(details, next(requests))
+        return (await postprocess(response)) if postprocess else response
+
+    async def intercept_unary_stream(self, continuation, client_call_details, request):
+        details, requests, postprocess = await self._fn(client_call_details, iter((request,)), False, True)
+        response_it = await continuation(details, next(requests))
+        return (await postprocess(response_it)) if postprocess else response_it
+
+    async def intercept_stream_unary(self, continuation, client_call_details, request_iterator):
+        details, requests, postprocess = await self._fn(client_call_details, request_iterator, True, False)
+        response = await continuation(details, requests)
+        return (await postprocess(response)) if postprocess else response
+
+    async def intercept_stream_stream(self, continuation, client_call_details, request_iterator):
+        details, requests, postprocess = await self._fn(client_call_details, request_iterator, True, True)
+        response_it = await continuation(details, requests)
+        return (await postprocess(response_it)) if postprocess else response_it
+
+
+def create(intercept_async_call):
+    return _ClientInterceptorAsync(intercept_async_call)
+
+
+# wait_for_ready mirrors grpc.aio.ClientCallDetails; it defaults to None so 4-argument construction still works.
+ClientCallDetails = namedtuple(
+    'ClientCallDetails', ('method', 'timeout', 'metadata', 'credentials', 'wait_for_ready'), defaults=(None,))
+
+
+def header_adder_interceptor_async(header, value):
+    """Return a client interceptor that appends the (header, value) pair to every call's metadata."""
+    async def intercept_async_call(client_call_details, request_iterator, request_streaming, response_streaming):
+        metadata = list(client_call_details.metadata or ())
+        metadata.append((header, value))
+        client_call_details = ClientCallDetails(
+            client_call_details.method, client_call_details.timeout, metadata, client_call_details.credentials,
+            getattr(client_call_details, 'wait_for_ready', None),
+        )
+        return client_call_details, request_iterator, None
+
+    return create(intercept_async_call)
diff --git a/skywalking/bootstrap/loader/sitecustomize.py b/skywalking/bootstrap/loader/sitecustomize.py
index a79ba252..bf3ba74b 100644
--- a/skywalking/bootstrap/loader/sitecustomize.py
+++ b/skywalking/bootstrap/loader/sitecustomize.py
@@ -130,7 +130,8 @@ def _get_sw_loader_logger():
     os._exit(1)  # noqa: do not go further
 
 else:
-    from skywalking import agent, config
+    from skywalking import config
+    from skywalking.agent import agent
 
     _sw_loader_logger.info(f'Process-{os.getpid()}, running sitecustomize.py from {__file__}')
     # also override debug for skywalking agent itself
diff --git a/skywalking/client/__init__.py b/skywalking/client/__init__.py
index d9f12ad1..189b1877 100644
--- a/skywalking/client/__init__.py
+++ b/skywalking/client/__init__.py
@@ -134,3 +134,114 @@ def do_query(self):
     @abstractmethod
     def report(self, generator):
         raise NotImplementedError()
+
+
+# Asyncio Implementation
+class ServiceManagementClientAsync(ABC):
+    """
+    Used to register service and instance to OAP, for Asyncio.
+    """
+
+    def __init__(self):
+        self.sent_properties_counter = 0
+
+    @abstractmethod
+    async def send_instance_props(self) -> None:
+        """
+        Unique to each protocol, send instance properties to OAP.
+        """
+        raise NotImplementedError()
+
+    async def refresh_instance_props(self) -> None:
+        """
+        Periodically refresh the instance properties to prevent loss on OAP TTL records expiration.
+        Default: 30 * 10 seconds
+        """
+        self.sent_properties_counter += 1
+        if self.sent_properties_counter % config.agent_collector_properties_report_period_factor == 0:
+            await self.send_instance_props()
+
+    @staticmethod
+    def get_instance_properties() -> List[dict]:
+        """
+        Get current running Python interpreter's system properties.
+        Returns: [{'key': str, 'value': str}, ...]
+        """
+        try:
+            properties = [
+                {'key': 'language', 'value': 'python'},
+                {'key': 'OS Name', 'value': os.name},
+                {'key': 'Process No.', 'value': str(os.getpid())},
+                {'key': 'hostname', 'value': socket.gethostname()},
+                {'key': 'ipv4', 'value': '; '.join(socket.gethostbyname_ex(socket.gethostname())[2])},
+                {'key': 'python_implementation', 'value': platform.python_implementation()},
+                {'key': 'python_version', 'value': platform.python_version()},
+            ]
+
+        except Exception:  # noqa
+            logger.exception('Failed to get OS info, fallback to basic properties.')
+            properties = [
+                {'key': 'language', 'value': 'python'},
+                {'key': 'Process No.', 'value': str(os.getpid())},
+            ]
+
+        namespace = config.agent_namespace
+        if namespace:
+            properties.append({'key': 'namespace', 'value': namespace})
+
+        instance_properties_json = config.agent_instance_properties_json
+        if instance_properties_json:
+            # load instance properties from json string
+            json_properties = json.loads(instance_properties_json)
+            for key, value in json_properties.items():
+                properties.append({'key': key, 'value': value})
+
+        return properties
+
+    def get_instance_properties_proto(self) -> List[KeyStringValuePair]:
+        """
+        Converts to protobuf format.
+        Returns: [KeyStringValuePair, ...]
+        """
+        return [KeyStringValuePair(key=prop['key'], value=prop['value']) for prop in self.get_instance_properties()]
+
+    async def send_heart_beat(self) -> None:
+        """
+        Each protocol must implement this method to send heart beat to OAP.
+        Returns: None
+        """
+        raise NotImplementedError()
+
+
+class ServiceAsync(ABC):
+    @abstractmethod
+    async def report(self, segment: bytes) -> None:
+        raise NotImplementedError()
+
+
+class TraceSegmentReportServiceAsync(ServiceAsync):
+    @abstractmethod
+    async def report(self, generator):
+        raise NotImplementedError()
+
+
+class MeterReportServiceAsync(ServiceAsync):
+    @abstractmethod
+    async def report(self, generator):
+        raise NotImplementedError()
+
+
+class LogDataReportServiceAsync(ServiceAsync):
+    @abstractmethod
+    async def report(self, generator):
+        raise NotImplementedError()
+
+
+class ProfileTaskChannelServiceAsync(ServiceAsync):
+    @abstractmethod
+    async def do_query(self):
+        raise NotImplementedError()
+
+    @abstractmethod
+    async def report(self, generator):
+        raise NotImplementedError()
diff --git a/skywalking/client/grpc_aio.py b/skywalking/client/grpc_aio.py
new file mode 100644
index 00000000..818b9600
--- /dev/null
+++ b/skywalking/client/grpc_aio.py
@@ -0,0 +1,123 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+import grpc
+
+from skywalking import config
+from skywalking.client import ServiceManagementClientAsync, TraceSegmentReportServiceAsync, \
+    ProfileTaskChannelServiceAsync, LogDataReportServiceAsync, MeterReportServiceAsync
+from skywalking.command import command_service
+from skywalking.loggings import logger, logger_debug_enabled
+from skywalking.profile import profile_task_execution_service
+from skywalking.profile.profile_task import ProfileTask
+from skywalking.protocol.language_agent.Tracing_pb2_grpc import TraceSegmentReportServiceStub
+from skywalking.protocol.logging.Logging_pb2_grpc import LogReportServiceStub
+from skywalking.protocol.management.Management_pb2 import InstancePingPkg, InstanceProperties
+from skywalking.protocol.language_agent.Meter_pb2_grpc import MeterReportServiceStub
+from skywalking.protocol.management.Management_pb2_grpc import ManagementServiceStub
+from skywalking.protocol.profile.Profile_pb2 import ProfileTaskCommandQuery, ProfileTaskFinishReport
+from skywalking.protocol.profile.Profile_pb2_grpc import ProfileTaskStub
+
+
+class GrpcServiceManagementClientAsync(ServiceManagementClientAsync):
+    """Reports instance properties and heartbeats through the gRPC ManagementService stub."""
+    def __init__(self, channel: grpc.aio.Channel):
+        super().__init__()
+        self.instance_properties = self.get_instance_properties_proto()
+        self.service_stub = ManagementServiceStub(channel)
+
+    async def send_instance_props(self):
+        await self.service_stub.reportInstanceProperties(InstanceProperties(
+            service=config.agent_name,
+            serviceInstance=config.agent_instance_name,
+            properties=self.instance_properties,
+        ))
+
+    async def send_heart_beat(self):
+        # Gives the base class the chance to re-send the instance properties before the keep-alive ping.
+        await self.refresh_instance_props()
+
+        await self.service_stub.keepAlive(InstancePingPkg(
+            service=config.agent_name,
+            serviceInstance=config.agent_instance_name,
+        ))
+
+        if logger_debug_enabled:
+            logger.debug('service heart beats, [%s], [%s]', config.agent_name, config.agent_instance_name)
+
+
+class GrpcTraceSegmentReportServiceAsync(TraceSegmentReportServiceAsync):
+    def __init__(self, channel: grpc.aio.Channel):
+        super().__init__()
+        self.report_stub = TraceSegmentReportServiceStub(channel)
+
+    async def report(self, generator):
+        await self.report_stub.collect(generator)
+
+
+class GrpcMeterReportServiceAsync(MeterReportServiceAsync):
+    def __init__(self, channel: grpc.aio.Channel):
+        super().__init__()
+        self.report_stub = MeterReportServiceStub(channel)
+
+    async def report_batch(self, generator):
+        await self.report_stub.collectBatch(generator)
+
+    async def report(self, generator):
+        await self.report_stub.collect(generator)
+
+
+class GrpcLogReportServiceAsync(LogDataReportServiceAsync):
+    def __init__(self, channel: grpc.aio.Channel):
+        super().__init__()
+        self.report_stub = LogReportServiceStub(channel)
+
+    async def report(self, generator):
+        await self.report_stub.collect(generator)
+
+
+class GrpcProfileTaskChannelServiceAsync(ProfileTaskChannelServiceAsync):
+    def __init__(self, channel: grpc.aio.Channel):
+        super().__init__()
+        self.profile_stub = ProfileTaskStub(channel)
+
+    async def do_query(self):
+        """
+        Temporarily unavailable: the query is built but nothing is sent to the OAP yet.
+
+        TODO: Adapt asyncio
+        """
+        query = ProfileTaskCommandQuery(  # noqa: F841 - kept for the pending async implementation below
+            service=config.agent_name,
+            serviceInstance=config.agent_instance_name,
+            lastCommandTime=profile_task_execution_service.get_last_command_create_time()
+        )
+
+        # TODO: Async style
+        # commands = self.profile_stub.getProfileTaskCommands(query)
+        # command_service.receive_command(commands)
+
+    async def report(self, generator):
+        await self.profile_stub.collectSnapshot(generator)
+
+    async def finish(self, task: ProfileTask):
+        finish_report = ProfileTaskFinishReport(
+            service=config.agent_name,
+            serviceInstance=config.agent_instance_name,
+            taskId=task.task_id
+        )
+        await self.profile_stub.reportTaskFinish(finish_report)
diff --git a/skywalking/config.py b/skywalking/config.py
index 38f92b39..ce23f4a5 100644
--- a/skywalking/config.py
+++ b/skywalking/config.py
@@ -101,6 +101,10 @@
 # DANGEROUS - This option
controls the interval of each bulk report from telemetry data queues
 # Do not modify unless you have evaluated its impact given your service load.
 agent_queue_timeout: int = int(os.getenv('SW_AGENT_QUEUE_TIMEOUT', '1'))
+# Replace the threads with asyncio coroutines for the network IO tasks with the OAP.
+# This option is experimental and may not work as expected.
+# Profile data temporarily cannot be sent to the OAP if this option is enabled.
+agent_asyncio_enhancement: bool = os.getenv('SW_AGENT_ASYNCIO_ENHANCEMENT', '').lower() == 'true'
 
 # BEGIN: SW_PYTHON Auto Instrumentation CLI
 # Special: can only be passed via environment. This config controls the child process agent bootstrap behavior in
@@ -238,7 +242,18 @@ def finalize_feature() -> None:
     """
     Examine reporter configuration and warn users about the incompatibility of protocol vs features
     """
-    global agent_profile_active, agent_meter_reporter_active
+    global agent_profile_active, agent_meter_reporter_active, agent_protocol
+
+    if agent_asyncio_enhancement:
+        if agent_protocol != 'grpc':
+            agent_protocol = 'grpc'
+            warnings.warn('Asyncio enhancement temporarily only works with gRPC protocol, please use gRPC protocol '
+                          'if you would like to use this feature.')
+
+        if agent_profile_active:
+            agent_profile_active = False
+            warnings.warn('Asyncio enhancement temporarily does not work with profiler, please disable profiler '
+                          'if you would like to use this feature.')
 
     if agent_protocol == 'http' and (agent_profile_active or agent_meter_reporter_active):
         agent_profile_active = False
From c4d5b398541d3d1f43d0c50d31ccae90fe1909af Mon Sep 17 00:00:00 2001
From: Superskyyy
Date: Mon, 15 May 2023 07:56:05 +0000
Subject: [PATCH 2/2] test

---
 bench_fastapi.py    | 22 +++++++++++
 docker-compose.yaml | 89 +++++++++++++++++++++++++++++++++++++++++++++
 requirements.txt    |  4 ++
 wrk                 |  1 +
 4 files changed, 116 insertions(+)
 create mode 100644 bench_fastapi.py
 create mode 100644 docker-compose.yaml
 create mode 100644 requirements.txt
create mode 160000 wrk diff --git a/bench_fastapi.py b/bench_fastapi.py new file mode 100644 index 00000000..9904a15e --- /dev/null +++ b/bench_fastapi.py @@ -0,0 +1,22 @@ +from skywalking import agent, config + +config.init(agent_collector_backend_services='localhost:11800') + +agent.start() +from flask import Flask, jsonify +import logging + +app = Flask(__name__) + +# benchmarking: sw-python run python flask_single.py -- NOTE(review): this file is bench_fastapi.py but serves a Flask app; confirm the intended script name + +@app.route('/cat', methods=['POST', 'GET']) +def cat(): + try: + logging.critical('fun cat got a request') + return jsonify({'Cat Fun Fact': 'Fact is cat, cat is fat'}) + except Exception as e: # noqa + return jsonify({'message': str(e)}) + +if __name__ == '__main__': + app.run(host='0.0.0.0', port=9999, debug=False, use_reloader=False) diff --git a/docker-compose.yaml b/docker-compose.yaml new file mode 100644 index 00000000..3c637511 --- /dev/null +++ b/docker-compose.yaml @@ -0,0 +1,89 @@ +services: + oap: + container_name: oap + image: apache/skywalking-oap-server:9.2.0 + # Python agent supports gRPC/ HTTP/ Kafka reporting + expose: + - 11800 # gRPC + - 12800 # HTTP + networks: + - manual + # environment: + # # SW_KAFKA_FETCHER: default + # # SW_KAFKA_FETCHER_SERVERS: kafka:9092 + # # SW_KAFKA_FETCHER_PARTITIONS: 2 + # # SW_KAFKA_FETCHER_PARTITIONS_FACTOR: 1 + healthcheck: + test: [ "CMD", "bash", "-c", "cat < /dev/null > /dev/tcp/127.0.0.1/11800" ] + interval: 5s + timeout: 100s + retries: 120 + ports: + - "12800:12800" + - "11800:11800" + # depends_on: + # - kafka + + + ui: + image: apache/skywalking-ui:9.2.0 + container_name: ui + depends_on: + oap: + condition: service_healthy + networks: + - manual + ports: + - "8080:8080" + environment: + SW_OAP_ADDRESS: "http://oap:12800" + + # zookeeper: + # container_name: zk + # image: confluentinc/cp-zookeeper:latest + # ports: + # - "2181:2181" + # environment: + # ZOOKEEPER_CLIENT_PORT: 2181 + # ZOOKEEPER_TICK_TIME: 2000 + # networks: + # - manual + + # kafka: + # container_name: kafka + #
image: confluentinc/cp-kafka + # expose: + # - 9092 + # - 9094 + # ports: + # - 9092:9092 + # - 9094:9094 + # depends_on: + # - zookeeper + # environment: + # KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181 + # KAFKA_LISTENERS: INTERNAL://0.0.0.0:9092,OUTSIDE://0.0.0.0:9094 + # KAFKA_ADVERTISED_LISTENERS: INTERNAL://kafka:9092,OUTSIDE://localhost:9094 + # KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: INTERNAL:PLAINTEXT,OUTSIDE:PLAINTEXT + # KAFKA_INTER_BROKER_LISTENER_NAME: INTERNAL + # KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1 + # networks: + # - manual + + + # kafka-ui: + # image: provectuslabs/kafka-ui + # container_name: kafka-ui + # ports: + # - "8088:8080" + # restart: always + # environment: + # - KAFKA_CLUSTERS_0_NAME=local + # - KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS=kafka:9092 + # depends_on: + # - kafka + # networks: + # - manual + +networks: + manual: \ No newline at end of file diff --git a/requirements.txt b/requirements.txt new file mode 100644 index 00000000..097c894c --- /dev/null +++ b/requirements.txt @@ -0,0 +1,4 @@ +uvicorn +fastapi +flask +uvloop \ No newline at end of file diff --git a/wrk b/wrk new file mode 160000 index 00000000..a211dd5a --- /dev/null +++ b/wrk @@ -0,0 +1 @@ +Subproject commit a211dd5a7050b1f9e8a9870b95513060e72ac4a0