diff --git a/dubbo/__version__.py b/dubbo/__version__.py index aeae1de..7302839 100644 --- a/dubbo/__version__.py +++ b/dubbo/__version__.py @@ -14,4 +14,4 @@ # See the License for the specific language governing permissions and # limitations under the License. -__version__ = "1.0.0b1" +__version__ = "3.0.0b1" diff --git a/dubbo/cluster/failfast_cluster.py b/dubbo/cluster/failfast_cluster.py index 8bfe47a..ee5497e 100644 --- a/dubbo/cluster/failfast_cluster.py +++ b/dubbo/cluster/failfast_cluster.py @@ -15,6 +15,7 @@ # limitations under the License. from dubbo.cluster import Cluster, Directory, LoadBalance +from dubbo.cluster.loadbalances import CpuLoadBalance from dubbo.constants import common_constants from dubbo.extension import extensionLoader from dubbo.protocol import Invoker, Result @@ -27,13 +28,16 @@ class FailfastInvoker(Invoker): FailfastInvoker """ - def __init__(self, directory: Directory, url: URL): + def __init__(self, directory, url: URL): self._directory = directory self._load_balance = extensionLoader.get_extension( LoadBalance, url.parameters.get(common_constants.LOADBALANCE_KEY, "random") )() + if isinstance(self._load_balance, CpuLoadBalance): + self._load_balance.set_monitor(directory) + def invoke(self, invocation) -> Result: # get the invokers diff --git a/dubbo/cluster/loadbalances.py b/dubbo/cluster/loadbalances.py index 4b6f0b3..94e2e54 100644 --- a/dubbo/cluster/loadbalances.py +++ b/dubbo/cluster/loadbalances.py @@ -18,6 +18,7 @@ from typing import List, Optional from dubbo.cluster import LoadBalance +from dubbo.cluster.monitor.cpu import CpuMonitor from dubbo.protocol import Invocation, Invoker @@ -63,3 +64,46 @@ def do_select( ) -> Optional[Invoker]: randint = random.randint(0, len(invokers) - 1) return invokers[randint] + + +class CpuLoadBalance(AbstractLoadBalance): + """ + CPU load balance. + """ + + def __init__(self): + self._monitor: Optional[CpuMonitor] = None + + def set_monitor(self, monitor: CpuMonitor) -> None: + """ + Set the CPU monitor. + :param monitor: The CPU monitor. + :type monitor: CpuMonitor + """ + self._monitor = monitor + + def do_select( + self, invokers: List[Invoker], invocation: Invocation + ) -> Optional[Invoker]: + # get the CPU usage + cpu_usages = self._monitor.get_cpu_usage(invokers) + # Select the caller with the lowest CPU usage, 0 means CPU usage is unknown. + available_invokers = [] + unknown_invokers = [] + + for invoker, cpu_usage in cpu_usages.items(): + if cpu_usage == 0: + unknown_invokers.append((cpu_usage, invoker)) + else: + available_invokers.append((cpu_usage, invoker)) + + if available_invokers: + # sort and select the invoker with the lowest CPU usage + available_invokers.sort(key=lambda x: x[0]) + return available_invokers[0][1] + elif unknown_invokers: + # get the invoker with unknown CPU usage randomly + randint = random.randint(0, len(unknown_invokers) - 1) + return unknown_invokers[randint][1] + else: + return None diff --git a/dubbo/logger/logging/__init__.py b/dubbo/cluster/monitor/__init__.py similarity index 91% rename from dubbo/logger/logging/__init__.py rename to dubbo/cluster/monitor/__init__.py index 10e45eb..bcba37a 100644 --- a/dubbo/logger/logging/__init__.py +++ b/dubbo/cluster/monitor/__init__.py @@ -13,7 +13,3 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. - -from .logger_adapter import LoggerAdapter - -__all__ = ["LoggerAdapter"] diff --git a/dubbo/cluster/monitor/cpu.py b/dubbo/cluster/monitor/cpu.py new file mode 100644 index 0000000..3b3831f --- /dev/null +++ b/dubbo/cluster/monitor/cpu.py @@ -0,0 +1,176 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +import threading +from typing import Dict, List +from dubbo.cluster.directories import RegistryDirectory +from dubbo.constants import common_constants +from dubbo.loggers import loggerFactory +from dubbo.protocol import Protocol, Invoker +from dubbo.protocol.invocation import RpcInvocation +from dubbo.proxy.handlers import RpcServiceHandler, RpcMethodHandler +from dubbo.registry import Registry +from dubbo.types import UnaryCallType +from dubbo.url import URL +from dubbo.utils import CpuUtils + +_LOGGER = loggerFactory.get_logger() + +_cpu_invocation = RpcInvocation( + "org.apache.dubbo.MetricsService", + "cpu", + str(1).encode("utf-8"), + attributes={ + common_constants.CALL_KEY: UnaryCallType, + }, +) + + +class CpuMonitor(RegistryDirectory): + """ + The CPU monitor. + """ + + def __init__(self, registry: Registry, protocol: Protocol, url: URL): + super().__init__(registry, protocol, url) + + # interval + self._interval = 5 + + # about CPU usage + self._usages_lock = threading.Lock() + self._cpu_usages: Dict[Invoker, float] = {} + + # running invokers + self._running_invokers: Dict[str, Invoker] = {} + + # thread + self._started = False + self._thread: threading.Thread = threading.Thread( + target=self._monitor_cpu, daemon=True + ) + self._stop_event: threading.Event = threading.Event() + + # start the monitor + self.start() + + def start(self) -> None: + """ + Start the monitor. + """ + if self._stop_event.is_set(): + raise RuntimeError("The monitor has been stopped.") + elif self._started: + return + + self._started = True + self._thread.start() + _LOGGER.info("The CPU monitor has been started.") + + def stop(self) -> None: + """ + Stop the monitor. + """ + if self._stop_event.is_set(): + return + # notify the thread to stop + self._stop_event.set() + + def _monitor_cpu(self) -> None: + """ + Monitor the CPU usage. + """ + while True: + # get available invokers + available_invokers = { + url: invoker + for url, invoker in self._invokers.items() + if invoker.is_available() + } + + # update the running invokers + self._running_invokers = available_invokers + + # update the CPU usage + with self._usages_lock: + self._cpu_usages = { + invoker: usage + for invoker, usage in self._cpu_usages.items() + if invoker in available_invokers.values() + } + + # get the CPU usage for each invoker + for url, invoker in self._running_invokers.items(): + if invoker.is_available(): + try: + result = invoker.invoke(_cpu_invocation) + cpu_usage = float(result.value().decode("utf-8")) + self._cpu_usages[invoker] = cpu_usage + except Exception as e: + _LOGGER.error( + f"Failed to get the CPU usage for invoker {url}: {str(e)}" + ) + # remove the cpu usage + self._remove_cpu_usage(invoker) + + # wait for the interval or stop + if self._stop_event.wait(self._interval): + _LOGGER.info("The CPU monitor has been stopped.") + break + + def get_cpu_usage(self, invokers: List[Invoker]) -> Dict[Invoker, float]: + """ + Get the CPU usage for the invoker. + :param invokers: The invokers. + :type invokers: List[Invoker] + :return: The CPU usage. + :rtype: Dict[Invoker, float] + """ + with self._usages_lock: + return {invoker: self._cpu_usages.get(invoker, 0) for invoker in invokers} + + def _remove_cpu_usage(self, invoker: Invoker) -> None: + with self._usages_lock: + self._cpu_usages.pop(invoker) + + +class CpuInnerRpcHandler: + """ + The CPU inner RPC handler. + """ + + @staticmethod + def get_service_handler() -> RpcServiceHandler: + """ + Get the service handler. + :return: The service handler. + :rtype: RpcServiceHandler + """ + return RpcServiceHandler( + "org.apache.dubbo.MetricsService", + {"cpu": RpcMethodHandler.unary(CpuInnerRpcHandler.get_cpu_usage)}, + ) + + @staticmethod + def get_cpu_usage(interval) -> bytes: + """ + Get the CPU usage. + :param interval: The interval. + :type interval: bytes + :return: The CPU usage. + :rtype: bytes + """ + float_value = CpuUtils.get_total_cpu_usage(interval=int(interval.decode("utf-8"))) + return str(float_value).encode("utf-8") diff --git a/dubbo/common/__init__.py b/dubbo/common/__init__.py deleted file mode 100644 index a860593..0000000 --- a/dubbo/common/__init__.py +++ /dev/null @@ -1,32 +0,0 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -from .classes import SingletonBase -from .deliverers import MultiMessageDeliverer, SingleMessageDeliverer -from .node import Node -from .types import DeserializingFunction, SerializingFunction -from .url import URL, create_url - -__all__ = [ - "SingleMessageDeliverer", - "MultiMessageDeliverer", - "URL", - "create_url", - "Node", - "SingletonBase", - "DeserializingFunction", - "SerializingFunction", -] diff --git a/dubbo/common/classes.py b/dubbo/common/classes.py deleted file mode 100644 index b27c7b9..0000000 --- a/dubbo/common/classes.py +++ /dev/null @@ -1,41 +0,0 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -import threading - -__all__ = ["SingletonBase"] - - -class SingletonBase: - """ - Singleton base class. This class ensures that only one instance of a derived class exists. - - This implementation is thread-safe. - """ - - _instance = None - _instance_lock = threading.Lock() - - def __new__(cls, *args, **kwargs): - """ - Create a new instance of the class if it does not exist. - """ - if cls._instance is None: - with cls._instance_lock: - # double check - if cls._instance is None: - cls._instance = super(SingletonBase, cls).__new__(cls) - return cls._instance diff --git a/dubbo/common/constants.py b/dubbo/common/constants.py deleted file mode 100644 index 33e4f9f..0000000 --- a/dubbo/common/constants.py +++ /dev/null @@ -1,62 +0,0 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -PROTOCOL_KEY = "protocol" -TRIPLE = "triple" -TRIPLE_SHORT = "tri" - -SIDE_KEY = "side" -SERVER_VALUE = "server" -CLIENT_VALUE = "client" - -METHOD_KEY = "method" -SERVICE_KEY = "service" - -SERVICE_HANDLER_KEY = "service-handler" - -GROUP_KEY = "group" - -LOCAL_HOST_KEY = "localhost" -LOCAL_HOST_VALUE = "127.0.0.1" -DEFAULT_PORT = 50051 - -SSL_ENABLED_KEY = "ssl-enabled" - -SERIALIZATION_KEY = "serialization" -SERIALIZER_KEY = "serializer" -DESERIALIZER_KEY = "deserializer" - - -COMPRESSION_KEY = "compression" -COMPRESSOR_KEY = "compressor" -DECOMPRESSOR_KEY = "decompressor" - - -TRANSPORTER_KEY = "transporter" -TRANSPORTER_DEFAULT_VALUE = "aio" - -TRUE_VALUE = "true" -FALSE_VALUE = "false" - -CALL_KEY = "call" -UNARY_CALL_VALUE = "unary" -CLIENT_STREAM_CALL_VALUE = "client-stream" -SERVER_STREAM_CALL_VALUE = "server-stream" -BI_STREAM_CALL_VALUE = "bi-stream" - -PATH_SEPARATOR = "/" -PROTOCOL_SEPARATOR = "://" -DYNAMIC_KEY = "dynamic" diff --git a/dubbo/common/deliverers.py b/dubbo/common/deliverers.py deleted file mode 100644 index 67790ec..0000000 --- a/dubbo/common/deliverers.py +++ /dev/null @@ -1,314 +0,0 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -import abc -import enum -import queue -import threading -from typing import Any, Optional - -__all__ = ["MessageDeliverer", "SingleMessageDeliverer", "MultiMessageDeliverer"] - - -class DelivererStatus(enum.Enum): - """ - Enumeration for deliverer status. - - Possible statuses: - - PENDING: The deliverer is pending action. - - COMPLETED: The deliverer has completed the action. - - CANCELLED: The action for the deliverer has been cancelled. - - FINISHED: The deliverer has finished all actions and is in a final state. - """ - - PENDING = 0 - COMPLETED = 1 - CANCELLED = 2 - FINISHED = 3 - - @classmethod - def change_allowed( - cls, current_status: "DelivererStatus", target_status: "DelivererStatus" - ) -> bool: - """ - Check if a transition from `current_status` to `target_status` is allowed. - - :param current_status: The current status of the deliverer. - :type current_status: DelivererStatus - :param target_status: The target status to transition to. - :type target_status: DelivererStatus - :return: A boolean indicating if the transition is allowed. - :rtype: bool - """ - # PENDING -> COMPLETED or CANCELLED - if current_status == cls.PENDING: - return target_status in {cls.COMPLETED, cls.CANCELLED} - - # COMPLETED -> FINISHED or CANCELLED - elif current_status == cls.COMPLETED: - return target_status in {cls.FINISHED, cls.CANCELLED} - - # CANCELLED -> FINISHED - elif current_status == cls.CANCELLED: - return target_status == cls.FINISHED - - # FINISHED is the final state, no further transitions allowed - else: - return False - - -class NoMoreMessageError(RuntimeError): - """ - Exception raised when no more messages are available. - """ - - def __init__(self, message: str = "No more message"): - super().__init__(message) - - -class EmptyMessageError(RuntimeError): - """ - Exception raised when the message is empty. - """ - - def __init__(self, message: str = "Message is empty"): - super().__init__(message) - - -class MessageDeliverer(abc.ABC): - """ - Abstract base class for message deliverers. - """ - - __slots__ = ["_status"] - - def __init__(self): - self._status = DelivererStatus.PENDING - - @abc.abstractmethod - def add(self, message: Any) -> None: - """ - Add a message to the deliverer. - - :param message: The message to be added. - :type message: Any - """ - raise NotImplementedError() - - @abc.abstractmethod - def complete(self, message: Any = None) -> None: - """ - Mark the message delivery as complete. - - :param message: The last message (optional). - :type message: Any, optional - """ - raise NotImplementedError() - - @abc.abstractmethod - def cancel(self, exc: Optional[Exception]) -> None: - """ - Cancel the message delivery. - - :param exc: The exception that caused the cancellation. - :type exc: Exception, optional - """ - raise NotImplementedError() - - @abc.abstractmethod - def get(self) -> Any: - """ - Get the next message. - - :return: The next message. - :rtype: Any - :raises NoMoreMessageError: If no more messages are available. - :raises Exception: If the message delivery is cancelled. - """ - raise NotImplementedError() - - @abc.abstractmethod - def get_nowait(self) -> Any: - """ - Get the next message without waiting. - - :return: The next message. - :rtype: Any - :raises EmptyMessageError: If the message is empty. - :raises NoMoreMessageError: If no more messages are available. - :raises Exception: If the message delivery is cancelled. - """ - raise NotImplementedError() - - -class SingleMessageDeliverer(MessageDeliverer): - """ - Message deliverer for a single message using a signal-based approach. - """ - - __slots__ = ["_condition", "_message"] - - def __init__(self): - super().__init__() - self._condition = threading.Condition() - self._message: Any = None - - def add(self, message: Any) -> None: - with self._condition: - if self._status is DelivererStatus.PENDING: - # Add the message - self._message = message - - def complete(self, message: Any = None) -> None: - with self._condition: - if DelivererStatus.change_allowed(self._status, DelivererStatus.COMPLETED): - if message is not None: - self._message = message - # update the status - self._status = DelivererStatus.COMPLETED - self._condition.notify_all() - - def cancel(self, exc: Optional[Exception]) -> None: - with self._condition: - if DelivererStatus.change_allowed(self._status, DelivererStatus.CANCELLED): - # Cancel the delivery - self._message = exc or RuntimeError("delivery cancelled.") - self._status = DelivererStatus.CANCELLED - self._condition.notify_all() - - def get(self) -> Any: - with self._condition: - if self._status is DelivererStatus.FINISHED: - raise NoMoreMessageError("Message already consumed.") - - if self._status is DelivererStatus.PENDING: - # If the message is not available, wait - self._condition.wait() - - # check the status - if self._status is DelivererStatus.CANCELLED: - raise self._message - - self._status = DelivererStatus.FINISHED - return self._message - - def get_nowait(self) -> Any: - with self._condition: - if self._status is DelivererStatus.FINISHED: - self._status = DelivererStatus.PENDING - return self._message - - # raise error - if self._status is DelivererStatus.FINISHED: - raise NoMoreMessageError("Message already consumed.") - elif self._status is DelivererStatus.CANCELLED: - raise self._message - elif self._status is DelivererStatus.PENDING: - raise EmptyMessageError("Message is empty") - - -class MultiMessageDeliverer(MessageDeliverer): - """ - Message deliverer supporting multiple messages. - """ - - __slots__ = ["_lock", "_counter", "_messages", "_END_SENTINEL"] - - def __init__(self): - super().__init__() - self._lock = threading.Lock() - self._counter = 0 - self._messages: queue.PriorityQueue[Any] = queue.PriorityQueue() - self._END_SENTINEL = object() - - def add(self, message: Any) -> None: - with self._lock: - if self._status is DelivererStatus.PENDING: - # Add the message - self._counter += 1 - self._messages.put_nowait((self._counter, message)) - - def complete(self, message: Any = None) -> None: - with self._lock: - if DelivererStatus.change_allowed(self._status, DelivererStatus.COMPLETED): - if message is not None: - self._counter += 1 - self._messages.put_nowait((self._counter, message)) - - # Add the end sentinel - self._counter += 1 - self._messages.put_nowait((self._counter, self._END_SENTINEL)) - self._status = DelivererStatus.COMPLETED - - def cancel(self, exc: Optional[Exception]) -> None: - with self._lock: - if DelivererStatus.change_allowed(self._status, DelivererStatus.CANCELLED): - # Set the priority to -1 -> make sure it is the first message - self._messages.put_nowait( - (-1, exc or RuntimeError("delivery cancelled.")) - ) - self._status = DelivererStatus.CANCELLED - - def get(self) -> Any: - if self._status is DelivererStatus.FINISHED: - raise NoMoreMessageError("No more message") - - # block until the message is available - priority, message = self._messages.get() - - # check the status - if self._status is DelivererStatus.CANCELLED: - raise message - elif message is self._END_SENTINEL: - self._status = DelivererStatus.FINISHED - raise NoMoreMessageError("No more message") - else: - return message - - def get_nowait(self) -> Any: - try: - if self._status is DelivererStatus.FINISHED: - raise NoMoreMessageError("No more message") - - priority, message = self._messages.get_nowait() - - # check the status - if self._status is DelivererStatus.CANCELLED: - raise message - elif message is self._END_SENTINEL: - self._status = DelivererStatus.FINISHED - raise NoMoreMessageError("No more message") - else: - return message - except queue.Empty: - raise EmptyMessageError("Message is empty") - - def __iter__(self): - return self - - def __next__(self): - """ - Returns the next request from the queue. - - :return: The next message. - :rtype: Any - :raises StopIteration: If no more messages are available. - """ - while True: - try: - return self.get() - except NoMoreMessageError: - raise StopIteration diff --git a/dubbo/common/node.py b/dubbo/common/node.py deleted file mode 100644 index a5ec339..0000000 --- a/dubbo/common/node.py +++ /dev/null @@ -1,58 +0,0 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -import abc - -from dubbo.common.url import URL - -__all__ = ["Node"] - - -class Node(abc.ABC): - """ - Abstract base class for a Node. - """ - - @abc.abstractmethod - def get_url(self) -> URL: - """ - Get the URL of the node. - - :return: The URL of the node. - :rtype: URL - :raises NotImplementedError: If the method is not implemented. - """ - raise NotImplementedError("get_url() is not implemented.") - - @abc.abstractmethod - def is_available(self) -> bool: - """ - Check if the node is available. - - :return: True if the node is available, False otherwise. - :rtype: bool - :raises NotImplementedError: If the method is not implemented. - """ - raise NotImplementedError("is_available() is not implemented.") - - @abc.abstractmethod - def destroy(self) -> None: - """ - Destroy the node. - - :raises NotImplementedError: If the method is not implemented. - """ - raise NotImplementedError("destroy() is not implemented.") diff --git a/dubbo/common/types.py b/dubbo/common/types.py deleted file mode 100644 index 029b837..0000000 --- a/dubbo/common/types.py +++ /dev/null @@ -1,22 +0,0 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -from typing import Any, Callable - -__all__ = ["SerializingFunction", "DeserializingFunction"] - -SerializingFunction = Callable[[Any], bytes] -DeserializingFunction = Callable[[bytes], Any] diff --git a/dubbo/common/url.py b/dubbo/common/url.py deleted file mode 100644 index 581fd84..0000000 --- a/dubbo/common/url.py +++ /dev/null @@ -1,325 +0,0 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -import copy -from typing import Any, Dict, Optional -from urllib import parse -from urllib.parse import urlencode, urlunparse - -from dubbo.common.constants import PROTOCOL_SEPARATOR - -__all__ = ["URL", "create_url"] - - -def create_url(url: str, encoded: bool = False) -> "URL": - """ - Creates a URL object from a URL string. - - This function takes a URL string and converts it into a URL object. - If the 'encoded' parameter is set to True, the URL string will be decoded before being converted. - - :param url: The URL string to be converted into a URL object. - :type url: str - :param encoded: Determines if the URL string should be decoded before being converted. Defaults to False. - :type encoded: bool - :return: A URL object. - :rtype: URL - :raises ValueError: If the URL format is invalid. - """ - # If the URL is encoded, decode it - if encoded: - url = parse.unquote(url) - - if PROTOCOL_SEPARATOR not in url: - raise ValueError("Invalid URL format: missing protocol") - - parsed_url = parse.urlparse(url) - - if not parsed_url.scheme: - raise ValueError("Invalid URL format: missing scheme.") - - return URL( - parsed_url.scheme, - parsed_url.hostname or "", - parsed_url.port, - parsed_url.username or "", - parsed_url.password or "", - parsed_url.path.lstrip("/"), - {k: v[0] for k, v in parse.parse_qs(parsed_url.query).items()}, - ) - - -class URL: - """ - URL - Uniform Resource Locator. - """ - - __slots__ = [ - "_scheme", - "_host", - "_port", - "_location", - "_username", - "_password", - "_path", - "_parameters", - "_attributes", - ] - - def __init__( - self, - scheme: str, - host: str, - port: Optional[int] = None, - username: str = "", - password: str = "", - path: str = "", - parameters: Optional[Dict[str, str]] = None, - attributes: Optional[Dict[str, Any]] = None, - ): - """ - Initialize the URL object. - - :param scheme: The scheme of the URL (e.g., 'http', 'https'). - :type scheme: str - :param host: The host of the URL. - :type host: str - :param port: The port number of the URL, defaults to None. - :type port: int, optional - :param username: The username for authentication, defaults to an empty string. - :type username: str, optional - :param password: The password for authentication, defaults to an empty string. - :type password: str, optional - :param path: The path of the URL, defaults to an empty string. - :type path: str, optional - :param parameters: The query parameters of the URL as a dictionary, defaults to None. - :type parameters: Dict[str, str], optional - :param attributes: Additional attributes of the URL as a dictionary, defaults to None. - :type attributes: Dict[str, Any], optional - """ - self._scheme = scheme - self._host = host - self._port = port - self._location = f"{host}:{port}" if port else host - self._username = username - self._password = password - self._path = path - self._parameters = parameters or {} - self._attributes = attributes or {} - - @property - def scheme(self) -> str: - """ - Get or set the scheme of the URL. - - :return: The scheme of the URL. - :rtype: str - """ - return self._scheme - - @scheme.setter - def scheme(self, value: str): - self._scheme = value - - @property - def host(self) -> str: - """ - Get or set the host of the URL. - - :return: The host of the URL. - :rtype: str - """ - return self._host - - @host.setter - def host(self, value: str): - self._host = value - self._location = f"{self.host}:{self.port}" if self.port else self.host - - @property - def port(self) -> Optional[int]: - """ - Get or set the port of the URL. - - :return: The port of the URL. - :rtype: int, optional - """ - return self._port - - @port.setter - def port(self, value: int): - if value > 0: - self._port = value - self._location = f"{self.host}:{self.port}" - - @property - def location(self) -> str: - """ - Get or set the location (host:port) of the URL. - - :return: The location of the URL. - :rtype: str - """ - return self._location - - @location.setter - def location(self, value: str): - try: - values = value.split(":") - self.host = values[0] - if len(values) == 2: - self.port = int(values[1]) - except Exception as e: - raise ValueError(f"Invalid location: {value}") from e - - @property - def username(self) -> str: - """ - Get or set the username for authentication. - - :return: The username. - :rtype: str - """ - return self._username - - @username.setter - def username(self, value: str): - self._username = value - - @property - def password(self) -> str: - """ - Get or set the password for authentication. - - :return: The password. - :rtype: str - """ - return self._password - - @password.setter - def password(self, value: str): - self._password = value - - @property - def path(self) -> str: - """ - Get or set the path of the URL. - - :return: The path of the URL. - :rtype: str - """ - return self._path - - @path.setter - def path(self, value: str): - self._path = value.lstrip("/") - - @property - def parameters(self) -> Dict[str, str]: - """ - Get the query parameters of the URL. - - :return: The query parameters as a dictionary. - :rtype: Dict[str, str] - """ - return self._parameters - - @property - def attributes(self) -> Dict[str, Any]: - """ - Get the additional attributes of the URL. - - :return: The attributes as a dictionary. - :rtype: Dict[str, Any] - """ - return self._attributes - - def to_str(self, encode: bool = False) -> str: - """ - Converts the URL to a string. - - :param encode: Determines if the URL should be encoded. Defaults to False. - :type encode: bool - :return: The URL string. - :rtype: str - """ - # Construct the netloc part - if self.username and self.password: - netloc = f"{self.username}:{self.password}@{self.host}" - else: - netloc = self.host - - if self.port: - netloc = f"{netloc}:{self.port}" - - # Convert parameters dictionary to query string - query = urlencode(self.parameters) - - # Construct the URL - url = urlunparse((self.scheme or "", netloc, self.path or "/", "", query, "")) - - if encode: - url = parse.quote(url) - - return url - - def copy(self) -> "URL": - """ - Copy the URL object. - - :return: A shallow copy of the URL object. - :rtype: URL - """ - return copy.copy(self) - - def deepcopy(self) -> "URL": - """ - Deep copy the URL object. - - :return: A deep copy of the URL object. - :rtype: URL - """ - return copy.deepcopy(self) - - def __copy__(self) -> "URL": - return URL( - self.scheme, - self.host, - self.port, - self.username, - self.password, - self.path, - self.parameters.copy(), - self.attributes.copy(), - ) - - def __deepcopy__(self, memo) -> "URL": - return URL( - self.scheme, - self.host, - self.port, - self.username, - self.password, - self.path, - copy.deepcopy(self.parameters, memo), - copy.deepcopy(self.attributes, memo), - ) - - def __str__(self) -> str: - return self.to_str() - - def __repr__(self) -> str: - return self.to_str() diff --git a/dubbo/common/utils.py b/dubbo/common/utils.py deleted file mode 100644 index 4b20998..0000000 --- a/dubbo/common/utils.py +++ /dev/null @@ -1,129 +0,0 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -__all__ = ["EventHelper", "FutureHelper"] - - -class EventHelper: - """ - Helper class for event operations. - """ - - @staticmethod - def is_set(event) -> bool: - """ - Check if the event is set. - - :param event: Event object, you can use threading.Event or any other object that supports the is_set operation. - :type event: Any - :return: True if the event is set, or False if the is_set method is not supported or the event is invalid. - :rtype: bool - """ - return event.is_set() if event and hasattr(event, "is_set") else False - - @staticmethod - def set(event) -> bool: - """ - Attempt to set the event object. - - :param event: Event object, you can use threading.Event or any other object that supports the set operation. - :type event: Any - :return: True if the event was set, False otherwise - (such as the event is invalid or does not support the set operation). - :rtype: bool - """ - if event is None: - return False - - # If the event supports the set operation, set the event and return True - if hasattr(event, "set"): - event.set() - return True - - # If the event is invalid or does not support the set operation, return False - return False - - @staticmethod - def clear(event) -> bool: - """ - Attempt to clear the event object. - - :param event: Event object, you can use threading.Event or any other object that supports the clear operation. - :type event: Any - :return: True if the event was cleared, False otherwise - (such as the event is invalid or does not support the clear operation). - :rtype: bool - """ - if not event: - return False - - # If the event supports the clear operation, clear the event and return True - if hasattr(event, "clear"): - event.clear() - return True - - # If the event is invalid or does not support the clear operation, return False - return False - - -class FutureHelper: - """ - Helper class for future operations. - """ - - @staticmethod - def done(future) -> bool: - """ - Check if the future is done. - - :param future: Future object - :type future: Any - :return: True if the future is done, False otherwise. - :rtype: bool - """ - return future.done() if future and hasattr(future, "done") else False - - @staticmethod - def set_result(future, result): - """ - Set the result of the future. - - :param future: Future object - :type future: Any - :param result: Result to set - :type result: Any - """ - if not future or FutureHelper.done(future): - return - - if hasattr(future, "set_result"): - future.set_result(result) - - @staticmethod - def set_exception(future, exception): - """ - Set the exception to the future. - - :param future: Future object - :type future: Any - :param exception: Exception to set - :type exception: Exception - """ - if not future or FutureHelper.done(future): - return - - if hasattr(future, "set_exception"): - future.set_exception(exception) diff --git a/dubbo/config/__init__.py b/dubbo/config/__init__.py deleted file mode 100644 index 7ffd615..0000000 --- a/dubbo/config/__init__.py +++ /dev/null @@ -1,19 +0,0 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -from .logger_config import FileLoggerConfig, LoggerConfig -from .protocol_config import ProtocolConfig -from .reference_config import ReferenceConfig diff --git a/dubbo/config/logger_config.py b/dubbo/config/logger_config.py deleted file mode 100644 index ecae584..0000000 --- a/dubbo/config/logger_config.py +++ /dev/null @@ -1,150 +0,0 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -from dataclasses import dataclass -from typing import Dict, Optional - -from dubbo.common.url import URL -from dubbo.extension import extensionLoader -from dubbo.logger import LoggerAdapter -from dubbo.logger import constants as logger_constants -from dubbo.logger import loggerFactory -from dubbo.logger.constants import Level - - -@dataclass -class FileLoggerConfig: - """ - File logger configuration. - :param rotate: File rotate type. - :type rotate: logger_constants.FileRotateType - :param file_formatter: File formatter. - :type file_formatter: Optional[str] - :param file_dir: File directory. - :type file_dir: str - :param file_name: File name. - :type file_name: str - :param backup_count: Backup count. - :type backup_count: int - :param max_bytes: Max bytes. - :type max_bytes: int - :param interval: Interval. - :type interval: int - """ - - rotate: logger_constants.FileRotateType = logger_constants.FileRotateType.NONE - file_formatter: Optional[str] = None - file_dir: str = logger_constants.DEFAULT_FILE_DIR_VALUE - file_name: str = logger_constants.DEFAULT_FILE_NAME_VALUE - backup_count: int = logger_constants.DEFAULT_FILE_BACKUP_COUNT_VALUE - max_bytes: int = logger_constants.DEFAULT_FILE_MAX_BYTES_VALUE - interval: int = logger_constants.DEFAULT_FILE_INTERVAL_VALUE - - def check(self) -> None: - if self.rotate == logger_constants.FileRotateType.SIZE and self.max_bytes < 0: - raise ValueError("Max bytes can't be less than 0") - elif self.rotate == logger_constants.FileRotateType.TIME and self.interval < 1: - raise ValueError("Interval can't be less than 1") - - def dict(self) -> Dict[str, str]: - return { - logger_constants.FILE_DIR_KEY: self.file_dir, - logger_constants.FILE_NAME_KEY: self.file_name, - logger_constants.FILE_ROTATE_KEY: self.rotate.value, - logger_constants.FILE_MAX_BYTES_KEY: str(self.max_bytes), - logger_constants.FILE_INTERVAL_KEY: str(self.interval), - logger_constants.FILE_BACKUP_COUNT_KEY: str(self.backup_count), - } - - -class LoggerConfig: - """ - Logger configuration. - """ - - __slots__ = [ - "_driver", - "_level", - "_console_enabled", - "_console_config", - "_file_enabled", - "_file_config", - ] - - def __init__( - self, - driver, - level: Level, - console_enabled: bool, - file_enabled: bool, - file_config: FileLoggerConfig, - ): - """ - Initialize the logger configuration. - :param driver: The logger driver. - :type driver: str - :param level: The logger level. - :type level: Level - :param console_enabled: Whether to enable console logger. - :type console_enabled: bool - :param file_enabled: Whether to enable file logger. - :type file_enabled: bool - :param file_config: The file logger configuration. - :type file_config: FileLogger - """ - # set global config - self._driver = driver - self._level = level - # set console config - self._console_enabled = console_enabled - # set file comfig - self._file_enabled = file_enabled - self._file_config = file_config - if file_enabled: - self._file_config.check() - - def get_url(self) -> URL: - # get LoggerConfig parameters - parameters = { - logger_constants.DRIVER_KEY: self._driver, - logger_constants.LEVEL_KEY: self._level.value, - logger_constants.CONSOLE_ENABLED_KEY: str(self._console_enabled), - logger_constants.FILE_ENABLED_KEY: str(self._file_enabled), - **self._file_config.dict(), - } - - return URL(scheme=self._driver, host=self._level.value, parameters=parameters) - - def init(self): - # get logger_adapter and initialize loggerFactory - logger_adapter_class = extensionLoader.get_extension( - LoggerAdapter, self._driver - ) - logger_adapter = logger_adapter_class(self.get_url()) - loggerFactory.set_logger_adapter(logger_adapter) - - @classmethod - def default_config(cls): - """ - Get default logger configuration. - """ - return LoggerConfig( - driver=logger_constants.DEFAULT_DRIVER_VALUE, - level=logger_constants.DEFAULT_LEVEL_VALUE, - console_enabled=logger_constants.DEFAULT_CONSOLE_ENABLED_VALUE, - file_enabled=logger_constants.DEFAULT_FILE_ENABLED_VALUE, - file_config=FileLoggerConfig(), - ) diff --git a/dubbo/config/protocol_config.py b/dubbo/config/protocol_config.py deleted file mode 100644 index d629e1f..0000000 --- a/dubbo/config/protocol_config.py +++ /dev/null @@ -1,30 +0,0 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - - -class ProtocolConfig: - - _name: str - - __slots__ = ["_name"] - - @property - def name(self) -> str: - return self._name - - @name.setter - def name(self, value: str): - self._name = value diff --git a/dubbo/config/reference_config.py b/dubbo/config/reference_config.py deleted file mode 100644 index a7f258c..0000000 --- a/dubbo/config/reference_config.py +++ /dev/null @@ -1,62 +0,0 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -import threading -from typing import Optional, Union - -from dubbo.common import URL, create_url -from dubbo.extension import extensionLoader -from dubbo.protocol import Invoker, Protocol - - -class ReferenceConfig: - - __slots__ = [ - "_initialized", - "_global_lock", - "_service_name", - "_url", - "_protocol", - "_invoker", - ] - - def __init__(self, url: Union[str, URL], service_name: str): - self._initialized = False - self._global_lock = threading.Lock() - self._url: URL = url if isinstance(url, URL) else create_url(url) - self._service_name = service_name - self._protocol: Optional[Protocol] = None - self._invoker: Optional[Invoker] = None - - def get_invoker(self) -> Invoker: - if not self._invoker: - self._do_init() - return self._invoker - - def _do_init(self): - with self._global_lock: - if self._initialized: - return - # Get the interface name from the URL path - self._url.path = self._service_name - self._protocol = extensionLoader.get_extension(Protocol, self._url.scheme)( - self._url - ) - self._create_invoker() - self._initialized = True - - def _create_invoker(self): - self._invoker = self._protocol.refer(self._url) diff --git a/dubbo/config/service_config.py b/dubbo/config/service_config.py deleted file mode 100644 index a4f3644..0000000 --- a/dubbo/config/service_config.py +++ /dev/null @@ -1,71 +0,0 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -from typing import Optional - -from dubbo.common import URL -from dubbo.common import constants as common_constants -from dubbo.extension import extensionLoader -from dubbo.protocol import Protocol -from dubbo.proxy.handlers import RpcServiceHandler - -__all__ = ["ServiceConfig"] - - -class ServiceConfig: - """ - Service configuration - """ - - def __init__( - self, - service_handler: RpcServiceHandler, - port: Optional[int] = None, - protocol: Optional[str] = None, - ): - - self._service_handler = service_handler - self._port = port or common_constants.DEFAULT_PORT - - protocol_str = protocol or common_constants.TRIPLE_SHORT - - self._export_url = URL( - protocol_str, common_constants.LOCAL_HOST_KEY, self._port - ) - self._export_url.attributes[common_constants.SERVICE_HANDLER_KEY] = ( - service_handler - ) - - self._protocol: Protocol = extensionLoader.get_extension( - Protocol, protocol_str - )(self._export_url) - - self._exported = False - self._exporting = False - - def export(self): - """ - Export service - """ - if self._exporting or self._exported: - return - - self._exporting = True - try: - self._protocol.export(self._export_url) - self._exported = True - finally: - self._exporting = False diff --git a/dubbo/extension/registries.py b/dubbo/extension/registries.py index 37c7bc7..f98974f 100644 --- a/dubbo/extension/registries.py +++ b/dubbo/extension/registries.py @@ -62,6 +62,7 @@ class ExtendedRegistry: interface=LoadBalance, impls={ "random": "dubbo.cluster.loadbalances.RandomLoadBalance", + "cpu": "dubbo.cluster.loadbalances.CpuLoadBalance", }, ) diff --git a/dubbo/logger/__init__.py b/dubbo/logger/__init__.py deleted file mode 100644 index 4f42594..0000000 --- a/dubbo/logger/__init__.py +++ /dev/null @@ -1,23 +0,0 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -from ._interfaces import Logger, LoggerAdapter -from .logger_factory import LoggerFactory as _LoggerFactory - -# The logger factory instance. -loggerFactory = _LoggerFactory() - -__all__ = ["Logger", "LoggerAdapter", "loggerFactory"] diff --git a/dubbo/logger/_interfaces.py b/dubbo/logger/_interfaces.py deleted file mode 100644 index 88fa999..0000000 --- a/dubbo/logger/_interfaces.py +++ /dev/null @@ -1,204 +0,0 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -import abc -from typing import Any - -from dubbo.common.url import URL - -from .constants import Level - -_all__ = ["Logger", "LoggerAdapter"] - - -class Logger(abc.ABC): - """ - Logger Interface, which is used to log messages. - """ - - @abc.abstractmethod - def log(self, level: Level, msg: str, *args: Any, **kwargs: Any) -> None: - """ - Log a message at the specified logging level. - - :param level: The logging level. - :type level: Level - :param msg: The log message. - :type msg: str - :param args: Additional positional arguments. - :type args: Any - :param kwargs: Additional keyword arguments. - :type kwargs: Any - """ - raise NotImplementedError() - - @abc.abstractmethod - def debug(self, msg: str, *args, **kwargs) -> None: - """ - Log a debug message. - - :param msg: The debug message. - :type msg: str - :param args: Additional positional arguments. - :type args: Any - :param kwargs: Additional keyword arguments. - :type kwargs: Any - """ - raise NotImplementedError() - - @abc.abstractmethod - def info(self, msg: str, *args, **kwargs) -> None: - """ - Log an info message. - - :param msg: The info message. - :type msg: str - :param args: Additional positional arguments. - :type args: Any - :param kwargs: Additional keyword arguments. - :type kwargs: Any - """ - raise NotImplementedError() - - @abc.abstractmethod - def warning(self, msg: str, *args, **kwargs) -> None: - """ - Log a warning message. - - :param msg: The warning message. - :type msg: str - :param args: Additional positional arguments. - :type args: Any - :param kwargs: Additional keyword arguments. - :type kwargs: Any - """ - raise NotImplementedError() - - @abc.abstractmethod - def error(self, msg: str, *args, **kwargs) -> None: - """ - Log an error message. - - :param msg: The error message. - :type msg: str - :param args: Additional positional arguments. - :type args: Any - :param kwargs: Additional keyword arguments. - :type kwargs: Any - """ - raise NotImplementedError() - - @abc.abstractmethod - def critical(self, msg: str, *args, **kwargs) -> None: - """ - Log a critical message. - - :param msg: The critical message. - :type msg: str - :param args: Additional positional arguments. - :type args: Any - :param kwargs: Additional keyword arguments. - :type kwargs: Any - """ - raise NotImplementedError() - - @abc.abstractmethod - def fatal(self, msg: str, *args, **kwargs) -> None: - """ - Log a fatal message. - - :param msg: The fatal message. - :type msg: str - :param args: Additional positional arguments. - :type args: Any - :param kwargs: Additional keyword arguments. - :type kwargs: Any - """ - raise NotImplementedError() - - @abc.abstractmethod - def exception(self, msg: str, *args, **kwargs) -> None: - """ - Log an exception message. - - :param msg: The exception message. - :type msg: str - :param args: Additional positional arguments. - :type args: Any - :param kwargs: Additional keyword arguments. - :type kwargs: Any - """ - raise NotImplementedError() - - @abc.abstractmethod - def is_enabled_for(self, level: Level) -> bool: - """ - Check if this logger is enabled for the specified level. - - :param level: The logging level. - :type level: Level - :return: Whether the logging level is enabled. - :rtype: bool - """ - raise NotImplementedError() - - -class LoggerAdapter(abc.ABC): - """ - Logger Adapter Interface, which is used to support different logging libraries. - """ - - __slots__ = ["_config"] - - def __init__(self, config: URL): - """ - Initialize the logger adapter. - - :param config: The configuration of the logger adapter. - :type config: URL - """ - self._config = config - - def get_logger(self, name: str) -> Logger: - """ - Get a logger by name. - - :param name: The name of the logger. - :type name: str - :return: An instance of the logger. - :rtype: Logger - """ - raise NotImplementedError() - - @property - def level(self) -> Level: - """ - Get the current logging level. - - :return: The current logging level. - :rtype: Level - """ - raise NotImplementedError() - - @level.setter - def level(self, level: Level) -> None: - """ - Set the logging level. - - :param level: The logging level to set. - :type level: Level - """ - raise NotImplementedError() diff --git a/dubbo/logger/constants.py b/dubbo/logger/constants.py deleted file mode 100644 index a6cae5d..0000000 --- a/dubbo/logger/constants.py +++ /dev/null @@ -1,127 +0,0 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -import enum -import os - -__all__ = [ - "Level", - "FileRotateType", - "LEVEL_KEY", - "DRIVER_KEY", - "CONSOLE_ENABLED_KEY", - "FILE_ENABLED_KEY", - "FILE_DIR_KEY", - "FILE_NAME_KEY", - "FILE_ROTATE_KEY", - "FILE_MAX_BYTES_KEY", - "FILE_INTERVAL_KEY", - "FILE_BACKUP_COUNT_KEY", - "DEFAULT_DRIVER_VALUE", - "DEFAULT_LEVEL_VALUE", - "DEFAULT_CONSOLE_ENABLED_VALUE", - "DEFAULT_FILE_ENABLED_VALUE", - "DEFAULT_FILE_DIR_VALUE", - "DEFAULT_FILE_NAME_VALUE", - "DEFAULT_FILE_MAX_BYTES_VALUE", - "DEFAULT_FILE_INTERVAL_VALUE", - "DEFAULT_FILE_BACKUP_COUNT_VALUE", -] - - -@enum.unique -class Level(enum.Enum): - """ - The logging level enum. - - :cvar DEBUG: Debug level. - :cvar INFO: Info level. - :cvar WARNING: Warning level. - :cvar ERROR: Error level. - :cvar CRITICAL: Critical level. - :cvar FATAL: Fatal level. - :cvar UNKNOWN: Unknown level. - """ - - DEBUG = "DEBUG" - INFO = "INFO" - WARNING = "WARNING" - ERROR = "ERROR" - CRITICAL = "CRITICAL" - FATAL = "FATAL" - UNKNOWN = "UNKNOWN" - - @classmethod - def get_level(cls, level_value: str) -> "Level": - """ - Get the level from the level value. - - :param level_value: The level value. - :type level_value: str - :return: The level. If the level value is invalid, return UNKNOWN. - :rtype: Level - """ - level_value = level_value.upper() - for level in cls: - if level_value == level.value: - return level - return cls.UNKNOWN - - -@enum.unique -class FileRotateType(enum.Enum): - """ - The file rotating type enum. - - :cvar NONE: No rotating. - :cvar SIZE: Rotate the file by size. - :cvar TIME: Rotate the file by time. - """ - - NONE = "NONE" - SIZE = "SIZE" - TIME = "TIME" - - -"""logger config keys""" -# global config -LEVEL_KEY = "logger.level" -DRIVER_KEY = "logger.driver" - -# console config -CONSOLE_ENABLED_KEY = "logger.console.enable" - -# file logger -FILE_ENABLED_KEY = "logger.file.enable" -FILE_DIR_KEY = "logger.file.dir" -FILE_NAME_KEY = "logger.file.name" -FILE_ROTATE_KEY = "logger.file.rotate" -FILE_MAX_BYTES_KEY = "logger.file.maxbytes" -FILE_INTERVAL_KEY = "logger.file.interval" -FILE_BACKUP_COUNT_KEY = "logger.file.backupcount" - -"""some logger default value""" -DEFAULT_DRIVER_VALUE = "logging" -DEFAULT_LEVEL_VALUE = Level.DEBUG -# console -DEFAULT_CONSOLE_ENABLED_VALUE = True -# file -DEFAULT_FILE_ENABLED_VALUE = False -DEFAULT_FILE_DIR_VALUE = os.path.expanduser("~") -DEFAULT_FILE_NAME_VALUE = "dubbo.log" -DEFAULT_FILE_MAX_BYTES_VALUE = 10 * 1024 * 1024 -DEFAULT_FILE_INTERVAL_VALUE = 1 -DEFAULT_FILE_BACKUP_COUNT_VALUE = 10 diff --git a/dubbo/logger/logger_factory.py b/dubbo/logger/logger_factory.py deleted file mode 100644 index 0a7d0b2..0000000 --- a/dubbo/logger/logger_factory.py +++ /dev/null @@ -1,127 +0,0 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -import threading -from typing import Dict, Optional - -from dubbo.common import SingletonBase -from dubbo.common.url import URL -from dubbo.logger import Logger, LoggerAdapter -from dubbo.logger import constants as logger_constants -from dubbo.logger.constants import Level - -__all__ = ["LoggerFactory"] - -# Default logger config with default values. -_DEFAULT_CONFIG = URL( - scheme=logger_constants.DEFAULT_DRIVER_VALUE, - host=logger_constants.DEFAULT_LEVEL_VALUE.value, - parameters={ - logger_constants.DRIVER_KEY: logger_constants.DEFAULT_DRIVER_VALUE, - logger_constants.LEVEL_KEY: logger_constants.DEFAULT_LEVEL_VALUE.value, - logger_constants.CONSOLE_ENABLED_KEY: str( - logger_constants.DEFAULT_CONSOLE_ENABLED_VALUE - ), - logger_constants.FILE_ENABLED_KEY: str( - logger_constants.DEFAULT_FILE_ENABLED_VALUE - ), - }, -) - - -class LoggerFactory(SingletonBase): - """ - Singleton factory class for creating and managing loggers. - - This class ensures a single instance of the logger factory, provides methods to set and get - logger adapters, and manages logger instances. - """ - - def __init__(self): - """ - Initialize the logger factory. - - This method sets up the internal lock, logger adapter, and logger cache. - """ - self._lock = threading.RLock() - self._logger_adapter: Optional[LoggerAdapter] = None - self._loggers: Dict[str, Logger] = {} - - def _ensure_logger_adapter(self) -> None: - """ - Ensure the logger adapter is set. - - If the logger adapter is not set, this method sets it to the default adapter. - """ - if not self._logger_adapter: - with self._lock: - if not self._logger_adapter: - # Import here to avoid circular imports - from dubbo.logger.logging.logger_adapter import LoggingLoggerAdapter - - self.set_logger_adapter(LoggingLoggerAdapter(_DEFAULT_CONFIG)) - - def set_logger_adapter(self, logger_adapter: LoggerAdapter) -> None: - """ - Set the logger adapter. - - :param logger_adapter: The new logger adapter to use. - :type logger_adapter: LoggerAdapter - """ - with self._lock: - self._logger_adapter = logger_adapter - # Update all loggers - self._loggers = { - name: self._logger_adapter.get_logger(name) for name in self._loggers - } - - def get_logger_adapter(self) -> LoggerAdapter: - """ - Get the current logger adapter. - - :return: The current logger adapter. - :rtype: LoggerAdapter - """ - self._ensure_logger_adapter() - return self._logger_adapter - - def get_logger(self, name: str) -> Logger: - """ - Get the logger by name. - - :param name: The name of the logger to retrieve. - :type name: str - :return: An instance of the requested logger. - :rtype: Logger - """ - self._ensure_logger_adapter() - logger = self._loggers.get(name) - if not logger: - with self._lock: - if name not in self._loggers: - self._loggers[name] = self._logger_adapter.get_logger(name) - logger = self._loggers[name] - return logger - - def get_level(self) -> Level: - """ - Get the current logging level. - - :return: The current logging level. - :rtype: Level - """ - self._ensure_logger_adapter() - return self._logger_adapter.level diff --git a/dubbo/logger/logging/formatter.py b/dubbo/logger/logging/formatter.py deleted file mode 100644 index 1dc409e..0000000 --- a/dubbo/logger/logging/formatter.py +++ /dev/null @@ -1,89 +0,0 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -import logging -import re -from enum import Enum - -__all__ = ["ColorFormatter", "NoColorFormatter", "Colors"] - - -class Colors(Enum): - """ - Colors for log messages. - """ - - END = "\033[0m" - BOLD = "\033[1m" - BLUE = "\033[34m" - GREEN = "\033[32m" - PURPLE = "\033[35m" - CYAN = "\033[36m" - RED = "\033[31m" - YELLOW = "\033[33m" - GREY = "\033[38;5;240m" - - -LEVEL_MAP = { - "DEBUG": Colors.BLUE.value, - "INFO": Colors.GREEN.value, - "WARNING": Colors.YELLOW.value, - "ERROR": Colors.RED.value, - "CRITICAL": Colors.RED.value + Colors.BOLD.value, -} - -DATE_FORMAT: str = "%Y-%m-%d %H:%M:%S" - -LOG_FORMAT: str = ( - f"{Colors.GREEN.value}%(asctime)s{Colors.END.value}" - " | " - f"%(level_color)s%(levelname)s{Colors.END.value}" - " | " - f"{Colors.CYAN.value}%(module)s:%(funcName)s:%(lineno)d{Colors.END.value}" - " - " - f"{Colors.PURPLE.value}[Dubbo]{Colors.END.value} " - f"%(msg_color)s%(message)s{Colors.END.value}" -) - - -class ColorFormatter(logging.Formatter): - """ - A formatter with color. - It will format the log message like this: - 2024-06-24 16:39:57 | DEBUG | test_logger_factory:test_with_config:44 - [Dubbo] debug log - """ - - def __init__(self): - self.log_format = LOG_FORMAT - super().__init__(self.log_format, DATE_FORMAT) - - def format(self, record) -> str: - levelname = record.levelname - record.level_color = record.msg_color = LEVEL_MAP.get(levelname) - return super().format(record) - - -class NoColorFormatter(logging.Formatter): - """ - A formatter without color. - It will format the log message like this: - 2024-06-24 16:39:57 | DEBUG | test_logger_factory:test_with_config:44 - [Dubbo] debug log - """ - - def __init__(self): - color_re = re.compile(r"\033\[[0-9;]*\w|%\((msg_color|level_color)\)s") - self.log_format = color_re.sub("", LOG_FORMAT) - super().__init__(self.log_format, DATE_FORMAT) diff --git a/dubbo/logger/logging/logger.py b/dubbo/logger/logging/logger.py deleted file mode 100644 index d8feb77..0000000 --- a/dubbo/logger/logging/logger.py +++ /dev/null @@ -1,94 +0,0 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -import logging -from typing import Dict - -from dubbo.logger import Logger - -from ..constants import Level - -__all__ = ["LoggingLogger"] - -# The mapping from the logging level to the logging level. -LEVEL_MAP: Dict[Level, int] = { - Level.DEBUG: logging.DEBUG, - Level.INFO: logging.INFO, - Level.WARNING: logging.WARNING, - Level.ERROR: logging.ERROR, - Level.CRITICAL: logging.CRITICAL, - Level.FATAL: logging.FATAL, -} - -STACKLEVEL_KEY = "stacklevel" -STACKLEVEL_DEFAULT = 1 -STACKLEVEL_OFFSET = 2 - -EXC_INFO_KEY = "exc_info" -EXC_INFO_DEFAULT = True - - -class LoggingLogger(Logger): - """ - The logging logger implementation. - """ - - __slots__ = ["_logger"] - - def __init__(self, internal_logger: logging.Logger): - """ - Initialize the logger. - :param internal_logger: The internal logger. - :type internal_logger: logging - """ - self._logger = internal_logger - - def _log(self, level: int, msg: str, *args, **kwargs) -> None: - # Add the stacklevel to the keyword arguments. - kwargs[STACKLEVEL_KEY] = ( - kwargs.get(STACKLEVEL_KEY, STACKLEVEL_DEFAULT) + STACKLEVEL_OFFSET - ) - self._logger.log(level, msg, *args, **kwargs) - - def log(self, level: Level, msg: str, *args, **kwargs) -> None: - self._log(LEVEL_MAP[level], msg, *args, **kwargs) - - def debug(self, msg: str, *args, **kwargs) -> None: - self._log(logging.DEBUG, msg, *args, **kwargs) - - def info(self, msg: str, *args, **kwargs) -> None: - self._log(logging.INFO, msg, *args, **kwargs) - - def warning(self, msg: str, *args, **kwargs) -> None: - self._log(logging.WARNING, msg, *args, **kwargs) - - def error(self, msg: str, *args, **kwargs) -> None: - self._log(logging.ERROR, msg, *args, **kwargs) - - def critical(self, msg: str, *args, **kwargs) -> None: - self._log(logging.CRITICAL, msg, *args, **kwargs) - - def fatal(self, msg: str, *args, **kwargs) -> None: - self._log(logging.FATAL, msg, *args, **kwargs) - - def exception(self, msg: str, *args, **kwargs) -> None: - if kwargs.get(EXC_INFO_KEY) is None: - kwargs[EXC_INFO_KEY] = EXC_INFO_DEFAULT - self.error(msg, *args, **kwargs) - - def is_enabled_for(self, level: Level) -> bool: - logging_level = LEVEL_MAP.get(level) - return self._logger.isEnabledFor(logging_level) if logging_level else False diff --git a/dubbo/logger/logging/logger_adapter.py b/dubbo/logger/logging/logger_adapter.py deleted file mode 100644 index 3e60813..0000000 --- a/dubbo/logger/logging/logger_adapter.py +++ /dev/null @@ -1,186 +0,0 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -import logging -import os -import sys -from functools import cache -from logging import handlers - -from dubbo.common import constants as common_constants -from dubbo.common.url import URL -from dubbo.logger import Logger, LoggerAdapter -from dubbo.logger import constants as logger_constants -from dubbo.logger.constants import LEVEL_KEY, Level -from dubbo.logger.logging import formatter -from dubbo.logger.logging.logger import LoggingLogger - -"""This module provides the logging logger implementation. -> logging module""" - -__all__ = ["LoggingLoggerAdapter"] - - -class LoggingLoggerAdapter(LoggerAdapter): - """ - Internal logger adapter responsible for creating loggers and encapsulating the logging.getLogger() method. - """ - - __slots__ = ["_level"] - - def __init__(self, config: URL): - """ - Initialize the LoggingLoggerAdapter with the given configuration. - - :param config: The configuration URL for the logger adapter. - :type config: URL - """ - super().__init__(config) - # Set level - level_name = config.parameters.get(LEVEL_KEY) - self._level = Level.get_level(level_name) if level_name else Level.DEBUG - self._update_level() - - def get_logger(self, name: str) -> Logger: - """ - Create a logger instance by name. - - :param name: The logger name. - :type name: str - :return: An instance of the logger. - :rtype: Logger - """ - logger_instance = logging.getLogger(name) - # clean up handlers - logger_instance.handlers.clear() - - # Add console handler - console_enabled = self._config.parameters.get( - logger_constants.CONSOLE_ENABLED_KEY, - str(logger_constants.DEFAULT_CONSOLE_ENABLED_VALUE), - ) - if console_enabled.lower() == common_constants.TRUE_VALUE or bool( - sys.stdout and sys.stdout.isatty() - ): - logger_instance.addHandler(self._get_console_handler()) - - # Add file handler - file_enabled = self._config.parameters.get( - logger_constants.FILE_ENABLED_KEY, - str(logger_constants.DEFAULT_FILE_ENABLED_VALUE), - ) - if file_enabled.lower() == common_constants.TRUE_VALUE: - logger_instance.addHandler(self._get_file_handler()) - - if not logger_instance.handlers: - # It's intended to be used to avoid the "No handlers could be found for logger XXX" one-off warning. - logger_instance.addHandler(logging.NullHandler()) - - return LoggingLogger(logger_instance) - - @cache - def _get_console_handler(self) -> logging.StreamHandler: - """ - Get the console handler, avoiding duplicate creation with caching. - - :return: The console handler. - :rtype: logging.StreamHandler - """ - console_handler = logging.StreamHandler() - console_handler.setFormatter(formatter.ColorFormatter()) - - return console_handler - - @cache - def _get_file_handler(self) -> logging.Handler: - """ - Get the file handler, avoiding duplicate creation with caching. - - :return: The file handler. - :rtype: logging.Handler - """ - # Get file path - file_dir = self._config.parameters.get(logger_constants.FILE_DIR_KEY) - file_name = self._config.parameters.get( - logger_constants.FILE_NAME_KEY, logger_constants.DEFAULT_FILE_NAME_VALUE - ) - file_path = os.path.join(file_dir, file_name) - # Get backup count - backup_count = int( - self._config.parameters.get( - logger_constants.FILE_BACKUP_COUNT_KEY, - logger_constants.DEFAULT_FILE_BACKUP_COUNT_VALUE, - ) - ) - # Get rotate type - rotate_type = self._config.parameters.get(logger_constants.FILE_ROTATE_KEY) - - # Set file Handler - file_handler: logging.Handler - if rotate_type == logger_constants.FileRotateType.SIZE.value: - # Set RotatingFileHandler - max_bytes = int( - self._config.parameters.get(logger_constants.FILE_MAX_BYTES_KEY) - ) - file_handler = handlers.RotatingFileHandler( - file_path, maxBytes=max_bytes, backupCount=backup_count - ) - elif rotate_type == logger_constants.FileRotateType.TIME.value: - # Set TimedRotatingFileHandler - interval = int( - self._config.parameters.get(logger_constants.FILE_INTERVAL_KEY) - ) - file_handler = handlers.TimedRotatingFileHandler( - file_path, interval=interval, backupCount=backup_count - ) - else: - # Set FileHandler - file_handler = logging.FileHandler(file_path) - - # Add file_handler - file_handler.setFormatter(formatter.NoColorFormatter()) - return file_handler - - @property - def level(self) -> Level: - """ - Get the logging level. - - :return: The current logging level. - :rtype: Level - """ - return self._level - - @level.setter - def level(self, level: Level) -> None: - """ - Set the logging level. - - :param level: The logging level to set. - :type level: Level - """ - if level == self._level or level is None: - return - self._level = level - self._update_level() - - def _update_level(self): - """ - Update the log level by modifying the root logger. - """ - # Get the root logger - root_logger = logging.getLogger() - # Set the logging level - root_logger.setLevel(self._level.value) diff --git a/dubbo/protocol/triple/invoker.py b/dubbo/protocol/triple/invoker.py index 95c6147..de93e94 100644 --- a/dubbo/protocol/triple/invoker.py +++ b/dubbo/protocol/triple/invoker.py @@ -47,7 +47,7 @@ class TripleInvoker(Invoker): Triple invoker. """ - __slots__ = ["_url", "_client", "_stream_multiplexer", "_compression", "_destroyed"] + __slots__ = ["_url", "_client", "_stream_multiplexer", "_compression"] def __init__( self, url: URL, client: Client, stream_multiplexer: StreamClientMultiplexHandler @@ -56,8 +56,6 @@ def __init__( self._client = client self._stream_multiplexer = stream_multiplexer - self._destroyed = False - def invoke(self, invocation: RpcInvocation) -> Result: call_type: CallType = invocation.get_attribute(common_constants.CALL_KEY) result = TriResult(call_type) @@ -202,10 +200,6 @@ def get_url(self) -> URL: def is_available(self) -> bool: return self._client.is_connected() - @property - def destroyed(self) -> bool: - return self._destroyed - def destroy(self) -> None: self._client.close() self._client = None diff --git a/dubbo/protocol/triple/protocol.py b/dubbo/protocol/triple/protocol.py index 102b552..b9896c2 100644 --- a/dubbo/protocol/triple/protocol.py +++ b/dubbo/protocol/triple/protocol.py @@ -61,11 +61,13 @@ def export(self, url: URL): if self._server is not None: return - service_handler: RpcServiceHandler = url.attributes[ - common_constants.SERVICE_HANDLER_KEY - ] + service_handler = url.attributes[common_constants.SERVICE_HANDLER_KEY] - self._path_resolver[service_handler.service_name] = service_handler + if iter(service_handler): + for handler in service_handler: + self._path_resolver[handler.service_name] = handler + else: + self._path_resolver[service_handler.service_name] = service_handler method_executor = ThreadPoolExecutor( thread_name_prefix=f"dubbo_tri_method_{str(uuid.uuid4())}", max_workers=10 diff --git a/dubbo/registry/protocol.py b/dubbo/registry/protocol.py index 2a13764..ebee40f 100644 --- a/dubbo/registry/protocol.py +++ b/dubbo/registry/protocol.py @@ -14,9 +14,9 @@ # See the License for the specific language governing permissions and # limitations under the License. -from dubbo.cluster import Directory from dubbo.cluster.directories import RegistryDirectory from dubbo.cluster.failfast_cluster import FailfastCluster +from dubbo.cluster.monitor.cpu import CpuMonitor, CpuInnerRpcHandler from dubbo.configs import RegistryConfig from dubbo.constants import common_constants from dubbo.extension import extensionLoader @@ -44,8 +44,15 @@ def export(self, url: URL): # get the server registry registry = self._factory.get_registry(url) - ref_url = url.attributes[common_constants.EXPORT_KEY] + ref_url: URL = url.attributes[common_constants.EXPORT_KEY] registry.register(ref_url) + + # add cpu handler + ref_url.attributes[common_constants.SERVICE_HANDLER_KEY] = [ + ref_url.attributes[common_constants.SERVICE_HANDLER_KEY], + CpuInnerRpcHandler.get_service_handler(), + ] + # continue the export process self._protocol.export(ref_url) @@ -53,7 +60,10 @@ def refer(self, url: URL) -> Invoker: registry = self._factory.get_registry(url) # create the directory - directory: Directory = RegistryDirectory(registry, self._protocol, url) + if url.parameters.get(common_constants.LOADBALANCE_KEY) == "cpu": + directory = CpuMonitor(registry, self._protocol, url) + else: + directory = RegistryDirectory(registry, self._protocol, url) # continue the refer process return FailfastCluster().join(directory) diff --git a/dubbo/remoting/aio/http2/controllers.py b/dubbo/remoting/aio/http2/controllers.py index 6642ecf..d52e2df 100644 --- a/dubbo/remoting/aio/http2/controllers.py +++ b/dubbo/remoting/aio/http2/controllers.py @@ -349,6 +349,12 @@ def write_rst(self, frame: UserActionFrames) -> None: """ def _inner_operation(_frame: UserActionFrames): + + # -1 means the stream is not created, so we don't need to send the reset frame + if self._stream.id == -1: + return + + _frame.stream_id = self._stream.id self._protocol.send_frame(_frame, self._stream) self._stream.close_local() @@ -376,6 +382,7 @@ async def _run(self) -> None: # wait and send the data frames while True: frame = await self._data_queue.get() + frame.stream_id = self._stream.id if frame is not FrameOutboundController.LAST_DATA_FRAME: self._data_sent_event = asyncio.Event() self._protocol.send_frame(frame, self._stream, self._data_sent_event) @@ -388,6 +395,7 @@ async def _run(self) -> None: # wait for the last data frame and send the trailers frame await self._data_sent_event.wait() + self._trailers.stream_id = self._stream.id self._protocol.send_frame(self._trailers, self._stream) # close the stream diff --git a/dubbo/remoting/aio/http2/frames.py b/dubbo/remoting/aio/http2/frames.py index 8809f8d..b216b4d 100644 --- a/dubbo/remoting/aio/http2/frames.py +++ b/dubbo/remoting/aio/http2/frames.py @@ -24,7 +24,7 @@ "HeadersFrame", "DataFrame", "WindowUpdateFrame", - "ResetStreamFrame", + "RstStreamFrame", "PingFrame", "UserActionFrames", ] @@ -147,7 +147,7 @@ def __repr__(self) -> str: return f"" -class ResetStreamFrame(Http2Frame): +class RstStreamFrame(Http2Frame): """ HTTP/2 reset stream frame. """ @@ -170,7 +170,9 @@ def __init__( self.error_code = error_code def __repr__(self) -> str: - return f"" + return ( + f"" + ) class PingFrame(Http2Frame): @@ -178,9 +180,9 @@ class PingFrame(Http2Frame): HTTP/2 ping frame. """ - __slots__ = ["data"] + __slots__ = ["data", "ack"] - def __init__(self, data: bytes): + def __init__(self, data: bytes, ack: bool = False): """ Initialize the HTTP/2 ping frame. :param data: The data. @@ -188,10 +190,11 @@ def __init__(self, data: bytes): """ super().__init__(0, Http2FrameType.PING, False) self.data = data + self.ack = ack def __repr__(self) -> str: return f"" # User action frames. -UserActionFrames = Union[HeadersFrame, DataFrame, ResetStreamFrame] +UserActionFrames = Union[HeadersFrame, DataFrame, RstStreamFrame] diff --git a/dubbo/remoting/aio/http2/protocol.py b/dubbo/remoting/aio/http2/protocol.py index fa96523..89ff26f 100644 --- a/dubbo/remoting/aio/http2/protocol.py +++ b/dubbo/remoting/aio/http2/protocol.py @@ -17,7 +17,7 @@ import asyncio import struct import time -from typing import List, Optional, Tuple +from typing import Optional from h2.config import H2Configuration from h2.connection import H2Connection @@ -32,7 +32,7 @@ HeadersFrame, Http2Frame, PingFrame, - ResetStreamFrame, + RstStreamFrame, UserActionFrames, WindowUpdateFrame, ) @@ -159,9 +159,7 @@ def send_frame( """ frame_type = frame.frame_type if frame_type == Http2FrameType.HEADERS: - self._send_headers_frame( - frame.stream_id, frame.headers.to_list(), frame.end_stream, event - ) + self._send_headers_frame(frame, stream, event) elif frame_type == Http2FrameType.DATA: self._flow_controller.write_data(stream, frame, event) elif frame_type == Http2FrameType.RST_STREAM: @@ -171,22 +169,25 @@ def send_frame( def _send_headers_frame( self, - stream_id: int, - headers: List[Tuple[str, str]], - end_stream: bool, + frame: HeadersFrame, + stream: Http2Stream, event: Optional[asyncio.Event] = None, ) -> None: """ Send the HTTP/2 headers frame.(thread-unsafe) - :param stream_id: The stream identifier. - :type stream_id: int - :param headers: The headers. - :type headers: List[Tuple[str, str]] - :param end_stream: Whether the stream is ended. - :type end_stream: bool + :param frame: The frame to send. + :type frame: HeadersFrame + :param stream: The stream. + :type stream: Http2Stream :param event: The event to be set after sending the frame. """ - self._h2_connection.send_headers(stream_id, headers, end_stream=end_stream) + if stream.id == -1: + stream.id = self._h2_connection.get_next_available_stream_id() + self._stream_handler.put_stream(stream.id, stream) + + self._h2_connection.send_headers( + stream.id, frame.headers.to_list(), end_stream=frame.end_stream + ) self._flush() EventHelper.set(event) @@ -248,7 +249,7 @@ def data_received(self, data): if isinstance(frame, WindowUpdateFrame): # Because flow control may be at the connection level, it is handled here self._flow_controller.release_flow_control(frame) - elif isinstance(frame, (HeadersFrame, DataFrame, ResetStreamFrame)): + elif isinstance(frame, (HeadersFrame, DataFrame, RstStreamFrame)): # Handle the frame by the stream handler self._stream_handler.handle_frame(frame) else: @@ -331,7 +332,7 @@ def connection_made(self, transport: asyncio.Transport): def _do_other_frame(self, frame: Http2Frame): # Handle the ping frame - if isinstance(frame, PingFrame): + if isinstance(frame, PingFrame) and frame.ack: FutureHelper.set_result(self._ping_ack_future, None) async def _heartbeat_loop(self): diff --git a/dubbo/remoting/aio/http2/stream.py b/dubbo/remoting/aio/http2/stream.py index e610d7c..c072a95 100644 --- a/dubbo/remoting/aio/http2/stream.py +++ b/dubbo/remoting/aio/http2/stream.py @@ -23,7 +23,7 @@ from dubbo.remoting.aio.http2.frames import ( DataFrame, HeadersFrame, - ResetStreamFrame, + RstStreamFrame, UserActionFrames, ) from dubbo.remoting.aio.http2.headers import Http2Headers @@ -58,6 +58,13 @@ def id(self) -> int: """ return self._id + @id.setter + def id(self, stream_id: int) -> None: + """ + Set the stream identifier. + """ + self._id = stream_id + @property def listener(self) -> "Http2Stream.Listener": """ @@ -261,8 +268,8 @@ def cancel_by_local(self, error_code: Http2ErrorCode) -> None: if self.local_closed: # The stream has been closed locally. return - reset_frame = ResetStreamFrame(self.id, error_code) - self._outbound_controller.write_rst(reset_frame) + rst_frame = RstStreamFrame(self.id, error_code) + self._outbound_controller.write_rst(rst_frame) def receive_frame(self, frame: UserActionFrames) -> None: """ diff --git a/dubbo/remoting/aio/http2/stream_handler.py b/dubbo/remoting/aio/http2/stream_handler.py index 65ec7bd..73d2d4f 100644 --- a/dubbo/remoting/aio/http2/stream_handler.py +++ b/dubbo/remoting/aio/http2/stream_handler.py @@ -16,12 +16,10 @@ import asyncio import uuid -from concurrent import futures from concurrent.futures import ThreadPoolExecutor from typing import Callable, Dict, Optional from dubbo.loggers import loggerFactory -from dubbo.remoting.aio.exceptions import ProtocolError from dubbo.remoting.aio.http2.frames import UserActionFrames from dubbo.remoting.aio.http2.registries import Http2FrameType from dubbo.remoting.aio.http2.stream import DefaultHttp2Stream, Http2Stream @@ -107,6 +105,9 @@ def handle_frame(self, frame: UserActionFrames) -> None: # It must be ensured that the event loop is not blocked, # and if there is a blocking operation, the executor must be used. stream.receive_frame(frame) + + if frame.end_stream and stream.local_closed: + self.remove_stream(frame.stream_id) else: _LOGGER.warning( f"Stream {frame.stream_id} not found. Ignoring frame {frame}" @@ -134,19 +135,9 @@ def create(self, listener: Http2Stream.Listener) -> DefaultHttp2Stream: :return: The stream. :rtype: DefaultHttp2Stream """ - future = futures.Future() - self._protocol.get_next_stream_id(future) - try: - # block until the stream_id is created - stream_id = future.result() - new_stream = DefaultHttp2Stream( - stream_id, listener, self._loop, self._protocol, self._executor - ) - self.put_stream(stream_id, new_stream) - except Exception as e: - raise ProtocolError("Failed to create stream.") from e - - return new_stream + return DefaultHttp2Stream( + -1, listener, self._loop, self._protocol, self._executor + ) class StreamServerMultiplexHandler(StreamMultiplexHandler): diff --git a/dubbo/remoting/aio/http2/utils.py b/dubbo/remoting/aio/http2/utils.py index 7cc4f66..32d52f0 100644 --- a/dubbo/remoting/aio/http2/utils.py +++ b/dubbo/remoting/aio/http2/utils.py @@ -22,7 +22,7 @@ DataFrame, HeadersFrame, PingFrame, - ResetStreamFrame, + RstStreamFrame, WindowUpdateFrame, ) from dubbo.remoting.aio.http2.headers import Http2Headers @@ -40,14 +40,14 @@ class Http2EventUtils: def convert_to_frame( event: h2_event.Event, ) -> Union[ - HeadersFrame, DataFrame, ResetStreamFrame, WindowUpdateFrame, PingFrame, None + HeadersFrame, DataFrame, RstStreamFrame, WindowUpdateFrame, PingFrame, None ]: """ Convert a h2.events.Event to HTTP/2 Frame. :param event: The H2 event. :type event: h2.events.Event :return: The HTTP/2 frame. - :rtype: Union[HeadersFrame, DataFrame, ResetStreamFrame, WindowUpdateFrame, PingFrame, None] + :rtype: Union[HeadersFrame, DataFrame, RstStreamFrame, WindowUpdateFrame, PingFrame, None] """ if isinstance( event, @@ -73,14 +73,14 @@ def convert_to_frame( ) elif isinstance(event, h2_event.StreamReset): # RST_STREAM frame. - return ResetStreamFrame( - event.stream_id, Http2ErrorCode.get(event.error_code) - ) + return RstStreamFrame(event.stream_id, Http2ErrorCode.get(event.error_code)) elif isinstance(event, h2_event.WindowUpdated): # WINDOW_UPDATE frame. return WindowUpdateFrame(event.stream_id, event.delta) - elif isinstance(event, h2_event.PingReceived): + elif isinstance(event, (h2_event.PingAckReceived, h2_event.PingReceived)): # PING frame. - return PingFrame(event.ping_data) + return PingFrame( + event.ping_data, ack=isinstance(event, h2_event.PingAckReceived) + ) return None diff --git a/dubbo/utils.py b/dubbo/utils.py index 47b404f..6900237 100644 --- a/dubbo/utils.py +++ b/dubbo/utils.py @@ -16,7 +16,11 @@ import socket -__all__ = ["EventHelper", "FutureHelper", "NetworkUtils"] +__all__ = ["EventHelper", "FutureHelper", "NetworkUtils", "CpuUtils"] + +from typing import List, Tuple + +import psutil class EventHelper: @@ -155,3 +159,71 @@ def get_host_ip(): :rtype: str """ return socket.gethostbyname(NetworkUtils.get_host_name()) + + +class CpuUtils: + """ + Helper class for CPU operations. + """ + + @staticmethod + def get_cpu_count(logical=True) -> int: + """ + Get the number of CPUs in the system. + + :return: The number of CPUs in the system. + :rtype: int + """ + return psutil.cpu_count(logical=logical) + + @staticmethod + def get_total_cpu_usage(interval=1) -> float: + """ + Get the total CPU usage of the system. + + :param interval: The interval in seconds. + :type interval: int + :return: The total CPU usage of the system. + :rtype: float + """ + return psutil.cpu_percent(interval=interval) + + @staticmethod + def get_per_cpu_usage(interval=1) -> List[float]: + """ + Get the per CPU usage of the system. + + :param interval: The interval in seconds. + :type interval: int + :return: The per CPU usage of the system. + :rtype: list + """ + return psutil.cpu_percent(interval=interval, percpu=True) + + @staticmethod + def get_load_avg() -> Tuple[float, float, float]: + """ + Get the load average over the last 1, 5, and 15 minutes + + :return: The load average of the system. + :rtype: list + """ + return psutil.getloadavg() + + @staticmethod + def get_cpu_stats(): + """ + Get the CPU stats of the system. + + :return: The CPU stats of the system. + """ + return psutil.cpu_stats() + + @staticmethod + def get_cpu_freq(): + """ + Get the current CPU frequency. + + :return: The current CPU frequency. + """ + return psutil.cpu_freq() diff --git a/requirements.txt b/requirements.txt index dd0cffb..89dbdc6 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,3 +1,4 @@ h2>=4.1.0 uvloop>=0.19.0 -kazoo>=2.10.0 \ No newline at end of file +kazoo>=2.10.0 +psutil>=6.0.0 \ No newline at end of file diff --git a/samples/registry/zookeeper/client.py b/samples/registry/zookeeper/client.py index 9c84db0..ff81ba7 100644 --- a/samples/registry/zookeeper/client.py +++ b/samples/registry/zookeeper/client.py @@ -13,6 +13,8 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. +import time + import unary_unary_pb2 import dubbo @@ -33,10 +35,14 @@ def unary(self, request): if __name__ == "__main__": - registry_config = RegistryConfig.from_url("zookeeper://127.0.0.1:2181") + registry_config = RegistryConfig.from_url( + "zookeeper://127.0.0.1:2181" + ) bootstrap = dubbo.Dubbo(registry_config=registry_config) - reference_config = ReferenceConfig(protocol="tri", service="org.apache.dubbo.samples.registry.zk") + reference_config = ReferenceConfig( + protocol="tri", service="org.apache.dubbo.samples.registry.zk" + ) dubbo_client = bootstrap.create_client(reference_config) unary_service_stub = UnaryServiceStub(dubbo_client) diff --git a/samples/stream/server_stream/client.py b/samples/stream/server_stream/client.py index fa9d4c1..4b9cb39 100644 --- a/samples/stream/server_stream/client.py +++ b/samples/stream/server_stream/client.py @@ -14,7 +14,6 @@ # See the License for the specific language governing permissions and # limitations under the License. import unary_stream_pb2 -from setuptools.extern import names import dubbo from dubbo.configs import ReferenceConfig diff --git a/setup.py b/setup.py index edb5703..4b91932 100644 --- a/setup.py +++ b/setup.py @@ -56,6 +56,10 @@ packages=find_packages(include=("dubbo", "dubbo.*")), test_suite="tests", python_requires=">=3.11", - install_requires=["h2>=4.1.0", "uvloop>=0.19.0; platform_system!='Windows'"], + install_requires=[ + "h2>=4.1.0", + "uvloop>=0.19.0; platform_system!='Windows'", + "psutil>=6.0.0", + ], extras_require={"zookeeper": ["kazoo>=2.10.0"]}, )