From 3338e37d9cce388b7123661c8f4d15b4f4a6ab1e Mon Sep 17 00:00:00 2001 From: dpj135 <958208521@qq.com> Date: Wed, 28 Jan 2026 10:19:23 +0800 Subject: [PATCH 01/26] Renamed 'test_yuanrong_storage_manager.py' to 'test_yuanrong_storage_client.py' Signed-off-by: dpj135 <958208521@qq.com> --- ...uanrong_storage_manager.py => test_yuanrong_storage_client.py} | 0 1 file changed, 0 insertions(+), 0 deletions(-) rename tests/{test_yuanrong_storage_manager.py => test_yuanrong_storage_client.py} (100%) diff --git a/tests/test_yuanrong_storage_manager.py b/tests/test_yuanrong_storage_client.py similarity index 100% rename from tests/test_yuanrong_storage_manager.py rename to tests/test_yuanrong_storage_client.py From 146cbd2e812eca33a43bb739e53a164b8407bb80 Mon Sep 17 00:00:00 2001 From: dpj135 <958208521@qq.com> Date: Wed, 28 Jan 2026 10:30:39 +0800 Subject: [PATCH 02/26] Added abstract interface 'StorageStrategy' Signed-off-by: dpj135 <958208521@qq.com> --- .../storage/clients/yuanrong_client.py | 24 +++++++++++++++++++ 1 file changed, 24 insertions(+) diff --git a/transfer_queue/storage/clients/yuanrong_client.py b/transfer_queue/storage/clients/yuanrong_client.py index b408b93..9118d2d 100644 --- a/transfer_queue/storage/clients/yuanrong_client.py +++ b/transfer_queue/storage/clients/yuanrong_client.py @@ -17,6 +17,7 @@ import os import pickle import struct +from abc import ABC, abstractmethod from concurrent.futures import ThreadPoolExecutor from typing import Any, Optional, TypeAlias @@ -428,3 +429,26 @@ def clear(self, keys: list[str]): keys (List[str]): List of keys to remove. """ self._batch_clear(keys) + + +class StorageStrategy(ABC): + @abstractmethod + def init(self, value: Any) -> "StorageStrategy": ... + + @abstractmethod + def supports_to_put(self, value: Any) -> bool: ... + + @abstractmethod + def put(self, keys: list[str], values: list[Any]): ... + + @abstractmethod + def supports_to_get(self, value: Any) -> bool: ... + + @abstractmethod + def get(self, keys: list[str], meta: list[dict]) -> list[Optional[Any]]: ... + + @abstractmethod + def supports_to_clear(self, value: Any) -> bool: ... + + @abstractmethod + def clear(self, keys: list[str]): ... From 4d184b85e77eba6e6903e806bb6e7a330f319455 Mon Sep 17 00:00:00 2001 From: dpj135 <958208521@qq.com> Date: Wed, 28 Jan 2026 13:11:56 +0800 Subject: [PATCH 03/26] Added DsTensorClient Signed-off-by: dpj135 <958208521@qq.com> --- .../storage/clients/yuanrong_client.py | 115 ++++++++++++++++-- 1 file changed, 108 insertions(+), 7 deletions(-) diff --git a/transfer_queue/storage/clients/yuanrong_client.py b/transfer_queue/storage/clients/yuanrong_client.py index 9118d2d..14c459d 100644 --- a/transfer_queue/storage/clients/yuanrong_client.py +++ b/transfer_queue/storage/clients/yuanrong_client.py @@ -19,7 +19,7 @@ import struct from abc import ABC, abstractmethod from concurrent.futures import ThreadPoolExecutor -from typing import Any, Optional, TypeAlias +from typing import Any, Optional, TypeAlias, Union import torch from torch import Tensor @@ -36,7 +36,6 @@ NPU_DS_CLIENT_KEYS_LIMIT: int = 9999 CPU_DS_CLIENT_KEYS_LIMIT: int = 1999 YUANRONG_DATASYSTEM_IMPORTED: bool = True -TORCH_NPU_IMPORTED: bool = True DS_MAX_WORKERS: int = 16 try: from yr import datasystem @@ -433,22 +432,124 @@ def clear(self, keys: list[str]): class StorageStrategy(ABC): @abstractmethod - def init(self, value: Any) -> "StorageStrategy": ... + @staticmethod + def init(config: dict) -> Union["StorageStrategy", None]: ... 
@abstractmethod - def supports_to_put(self, value: Any) -> bool: ... + def custom_meta(self) -> Any: ... + + @abstractmethod + def supports_put(self, value: Any) -> bool: ... @abstractmethod def put(self, keys: list[str], values: list[Any]): ... @abstractmethod - def supports_to_get(self, value: Any) -> bool: ... + def supports_get(self, custom_meta: Any) -> bool: ... @abstractmethod - def get(self, keys: list[str], meta: list[dict]) -> list[Optional[Any]]: ... + def get(self, keys: list[str], **kwargs) -> list[Optional[Any]]: ... @abstractmethod - def supports_to_clear(self, value: Any) -> bool: ... + def supports_clear(self, custom_meta: Any) -> bool: ... @abstractmethod def clear(self, keys: list[str]): ... + + +class DsTensorClientAdapter(StorageStrategy): + KEYS_LIMIT: int = 10_000 + + def __init__(self, config: dict): + host = config.get("host") + port = config.get("port") + + self.device_id = torch.npu.current_device() + torch.npu.set_device(self.device_id) + + self._ds_client = datasystem.DsTensorClient(host, port, self.device_id) + self._ds_client.init() + + @staticmethod + def init(config: dict) -> Union["StorageStrategy", None]: + torch_npu_imported: bool = True + try: + import torch_npu # noqa: F401 + except ImportError: + torch_npu_imported = False + enable = config.get("enable_yr_npu_optimization", True) + if not (enable and torch_npu_imported and torch.npu.is_available()): + return None + logger.info("YuanrongStorageClient: Create DsTensorClient to connect with yuanrong-datasystem backend!") + return DsTensorClientAdapter(config) + + def custom_meta(self) -> Any: + return "DsTensorClient" + + def supports_put(self, value: Any) -> bool: + if not (isinstance(value, torch.Tensor) and value.device.type == "npu"): + return False + # Todo(dpj): perhaps KVClient can process uncontiguous tensor + if not value.is_contiguous(): + raise ValueError(f"NPU Tensor is not contiguous: {value}") + return True + + def put(self, keys: list[str], values: list[Any]): + # _npu_ds_client.dev_mset doesn't support to overwrite + for i in range(0, len(keys), self.KEYS_LIMIT): + batch_keys = keys[i : i + self.KEYS_LIMIT] + batch_values = values[i : i + self.KEYS_LIMIT] + try: + self._ds_client.dev_delete(batch_keys) + except Exception: + pass + self._ds_client.dev_mset(batch_keys, batch_values) + + def supports_get(self, custom_meta: str) -> bool: + return isinstance(custom_meta, str) and custom_meta == self.custom_meta() + + def get(self, keys: list[str], **kwargs) -> list[Optional[Any]]: + # Fetch NPU tensors + shapes = kwargs.get("shapes", None) + dtypes = kwargs.get("dtypes", None) + if not shapes or not dtypes: + raise ValueError("YuanrongStorageClient needs Expected shapes and dtypes") + results = [] + for i in range(0, len(keys), self.KEYS_LIMIT): + batch_keys = keys[i : i + self.KEYS_LIMIT] + batch_shapes = shapes[i : i + self.KEYS_LIMIT] + batch_dtypes = dtypes[i : i + self.KEYS_LIMIT] + + batch_values = self._create_empty_npu_tensorlist(batch_shapes, batch_dtypes) + self._ds_client.dev_mget(batch_keys, batch_values) + # Todo(dpj): should we check failed keys? 
+ # failed_keys = self._ds_client.dev_mget(batch_keys, batch_values) + # if failed_keys: + # logging.warning(f"YuanrongStorageClient: Querying keys using 'DsTensorClient' failed: {failed_keys}") + results.extend(batch_values) + return results + + def supports_clear(self, custom_meta: str) -> bool: + return isinstance(custom_meta, str) and custom_meta == self.custom_meta() + + def clear(self, keys: list[str]): + for i in range(0, len(keys), self.KEYS_LIMIT): + batch = keys[i : i + self.KEYS_LIMIT] + # Todo(dpj): Test call clear when no (key,value) put in ds + self._ds_client.dev_delete(batch) + + def _create_empty_npu_tensorlist(self, shapes, dtypes): + """ + Create a list of empty NPU tensors with given shapes and dtypes. + + Args: + shapes (list): List of tensor shapes (e.g., [(3,), (2, 4)]) + dtypes (list): List of torch dtypes (e.g., [torch.float32, torch.int64]) + Returns: + list: List of uninitialized NPU tensors + """ + tensors: list[Tensor] = [] + for shape, dtype in zip(shapes, dtypes, strict=True): + tensor = torch.empty(shape, dtype=dtype, device=f"npu:{self.device_id}") + tensors.append(tensor) + return tensors From d5ec72412abc81c54e7dbe040a5987bc80a39eb8 Mon Sep 17 00:00:00 2001 From: dpj135 <958208521@qq.com> Date: Wed, 28 Jan 2026 16:32:21 +0800 Subject: [PATCH 04/26] Added 'KVClientAdapter' Signed-off-by: dpj135 <958208521@qq.com> --- .../storage/clients/yuanrong_client.py | 495 ++++++------------ 1 file changed, 161 insertions(+), 334 deletions(-) diff --git a/transfer_queue/storage/clients/yuanrong_client.py b/transfer_queue/storage/clients/yuanrong_client.py index 14c459d..faeb09e 100644 --- a/transfer_queue/storage/clients/yuanrong_client.py +++ b/transfer_queue/storage/clients/yuanrong_client.py @@ -15,7 +15,6 @@ import logging import os -import pickle import struct from abc import ABC, abstractmethod from concurrent.futures import ThreadPoolExecutor @@ -33,87 +32,13 @@ logger = logging.getLogger(__name__) logger.setLevel(os.getenv("TQ_LOGGING_LEVEL", logging.WARNING)) -NPU_DS_CLIENT_KEYS_LIMIT: int = 9999 -CPU_DS_CLIENT_KEYS_LIMIT: int = 1999 YUANRONG_DATASYSTEM_IMPORTED: bool = True -DS_MAX_WORKERS: int = 16 + try: from yr import datasystem except ImportError: YUANRONG_DATASYSTEM_IMPORTED = False -# Header: number of entries (uint32, little-endian) -HEADER_FMT = " int: - """ - Calculate the total size (in bytes) required to pack a list of memoryview items - into the structured binary format used by pack_into. - - Args: - items: List of memoryview objects to be packed. - - Returns: - Total buffer size in bytes. - """ - return HEADER_SIZE + len(items) * ENTRY_SIZE + sum(item.nbytes for item in items) - - -def pack_into(target: memoryview, items: list[memoryview]): - """ - Pack multiple contiguous buffers into a single buffer. - ┌───────────────┐ - │ item_count │ uint32 - ├───────────────┤ - │ entries │ N * item entries - ├───────────────┤ - │ payload blob │ N * concatenated buffers - └───────────────┘ - - Args: - target (memoryview): A writable memoryview returned by StateValueBuffer.MutableData(). - It must be large enough to accommodate the total number of bytes of HEADER + ENTRY_TABLE + all items. - This buffer is usually mapped to shared memory or Zero-Copy memory area. - items (List[memoryview]): List of read-only memory views (e.g., from serialized objects). Each item must support - the buffer protocol and be readable as raw bytes. 
- - """ - struct.pack_into(HEADER_FMT, target, 0, len(items)) - - entry_offset = HEADER_SIZE - payload_offset = HEADER_SIZE + len(items) * ENTRY_SIZE - - target_tensor = torch.frombuffer(target, dtype=torch.uint8) - - for item in items: - struct.pack_into(ENTRY_FMT, target, entry_offset, payload_offset, item.nbytes) - src_tensor = torch.frombuffer(item, dtype=torch.uint8) - target_tensor[payload_offset : payload_offset + item.nbytes].copy_(src_tensor) - entry_offset += ENTRY_SIZE - payload_offset += item.nbytes - - -def unpack_from(source: memoryview) -> list[bytestr]: - """ - Unpack multiple contiguous buffers from a single packed buffer. - Args: - source (memoryview): The packed source buffer. - Returns: - list[bytestr]: List of unpacked contiguous buffers. - """ - mv = memoryview(source) - item_count = struct.unpack_from(HEADER_FMT, mv, 0)[0] - offsets = [] - for i in range(item_count): - offset, length = struct.unpack_from(ENTRY_FMT, mv, HEADER_SIZE + i * ENTRY_SIZE) - offsets.append((offset, length)) - return [mv[offset : offset + length] for offset, length in offsets] - @StorageClientFactory.register("YuanrongStorageClient") class YuanrongStorageClient(TransferQueueStorageKVClient): @@ -129,145 +54,7 @@ def __init__(self, config: dict[str, Any]): if not YUANRONG_DATASYSTEM_IMPORTED: raise ImportError("YuanRong DataSystem not installed.") - global TORCH_NPU_IMPORTED - try: - import torch_npu # noqa: F401 - except ImportError: - TORCH_NPU_IMPORTED = False - - self.host = config.get("host") - self.port = config.get("port") - - self.device_id = None - self._npu_ds_client = None - self._cpu_ds_client = None - - if not TORCH_NPU_IMPORTED: - logger.warning( - "'torch_npu' import failed. " - "It results in the inability to quickly put/get tensors on the NPU side, which may affect performance." - ) - elif not torch.npu.is_available(): - logger.warning( - "NPU is not available. " - "It results in the inability to quickly put/get tensors on the NPU side, which may affect performance." - ) - else: - self.device_id = torch.npu.current_device() - self._npu_ds_client = datasystem.DsTensorClient(self.host, self.port, self.device_id) - self._npu_ds_client.init() - - self._cpu_ds_client = datasystem.KVClient(self.host, self.port) - self._cpu_ds_client.init() - - def npu_ds_client_is_available(self): - """Check if NPU client is available.""" - return self._npu_ds_client is not None - def cpu_ds_client_is_available(self): - """Check if CPU client is available.""" - return self._cpu_ds_client is not None - - def _create_empty_npu_tensorlist(self, shapes, dtypes): - """ - Create a list of empty NPU tensors with given shapes and dtypes. - - Args: - shapes (list): List of tensor shapes (e.g., [(3,), (2, 4)]) - dtypes (list): List of torch dtypes (e.g., [torch.float32, torch.int64]) - Returns: - list: List of uninitialized NPU tensors - """ - tensors: list[Tensor] = [] - for shape, dtype in zip(shapes, dtypes, strict=False): - tensor = torch.empty(shape, dtype=dtype, device=f"npu:{self.device_id}") - tensors.append(tensor) - return tensors - - def mset_zcopy(self, keys: list[str], objs: list[Any]): - """Store multiple objects in zero-copy mode using parallel serialization and buffer packing. - - Args: - keys (list[str]): List of string keys under which the objects will be stored. - objs (list[Any]): List of Python objects to store (e.g., tensors, strings). 
- """ - assert self._cpu_ds_client is not None, "CPU DS client is not available" - items_list = [[memoryview(b) for b in _encoder.encode(obj)] for obj in objs] - packed_sizes = [calc_packed_size(items) for items in items_list] - buffers = self._cpu_ds_client.mcreate(keys, packed_sizes) - tasks = [(target.MutableData(), item) for target, item in zip(buffers, items_list, strict=False)] - with ThreadPoolExecutor(max_workers=DS_MAX_WORKERS) as executor: - list(executor.map(lambda p: pack_into(*p), tasks)) - self._cpu_ds_client.mset_buffer(buffers) - - def mget_zcopy(self, keys: list[str]) -> list[Any]: - """Retrieve multiple objects in zero-copy mode by directly deserializing from shared memory buffers. - - Args: - keys (list[str]): List of string keys to retrieve from storage. - - Returns: - list[Any]: List of deserialized objects corresponding to the input keys. - """ - assert self._cpu_ds_client is not None, "CPU DS client is not available" - buffers = self._cpu_ds_client.get_buffers(keys) - return [_decoder.decode(unpack_from(buffer)) if buffer is not None else None for buffer in buffers] - - def _batch_put(self, keys: list[str], values: list[Any]): - """Stores a batch of key-value pairs to remote storage, splitting by device type. - - NPU tensors are sent via DsTensorClient (with higher batch limit), - while all other objects are pickled and sent via KVClient. - - Args: - keys (List[str]): List of string keys. - values (List[Any]): Corresponding values (tensors or general objects). - """ - if self.npu_ds_client_is_available(): - # Classify NPU and CPU data - npu_keys = [] - npu_values = [] - - cpu_keys = [] - cpu_values = [] - - for key, value in zip(keys, values, strict=True): - if isinstance(value, torch.Tensor) and value.device.type == "npu": - if not value.is_contiguous(): - raise ValueError(f"NPU Tensor is not contiguous: {value}") - npu_keys.append(key) - npu_values.append(value) - - else: - cpu_keys.append(key) - cpu_values.append(pickle.dumps(value)) - - # put NPU data - assert self._npu_ds_client is not None, "NPU DS client is not available" - for i in range(0, len(npu_keys), NPU_DS_CLIENT_KEYS_LIMIT): - batch_keys = npu_keys[i : i + NPU_DS_CLIENT_KEYS_LIMIT] - batch_values = npu_values[i : i + NPU_DS_CLIENT_KEYS_LIMIT] - - # _npu_ds_client.dev_mset doesn't support to overwrite - try: - self._npu_ds_client.dev_delete(batch_keys) - except Exception as e: - logger.warning(f"dev_delete error({e}) before dev_mset") - self._npu_ds_client.dev_mset(batch_keys, batch_values) - - # put CPU data - assert self._cpu_ds_client is not None, "CPU DS client is not available" - for i in range(0, len(cpu_keys), CPU_DS_CLIENT_KEYS_LIMIT): - batch_keys = cpu_keys[i : i + CPU_DS_CLIENT_KEYS_LIMIT] - batch_values = cpu_values[i : i + CPU_DS_CLIENT_KEYS_LIMIT] - self.mset_zcopy(batch_keys, batch_values) - - else: - # All data goes through CPU path - for i in range(0, len(keys), CPU_DS_CLIENT_KEYS_LIMIT): - batch_keys = keys[i : i + CPU_DS_CLIENT_KEYS_LIMIT] - batch_vals = values[i : i + CPU_DS_CLIENT_KEYS_LIMIT] - self.mset_zcopy(batch_keys, batch_vals) def put(self, keys: list[str], values: list[Any]) -> Optional[list[Any]]: """Stores multiple key-value pairs to remote storage. 
@@ -283,96 +70,7 @@ def put(self, keys: list[str], values: list[Any]) -> Optional[list[Any]]: raise ValueError("keys and values must be lists") if len(keys) != len(values): raise ValueError("Number of keys must match number of values") - self._batch_put(keys, values) - return None - - def _batch_get(self, keys: list[str], shapes: list, dtypes: list) -> list[Any]: - """Retrieves a batch of values from remote storage using expected metadata. - - NPU tensors are fetched via DsTensorClient using pre-allocated buffers. - Other objects are fetched via KVClient and unpickled. - - Args: - keys (List[str]): Keys to fetch. - shapes (List[List[int]]): Expected shapes for each key (empty list for scalars). - dtypes (List[Optional[torch.dtype]]): Expected dtypes; None indicates non-tensor data. - - Returns: - List[Any]: Retrieved values in the same order as input keys. - """ - - if self.npu_ds_client_is_available(): - # classify npu and cpu queries - npu_indices = [] - npu_keys = [] - npu_shapes = [] - npu_dtypes = [] - - cpu_indices = [] - cpu_keys = [] - - for idx, (key, shape, dtype) in enumerate(zip(keys, shapes, dtypes, strict=False)): - if dtype is not None: - npu_indices.append(idx) - npu_keys.append(key) - npu_shapes.append(shape) - npu_dtypes.append(dtype) - else: - cpu_indices.append(idx) - cpu_keys.append(key) - - results = [None] * len(keys) - - # Fetch NPU tensors - assert self._npu_ds_client is not None, "NPU DS client is not available" - for i in range(0, len(npu_keys), NPU_DS_CLIENT_KEYS_LIMIT): - batch_keys = npu_keys[i : i + NPU_DS_CLIENT_KEYS_LIMIT] - batch_shapes = npu_shapes[i : i + NPU_DS_CLIENT_KEYS_LIMIT] - batch_dtypes = npu_dtypes[i : i + NPU_DS_CLIENT_KEYS_LIMIT] - batch_indices = npu_indices[i : i + NPU_DS_CLIENT_KEYS_LIMIT] - - batch_values = self._create_empty_npu_tensorlist(batch_shapes, batch_dtypes) - failed_subkeys = [] - try: - failed_subkeys = self._npu_ds_client.dev_mget(batch_keys, batch_values) - # failed_keys = f'{key},{npu_device_id}' - failed_subkeys = [f_key.rsplit(",", 1)[0] for f_key in failed_subkeys] - except Exception: - failed_subkeys = batch_keys - - # Fill successfully retrieved tensors - failed_set = set(failed_subkeys) - for idx, key, value in zip(batch_indices, batch_keys, batch_values, strict=False): - if key not in failed_set: - results[idx] = value - - # Add failed keys to CPU fallback queue - if failed_subkeys: - cpu_keys.extend(failed_subkeys) - cpu_indices.extend([batch_indices[j] for j, k in enumerate(batch_keys) if k in failed_set]) - - # Fetch CPU/general objects (including NPU fallbacks) - assert self._cpu_ds_client is not None, "CPU DS client is not available" - for i in range(0, len(cpu_keys), CPU_DS_CLIENT_KEYS_LIMIT): - batch_keys = cpu_keys[i : i + CPU_DS_CLIENT_KEYS_LIMIT] - batch_indices = cpu_indices[i : i + CPU_DS_CLIENT_KEYS_LIMIT] - objects = self.mget_zcopy(batch_keys) - for idx, obj in zip(batch_indices, objects, strict=False): - results[idx] = obj - - return results - - else: - results = [None] * len(keys) - cpu_indices = list(range(len(keys))) - - for i in range(0, len(keys), CPU_DS_CLIENT_KEYS_LIMIT): - batch_keys = keys[i : i + CPU_DS_CLIENT_KEYS_LIMIT] - batch_indices = cpu_indices[i : i + CPU_DS_CLIENT_KEYS_LIMIT] - objects = self.mget_zcopy(batch_keys) - for idx, obj in zip(batch_indices, objects, strict=False): - results[idx] = obj - return results + pass def get(self, keys: list[str], shapes=None, dtypes=None, custom_backend_meta=None) -> list[Any]: """Retrieves multiple values from remote storage with expected 
metadata. @@ -392,34 +90,7 @@ def get(self, keys: list[str], shapes=None, dtypes=None, custom_backend_meta=Non raise ValueError("YuanrongStorageClient needs Expected shapes and dtypes") if not (len(keys) == len(shapes) == len(dtypes)): raise ValueError("Lengths of keys, shapes, dtypes must match") - return self._batch_get(keys, shapes, dtypes) - - def _batch_clear(self, keys: list[str]): - """Deletes a batch of keys from remote storage. - - Attempts deletion via NPU client first (if available), then falls back to CPU client - for any keys not handled by NPU. - - Args: - keys (List[str]): Keys to delete. - """ - if self.npu_ds_client_is_available(): - assert self._npu_ds_client is not None, "NPU DS client is not available" - assert self._cpu_ds_client is not None, "CPU DS client is not available" - # Try to delete all keys via npu client - for i in range(0, len(keys), NPU_DS_CLIENT_KEYS_LIMIT): - batch = keys[i : i + NPU_DS_CLIENT_KEYS_LIMIT] - # Return the keys that failed to delete - self._npu_ds_client.dev_delete(batch) - # Delete failed keys via CPU client - for j in range(0, len(keys), CPU_DS_CLIENT_KEYS_LIMIT): - sub_batch = keys[j : j + CPU_DS_CLIENT_KEYS_LIMIT] - self._cpu_ds_client.delete(sub_batch) - else: - assert self._cpu_ds_client is not None, "CPU DS client is not available" - for i in range(0, len(keys), CPU_DS_CLIENT_KEYS_LIMIT): - batch = keys[i : i + CPU_DS_CLIENT_KEYS_LIMIT] - self._cpu_ds_client.delete(batch) + pass def clear(self, keys: list[str]): """Deletes multiple keys from remote storage. @@ -427,7 +98,7 @@ def clear(self, keys: list[str]): Args: keys (List[str]): List of keys to remove. """ - self._batch_clear(keys) + pass class StorageStrategy(ABC): @@ -469,6 +140,7 @@ def __init__(self, config: dict): self._ds_client = datasystem.DsTensorClient(host, port, self.device_id) self._ds_client.init() + logger.info("YuanrongStorageClient: Create DsTensorClient to connect with yuanrong-datasystem backend!") @staticmethod def init(config: dict) -> Union["StorageStrategy", None]: @@ -480,7 +152,7 @@ def init(config: dict) -> Union["StorageStrategy", None]: enable = config.get("enable_yr_npu_optimization", True) if not (enable and torch_npu_imported and torch.npu.is_available()): return None - logger.info("YuanrongStorageClient: Create DsTensorClient to connect with yuanrong-datasystem backend!") + return DsTensorClientAdapter(config) def custom_meta(self) -> Any: @@ -553,3 +225,158 @@ def _create_empty_npu_tensorlist(self, shapes, dtypes): tensor = torch.empty(shape, dtype=dtype, device=f"npu:{self.device_id}") tensors.append(tensor) return tensors + + +class KVClientAdapter(StorageStrategy): + PUT_KEYS_LIMIT: int = 2_000 + GET_CLEAR_KEYS_LIMIT: int = 10_000 + + # Header: number of entries (uint32, little-endian) + HEADER_FMT = " Union["StorageStrategy", None]: + return KVClientAdapter(config) + + def custom_meta(self) -> Any: + return "KVClient" + + def supports_put(self, value: Any) -> bool: + return True + + def put(self, keys: list[str], values: list[Any]): + for i in range(0, len(keys), self.PUT_KEYS_LIMIT): + batch_keys = keys[i : i + self.PUT_KEYS_LIMIT] + batch_vals = values[i : i + self.PUT_KEYS_LIMIT] + self.mset_zero_copy(batch_keys, batch_vals) + + def supports_get(self, custom_meta: str) -> bool: + return isinstance(custom_meta, str) and custom_meta == self.custom_meta() + + def get(self, keys: list[str], **kwargs) -> list[Optional[Any]]: + results = [] + for i in range(0, len(keys), self.GET_CLEAR_KEYS_LIMIT): + batch_keys = keys[i : i + 
self.GET_CLEAR_KEYS_LIMIT] + objects = self.mget_zero_copy(batch_keys) + results.extend(objects) + return results + + def supports_clear(self, custom_meta: str) -> bool: + return isinstance(custom_meta, str) and custom_meta == self.custom_meta() + + # Todo(wenlin): Add clear_buffer method + def clear(self, keys: list[str]): + pass + # for i in range(0, len(keys), self.GET_CLEAR_KEYS_LIMIT): + # batch = keys[i : i + self.GET_CLEAR_KEYS_LIMIT] + # self._ds_client.delete(batch) + + @staticmethod + def calc_packed_size(items: list[memoryview]) -> int: + """ + Calculate the total size (in bytes) required to pack a list of memoryview items + into the structured binary format used by pack_into. + + Args: + items: List of memoryview objects to be packed. + + Returns: + Total buffer size in bytes. + """ + return ( + KVClientAdapter.HEADER_SIZE + len(items) * KVClientAdapter.ENTRY_SIZE + sum(item.nbytes for item in items) + ) + + @staticmethod + def pack_into(target: memoryview, items: list[memoryview]): + """ + Pack multiple contiguous buffers into a single buffer. + ┌───────────────┐ + │ item_count │ uint32 + ├───────────────┤ + │ entries │ N * item entries + ├───────────────┤ + │ payload blob │ N * concatenated buffers + └───────────────┘ + + Args: + target (memoryview): A writable memoryview returned by StateValueBuffer.MutableData(). + It must be large enough to accommodate the total number of bytes of HEADER + ENTRY_TABLE + all items. + This buffer is usually mapped to shared memory or Zero-Copy memory area. + items (List[memoryview]): List of read-only memory views (e.g., from serialized objects). + Each item must support the buffer protocol and be readable as raw bytes. + + """ + struct.pack_into(KVClientAdapter.HEADER_FMT, target, 0, len(items)) + + entry_offset = KVClientAdapter.HEADER_SIZE + payload_offset = KVClientAdapter.HEADER_SIZE + len(items) * KVClientAdapter.ENTRY_SIZE + + target_tensor = torch.frombuffer(target, dtype=torch.uint8) + + for item in items: + struct.pack_into(KVClientAdapter.ENTRY_FMT, target, entry_offset, payload_offset, item.nbytes) + src_tensor = torch.frombuffer(item, dtype=torch.uint8) + target_tensor[payload_offset : payload_offset + item.nbytes].copy_(src_tensor) + entry_offset += KVClientAdapter.ENTRY_SIZE + payload_offset += item.nbytes + + @staticmethod + def unpack_from(source: memoryview) -> list[memoryview]: + """ + Unpack multiple contiguous buffers from a single packed buffer. + Args: + source (memoryview): The packed source buffer. + Returns: + list[]: List of unpacked contiguous buffers. + """ + mv = memoryview(source) + item_count = struct.unpack_from(KVClientAdapter.HEADER_FMT, mv, 0)[0] + offsets = [] + for i in range(item_count): + offset, length = struct.unpack_from( + KVClientAdapter.ENTRY_FMT, mv, KVClientAdapter.HEADER_SIZE + i * KVClientAdapter.ENTRY_SIZE + ) + offsets.append((offset, length)) + return [mv[offset : offset + length] for offset, length in offsets] + + def mset_zero_copy(self, keys: list[str], objs: list[Any]): + """Store multiple objects in zero-copy mode using parallel serialization and buffer packing. + + Args: + keys (list[str]): List of string keys under which the objects will be stored. + objs (list[Any]): List of Python objects to store (e.g., tensors, strings). 
+ """ + items_list = [[memoryview(b) for b in _encoder.encode(obj)] for obj in objs] + packed_sizes = [self.calc_packed_size(items) for items in items_list] + buffers = self._ds_client.mcreate(keys, packed_sizes) + tasks = [(target.MutableData(), item) for target, item in zip(buffers, items_list, strict=True)] + with ThreadPoolExecutor(max_workers=self.DS_MAX_WORKERS) as executor: + list(executor.map(lambda p: self.pack_into(*p), tasks)) + self._ds_client.mset_buffer(buffers) + + def mget_zero_copy(self, keys: list[str]) -> list[Any]: + """Retrieve multiple objects in zero-copy mode by directly deserializing from shared memory buffers. + + Args: + keys (list[str]): List of string keys to retrieve from storage. + + Returns: + list[Any]: List of deserialized objects corresponding to the input keys. + """ + buffers = self._ds_client.get_buffers(keys) + return [_decoder.decode(self.unpack_from(buffer)) if buffer is not None else None for buffer in buffers] From 28320da2385f4a42fb23f75f43d45d15ac0c2c4c Mon Sep 17 00:00:00 2001 From: dpj135 <958208521@qq.com> Date: Wed, 28 Jan 2026 17:31:28 +0800 Subject: [PATCH 05/26] Refactored 'YuanrongStorageClient.put&get' Signed-off-by: dpj135 <958208521@qq.com> --- .../storage/clients/yuanrong_client.py | 64 ++++++++++++++++++- 1 file changed, 61 insertions(+), 3 deletions(-) diff --git a/transfer_queue/storage/clients/yuanrong_client.py b/transfer_queue/storage/clients/yuanrong_client.py index faeb09e..6ec0165 100644 --- a/transfer_queue/storage/clients/yuanrong_client.py +++ b/transfer_queue/storage/clients/yuanrong_client.py @@ -54,7 +54,14 @@ def __init__(self, config: dict[str, Any]): if not YUANRONG_DATASYSTEM_IMPORTED: raise ImportError("YuanRong DataSystem not installed.") + self._strategies: list[StorageStrategy] = [] + for strategy_cls in [DsTensorClientAdapter, KVClientAdapter]: + strategy = strategy_cls.init(config) + if strategy is not None: + self._strategies.append(strategy) + if not self._strategies: + raise RuntimeError("No storage strategy available for YuanrongStorageClient") def put(self, keys: list[str], values: list[Any]) -> Optional[list[Any]]: """Stores multiple key-value pairs to remote storage. @@ -70,7 +77,24 @@ def put(self, keys: list[str], values: list[Any]) -> Optional[list[Any]]: raise ValueError("keys and values must be lists") if len(keys) != len(values): raise ValueError("Number of keys must match number of values") - pass + custom_metas = [] + strategy_batches: dict[StorageStrategy, tuple[list[str], list[Any]]] = {s: ([], []) for s in self._strategies} + + for key, value in zip(keys, values, strict=True): + for strategy in self._strategies: + if strategy.supports_put(value): + custom_metas.append(strategy.custom_meta()) + strategy_batches[strategy][0].append(key) + strategy_batches[strategy][1].append(value) + break + else: + raise ValueError(f"No strategy supports putting value of type {type(value)}") + # Todo: Parallel put + for strategy, (s_keys, s_vals) in strategy_batches.items(): + if s_keys: + strategy.put(s_keys, s_vals) + + return custom_metas def get(self, keys: list[str], shapes=None, dtypes=None, custom_backend_meta=None) -> list[Any]: """Retrieves multiple values from remote storage with expected metadata. 
@@ -90,7 +114,41 @@ def get(self, keys: list[str], shapes=None, dtypes=None, custom_backend_meta=Non raise ValueError("YuanrongStorageClient needs Expected shapes and dtypes") if not (len(keys) == len(shapes) == len(dtypes)): raise ValueError("Lengths of keys, shapes, dtypes must match") - pass + + if custom_meta is None: + raise ValueError("custom_meta is required for YuanrongStorageClient.get()") + + if len(custom_meta) != len(keys): + raise ValueError("custom_meta length must match keys") + + results: list[Optional[Any]] = [None] * len(keys) + + # {strategy: ([index], [key])} + strategy_batches: dict[StorageStrategy, tuple[list[int], list[str]]] = {s: ([], []) for s in self._strategies} + + for i, (key, meta) in enumerate(zip(keys, custom_meta, strict=True)): + for strategy in self._strategies: + if strategy.supports_get(meta): + strategy_batches[strategy][0].append(i) + strategy_batches[strategy][1].append(key) + break + else: + raise ValueError(f"No strategy supports getting with meta={meta}") + # Todo: Parallel get + for strategy, (indices, s_keys) in strategy_batches.items(): + s_shapes = [shapes[i] for i in indices] + s_dtypes = [dtypes[i] for i in indices] + + try: + s_results = strategy.get(s_keys, shapes=s_shapes, dtypes=s_dtypes) + except Exception as e: + logger.error(f"Strategy {strategy.custom_meta()} failed to get keys: {s_keys}, error: {e}") + raise + + for idx, res in zip(indices, s_results, strict=True): + results[idx] = res + + return results def clear(self, keys: list[str]): """Deletes multiple keys from remote storage. @@ -342,7 +400,7 @@ def unpack_from(source: memoryview) -> list[memoryview]: Args: source (memoryview): The packed source buffer. Returns: - list[]: List of unpacked contiguous buffers. + list[memoryview]: List of unpacked contiguous buffers. """ mv = memoryview(source) item_count = struct.unpack_from(KVClientAdapter.HEADER_FMT, mv, 0)[0] From 8c8e9a2172b2ad6a73746620455da406424abcd8 Mon Sep 17 00:00:00 2001 From: dpj135 <958208521@qq.com> Date: Thu, 29 Jan 2026 11:21:54 +0800 Subject: [PATCH 06/26] Added 'route_to_strategy' to class 'YuanrongStorageClient' & Adjust the order of classes Signed-off-by: dpj135 <958208521@qq.com> --- .../storage/clients/yuanrong_client.py | 260 +++++++++--------- 1 file changed, 135 insertions(+), 125 deletions(-) diff --git a/transfer_queue/storage/clients/yuanrong_client.py b/transfer_queue/storage/clients/yuanrong_client.py index 6ec0165..e9b01a9 100644 --- a/transfer_queue/storage/clients/yuanrong_client.py +++ b/transfer_queue/storage/clients/yuanrong_client.py @@ -18,7 +18,7 @@ import struct from abc import ABC, abstractmethod from concurrent.futures import ThreadPoolExecutor -from typing import Any, Optional, TypeAlias, Union +from typing import Any, Callable, Optional, TypeAlias, Union import torch from torch import Tensor @@ -40,125 +40,6 @@ YUANRONG_DATASYSTEM_IMPORTED = False -@StorageClientFactory.register("YuanrongStorageClient") -class YuanrongStorageClient(TransferQueueStorageKVClient): - """ - Storage client for YuanRong DataSystem. - - Supports storing and fetching both: - - NPU tensors via DsTensorClient (for high performance). - - General objects (CPU tensors, str, bool, list, etc.) via KVClient with pickle serialization. 
- """ - - def __init__(self, config: dict[str, Any]): - if not YUANRONG_DATASYSTEM_IMPORTED: - raise ImportError("YuanRong DataSystem not installed.") - - self._strategies: list[StorageStrategy] = [] - for strategy_cls in [DsTensorClientAdapter, KVClientAdapter]: - strategy = strategy_cls.init(config) - if strategy is not None: - self._strategies.append(strategy) - - if not self._strategies: - raise RuntimeError("No storage strategy available for YuanrongStorageClient") - - def put(self, keys: list[str], values: list[Any]) -> Optional[list[Any]]: - """Stores multiple key-value pairs to remote storage. - - Automatically routes NPU tensors to high-performance tensor storage, - and other objects to general-purpose KV storage. - - Args: - keys (List[str]): List of unique string identifiers. - values (List[Any]): List of values to store (tensors, scalars, dicts, etc.). - """ - if not isinstance(keys, list) or not isinstance(values, list): - raise ValueError("keys and values must be lists") - if len(keys) != len(values): - raise ValueError("Number of keys must match number of values") - custom_metas = [] - strategy_batches: dict[StorageStrategy, tuple[list[str], list[Any]]] = {s: ([], []) for s in self._strategies} - - for key, value in zip(keys, values, strict=True): - for strategy in self._strategies: - if strategy.supports_put(value): - custom_metas.append(strategy.custom_meta()) - strategy_batches[strategy][0].append(key) - strategy_batches[strategy][1].append(value) - break - else: - raise ValueError(f"No strategy supports putting value of type {type(value)}") - # Todo: Parallel put - for strategy, (s_keys, s_vals) in strategy_batches.items(): - if s_keys: - strategy.put(s_keys, s_vals) - - return custom_metas - - def get(self, keys: list[str], shapes=None, dtypes=None, custom_backend_meta=None) -> list[Any]: - """Retrieves multiple values from remote storage with expected metadata. - - Requires shape and dtype hints to reconstruct NPU tensors correctly. - - Args: - keys (List[str]): Keys to fetch. - shapes (List[List[int]]): Expected tensor shapes (use [] for scalars). - dtypes (List[Optional[torch.dtype]]): Expected dtypes; use None for non-tensor data. - custom_backend_meta (List[str], optional): Device type (npu/cpu) for each key - - Returns: - List[Any]: Retrieved values in the same order as input keys. 
- """ - if shapes is None or dtypes is None: - raise ValueError("YuanrongStorageClient needs Expected shapes and dtypes") - if not (len(keys) == len(shapes) == len(dtypes)): - raise ValueError("Lengths of keys, shapes, dtypes must match") - - if custom_meta is None: - raise ValueError("custom_meta is required for YuanrongStorageClient.get()") - - if len(custom_meta) != len(keys): - raise ValueError("custom_meta length must match keys") - - results: list[Optional[Any]] = [None] * len(keys) - - # {strategy: ([index], [key])} - strategy_batches: dict[StorageStrategy, tuple[list[int], list[str]]] = {s: ([], []) for s in self._strategies} - - for i, (key, meta) in enumerate(zip(keys, custom_meta, strict=True)): - for strategy in self._strategies: - if strategy.supports_get(meta): - strategy_batches[strategy][0].append(i) - strategy_batches[strategy][1].append(key) - break - else: - raise ValueError(f"No strategy supports getting with meta={meta}") - # Todo: Parallel get - for strategy, (indices, s_keys) in strategy_batches.items(): - s_shapes = [shapes[i] for i in indices] - s_dtypes = [dtypes[i] for i in indices] - - try: - s_results = strategy.get(s_keys, shapes=s_shapes, dtypes=s_dtypes) - except Exception as e: - logger.error(f"Strategy {strategy.custom_meta()} failed to get keys: {s_keys}, error: {e}") - raise - - for idx, res in zip(indices, s_results, strict=True): - results[idx] = res - - return results - - def clear(self, keys: list[str]): - """Deletes multiple keys from remote storage. - - Args: - keys (List[str]): List of keys to remove. - """ - pass - - class StorageStrategy(ABC): @abstractmethod @staticmethod @@ -336,12 +217,10 @@ def get(self, keys: list[str], **kwargs) -> list[Optional[Any]]: def supports_clear(self, custom_meta: str) -> bool: return isinstance(custom_meta, str) and custom_meta == self.custom_meta() - # Todo(wenlin): Add clear_buffer method def clear(self, keys: list[str]): - pass - # for i in range(0, len(keys), self.GET_CLEAR_KEYS_LIMIT): - # batch = keys[i : i + self.GET_CLEAR_KEYS_LIMIT] - # self._ds_client.delete(batch) + for i in range(0, len(keys), self.GET_CLEAR_KEYS_LIMIT): + batch = keys[i : i + self.GET_CLEAR_KEYS_LIMIT] + self._ds_client.delete(batch) @staticmethod def calc_packed_size(items: list[memoryview]) -> int: @@ -438,3 +317,134 @@ def mget_zero_copy(self, keys: list[str]) -> list[Any]: """ buffers = self._ds_client.get_buffers(keys) return [_decoder.decode(self.unpack_from(buffer)) if buffer is not None else None for buffer in buffers] + + +@StorageClientFactory.register("YuanrongStorageClient") +class YuanrongStorageClient(TransferQueueStorageKVClient): + """ + Storage client for YuanRong DataSystem. + + Supports storing and fetching both: + - NPU tensors via DsTensorClient (for high performance). + - General objects (CPU tensors, str, bool, list, etc.) via KVClient with pickle serialization. + """ + + def __init__(self, config: dict[str, Any]): + if not YUANRONG_DATASYSTEM_IMPORTED: + raise ImportError("YuanRong DataSystem not installed.") + + self._strategies: list[StorageStrategy] = [] + for strategy_cls in [DsTensorClientAdapter, KVClientAdapter]: + strategy = strategy_cls.init(config) + if strategy is not None: + self._strategies.append(strategy) + + if not self._strategies: + raise RuntimeError("No storage strategy available for YuanrongStorageClient") + + def put(self, keys: list[str], values: list[Any]) -> Optional[list[Any]]: + """Stores multiple key-value pairs to remote storage. 
+ + Automatically routes NPU tensors to high-performance tensor storage, + and other objects to general-purpose KV storage. + + Args: + keys (List[str]): List of unique string identifiers. + values (List[Any]): List of values to store (tensors, scalars, dicts, etc.). + + Returns: + List[Any]: custom metadata of YuanrongStorageCilent in the same order as input keys. + """ + if not isinstance(keys, list) or not isinstance(values, list): + raise ValueError("keys and values must be lists") + if len(keys) != len(values): + raise ValueError("Number of keys must match number of values") + + routed_indexes = self._route_to_strategies(values, lambda strategy_, item_: strategy_.supports_put(item_)) + custom_metas = [None] * len(keys) + for strategy, indexes in routed_indexes.items(): + if not indexes: + continue + strategy_keys = [keys[i] for i in indexes] + strategy_values = [values[i] for i in indexes] + strategy.put(strategy_keys, strategy_values) + for i in indexes: + custom_metas[i] = strategy.custom_meta() + + return custom_metas + + def get(self, keys: list[str], shapes=None, dtypes=None, custom_meta=None) -> list[Any]: + """Retrieves multiple values from remote storage with expected metadata. + + Requires shape and dtype hints to reconstruct NPU tensors correctly. + + Args: + keys (List[str]): Keys to fetch. + shapes (List[List[int]]): Expected tensor shapes (use [] for scalars). + dtypes (List[Optional[torch.dtype]]): Expected dtypes; use None for non-tensor data. + custom_meta (List[str], optional): Device type (npu/cpu) for each key + + Returns: + List[Any]: Retrieved values in the same order as input keys. + """ + if shapes is None or dtypes is None: + raise ValueError("YuanrongStorageClient needs Expected shapes and dtypes") + if not (len(keys) == len(shapes) == len(dtypes)): + raise ValueError("Lengths of keys, shapes, dtypes must match") + + if custom_meta is None: + raise ValueError("custom_meta is required for YuanrongStorageClient.get()") + + if len(custom_meta) != len(keys): + raise ValueError("custom_meta length must match keys") + + routed_indexes = self._route_to_strategies(custom_meta, lambda strategy_, item_: strategy_.supports_get(item_)) + + # Todo(dpj): Parallel get + results = [None] * len(keys) + for strategy, indexes in routed_indexes.items(): + if not indexes: + continue + strategy_keys = [keys[i] for i in indexes] + strategy_shapes = [shapes[i] for i in indexes] + strategy_dtypes = [dtypes[i] for i in indexes] + strategy_results = strategy.get(strategy_keys, shapes=strategy_shapes, dtypes=strategy_dtypes) + for j, i in enumerate(indexes): + results[i] = strategy_results[j] + + return results + + def clear(self, keys: list[str]): + """Deletes multiple keys from remote storage. + + Args: + keys (List[str]): List of keys to remove. + """ + pass + + def _route_to_strategies( + self, + items: list[Any], + selector: Callable[[StorageStrategy, Any], bool], + ) -> dict[StorageStrategy, list[int]]: + """ + Groups item indices by storage strategy. + + Args: + items: A list of items (e.g., values or custom_meta strings) to be dispatched. + The order must correspond to the original keys. + selector: A function that determines whether a strategy supports an item. + Signature: (strategy, item) -> bool + + Returns: + A dictionary mapping each strategy to a list of indices in `items`. 
+ """ + routed_indexes: dict[StorageStrategy, list[int]] = {s: [] for s in self._strategies} + for i, item in enumerate(items): + for strategy in self._strategies: + if selector(strategy, item): + routed_indexes[strategy].append(i) + break + else: + raise ValueError(f"No strategy supports item: {item}") + return routed_indexes From 4d8ae6428af27093f9a389fa81a48fcf67a6c424 Mon Sep 17 00:00:00 2001 From: dpj135 <958208521@qq.com> Date: Thu, 29 Jan 2026 15:04:13 +0800 Subject: [PATCH 07/26] Fixed the order about '@staticmethod' and 'abstractmethod' (Now yuanrong_client can execute correctly) Signed-off-by: dpj135 <958208521@qq.com> --- transfer_queue/storage/clients/yuanrong_client.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/transfer_queue/storage/clients/yuanrong_client.py b/transfer_queue/storage/clients/yuanrong_client.py index e9b01a9..d16500b 100644 --- a/transfer_queue/storage/clients/yuanrong_client.py +++ b/transfer_queue/storage/clients/yuanrong_client.py @@ -41,8 +41,8 @@ class StorageStrategy(ABC): - @abstractmethod @staticmethod + @abstractmethod def init(config: dict) -> Union["StorageStrategy", None]: ... @abstractmethod From a9235e6a44be104d19dfa06a02c2e86443ae76ea Mon Sep 17 00:00:00 2001 From: dpj135 <958208521@qq.com> Date: Thu, 29 Jan 2026 15:57:52 +0800 Subject: [PATCH 08/26] Added custom_meta to clear for all TransferQueueKVStorageClient Signed-off-by: dpj135 <958208521@qq.com> --- transfer_queue/storage/clients/base.py | 2 +- .../storage/clients/mooncake_client.py | 7 +++- .../storage/clients/ray_storage_client.py | 3 +- .../storage/clients/yuanrong_client.py | 34 ++++++++++++++----- transfer_queue/storage/managers/base.py | 5 ++- 5 files changed, 39 insertions(+), 12 deletions(-) diff --git a/transfer_queue/storage/clients/base.py b/transfer_queue/storage/clients/base.py index 90c33fe..435b7b3 100644 --- a/transfer_queue/storage/clients/base.py +++ b/transfer_queue/storage/clients/base.py @@ -65,6 +65,6 @@ def get(self, keys: list[str], shapes=None, dtypes=None, custom_backend_meta=Non raise NotImplementedError("Subclasses must implement get") @abstractmethod - def clear(self, keys: list[str]) -> None: + def clear(self, keys: list[str], custom_meta: Optional[list[Any]] = None) -> None: """Clear key-value pairs in the storage backend.""" raise NotImplementedError("Subclasses must implement clear") diff --git a/transfer_queue/storage/clients/mooncake_client.py b/transfer_queue/storage/clients/mooncake_client.py index 6b730ca..5966eab 100644 --- a/transfer_queue/storage/clients/mooncake_client.py +++ b/transfer_queue/storage/clients/mooncake_client.py @@ -139,7 +139,11 @@ def get(self, keys: list[str], shapes=None, dtypes=None, custom_backend_meta=Non keys (List[str]): Keys to fetch. shapes (List[List[int]]): Expected tensor shapes (use [] for scalars). dtypes (List[Optional[torch.dtype]]): Expected dtypes; use None for non-tensor data. +<<<<<<< HEAD custom_backend_meta (List[str], optional): Device type (npu/cpu) for each key +======= + custom_meta (List[Any], optional): ... +>>>>>>> acd7686 (Added custom_meta to clear for all TransferQueueKVStorageClient) Returns: List[Any]: Retrieved values in the same order as input keys. @@ -216,11 +220,12 @@ def _batch_get_bytes(self, keys: list[str]) -> list[bytes]: results.extend(batch_results) return results - def clear(self, keys: list[str]): + def clear(self, keys: list[str], custom_meta=None): """Deletes multiple keys from MooncakeStore. Args: keys (List[str]): List of keys to remove. 
+ custom_meta (List[Any], optional): ... """ for key in keys: ret = self._store.remove(key) diff --git a/transfer_queue/storage/clients/ray_storage_client.py b/transfer_queue/storage/clients/ray_storage_client.py index 8bd4468..d515a99 100644 --- a/transfer_queue/storage/clients/ray_storage_client.py +++ b/transfer_queue/storage/clients/ray_storage_client.py @@ -106,10 +106,11 @@ def get(self, keys: list[str], shapes=None, dtypes=None, custom_backend_meta=Non raise RuntimeError(f"Failed to retrieve value for key '{keys}': {e}") from e return values - def clear(self, keys: list[str]): + def clear(self, keys: list[str], custom_meta=None): """ Delete entries from storage by keys. Args: keys (list): List of keys to delete + custom_meta (List[Any], optional): ... """ ray.get(self.storage_actor.clear_obj_ref.remote(keys)) diff --git a/transfer_queue/storage/clients/yuanrong_client.py b/transfer_queue/storage/clients/yuanrong_client.py index d16500b..83cb9cf 100644 --- a/transfer_queue/storage/clients/yuanrong_client.py +++ b/transfer_queue/storage/clients/yuanrong_client.py @@ -219,8 +219,8 @@ def supports_clear(self, custom_meta: str) -> bool: def clear(self, keys: list[str]): for i in range(0, len(keys), self.GET_CLEAR_KEYS_LIMIT): - batch = keys[i : i + self.GET_CLEAR_KEYS_LIMIT] - self._ds_client.delete(batch) + batch_keys = keys[i : i + self.GET_CLEAR_KEYS_LIMIT] + self._ds_client.delete(batch_keys) @staticmethod def calc_packed_size(items: list[memoryview]) -> int: @@ -342,7 +342,7 @@ def __init__(self, config: dict[str, Any]): if not self._strategies: raise RuntimeError("No storage strategy available for YuanrongStorageClient") - def put(self, keys: list[str], values: list[Any]) -> Optional[list[Any]]: + def put(self, keys: list[str], values: list[Any]) -> list[str]: """Stores multiple key-value pairs to remote storage. Automatically routes NPU tensors to high-performance tensor storage, @@ -353,7 +353,7 @@ def put(self, keys: list[str], values: list[Any]) -> Optional[list[Any]]: values (List[Any]): List of values to store (tensors, scalars, dicts, etc.). Returns: - List[Any]: custom metadata of YuanrongStorageCilent in the same order as input keys. + List[str]: custom metadata of YuanrongStorageCilent in the same order as input keys. """ if not isinstance(keys, list) or not isinstance(values, list): raise ValueError("keys and values must be lists") @@ -361,7 +361,9 @@ def put(self, keys: list[str], values: list[Any]) -> Optional[list[Any]]: raise ValueError("Number of keys must match number of values") routed_indexes = self._route_to_strategies(values, lambda strategy_, item_: strategy_.supports_put(item_)) - custom_metas = [None] * len(keys) + custom_metas: list[str] = [""] * len(keys) + + # Todo(dpj): Parallel put for strategy, indexes in routed_indexes.items(): if not indexes: continue @@ -382,7 +384,7 @@ def get(self, keys: list[str], shapes=None, dtypes=None, custom_meta=None) -> li keys (List[str]): Keys to fetch. shapes (List[List[int]]): Expected tensor shapes (use [] for scalars). dtypes (List[Optional[torch.dtype]]): Expected dtypes; use None for non-tensor data. - custom_meta (List[str], optional): Device type (npu/cpu) for each key + custom_meta (List[str]): StorageStrategy type for each key Returns: List[Any]: Retrieved values in the same order as input keys. 
@@ -414,13 +416,29 @@ def get(self, keys: list[str], shapes=None, dtypes=None, custom_meta=None) -> li return results - def clear(self, keys: list[str]): + def clear(self, keys: list[str], custom_meta=None): """Deletes multiple keys from remote storage. Args: keys (List[str]): List of keys to remove. + custom_meta (List[str]): StorageStrategy type for each key """ - pass + if not isinstance(keys, list): + raise ValueError("keys must be a list") + if not isinstance(custom_meta, list): + raise ValueError("custom_meta must be a list if provided") + if len(custom_meta) != len(keys): + raise ValueError("custom_meta length must match keys") + + routed_indexes = self._route_to_strategies( + custom_meta, lambda strategy_, item_: strategy_.supports_clear(item_) + ) + # Todo(dpj): Parallel clear + for strategy, indexes in routed_indexes.items(): + if not indexes: + continue + strategy_keys = [keys[i] for i in indexes] + strategy.clear(strategy_keys) def _route_to_strategies( self, diff --git a/transfer_queue/storage/managers/base.py b/transfer_queue/storage/managers/base.py index 2c76e91..9fb99ac 100644 --- a/transfer_queue/storage/managers/base.py +++ b/transfer_queue/storage/managers/base.py @@ -555,6 +555,8 @@ async def put_data(self, data: TensorDict, metadata: BatchMeta) -> None: keys = self._generate_keys(data.keys(), metadata.global_indexes) values = self._generate_values(data) loop = asyncio.get_event_loop() + + # put to storage backends custom_meta = await loop.run_in_executor(None, self.storage_client.put, keys, values) per_field_dtypes: dict[int, dict[str, Any]] = {} @@ -632,4 +634,5 @@ async def clear_data(self, metadata: BatchMeta) -> None: logger.warning("Attempted to clear data, but metadata contains no fields.") return keys = self._generate_keys(metadata.field_names, metadata.global_indexes) - self.storage_client.clear(keys=keys) + _, _, custom_meta = self._get_shape_type_custom_meta_list(metadata) + self.storage_client.clear(keys=keys, custom_meta=custom_meta) From ffa597024601b11ba79fbbd150cc641c50ce8daf Mon Sep 17 00:00:00 2001 From: dpj135 <958208521@qq.com> Date: Fri, 30 Jan 2026 13:02:23 +0800 Subject: [PATCH 09/26] Added multi-threads optimization to 'put/get/clear' of 'YuanrongStorageClient' Signed-off-by: dpj135 <958208521@qq.com> --- .../storage/clients/yuanrong_client.py | 90 +++++++++++++------ 1 file changed, 61 insertions(+), 29 deletions(-) diff --git a/transfer_queue/storage/clients/yuanrong_client.py b/transfer_queue/storage/clients/yuanrong_client.py index 83cb9cf..54c8357 100644 --- a/transfer_queue/storage/clients/yuanrong_client.py +++ b/transfer_queue/storage/clients/yuanrong_client.py @@ -361,19 +361,21 @@ def put(self, keys: list[str], values: list[Any]) -> list[str]: raise ValueError("Number of keys must match number of values") routed_indexes = self._route_to_strategies(values, lambda strategy_, item_: strategy_.supports_put(item_)) - custom_metas: list[str] = [""] * len(keys) - - # Todo(dpj): Parallel put - for strategy, indexes in routed_indexes.items(): - if not indexes: - continue - strategy_keys = [keys[i] for i in indexes] - strategy_values = [values[i] for i in indexes] - strategy.put(strategy_keys, strategy_values) - for i in indexes: - custom_metas[i] = strategy.custom_meta() - return custom_metas + # Define the work unit: Slicing the input list and calling the backend strategy. + # The closure captures local 'keys' and 'values' for zero-overhead parameter passing. 
+ def put_task(strategy, indexes): + strategy.put([keys[i] for i in indexes], [values[i] for i in indexes]) + return strategy.custom_meta(), indexes + + # Call the orchestrator to run the tasks (Parallel or Sequential). + # We then iterate through the results to map strategy-specific metadata back + # to the original global index order. + custom_meta: list[str] = [""] * len(keys) + for meta_str, indexes in self._dispatch_tasks(routed_indexes, put_task): + for i in indexes: + custom_meta[i] = meta_str + return custom_meta def get(self, keys: list[str], shapes=None, dtypes=None, custom_meta=None) -> list[Any]: """Retrieves multiple values from remote storage with expected metadata. @@ -402,18 +404,20 @@ def get(self, keys: list[str], shapes=None, dtypes=None, custom_meta=None) -> li routed_indexes = self._route_to_strategies(custom_meta, lambda strategy_, item_: strategy_.supports_get(item_)) - # Todo(dpj): Parallel get - results = [None] * len(keys) - for strategy, indexes in routed_indexes.items(): - if not indexes: - continue - strategy_keys = [keys[i] for i in indexes] - strategy_shapes = [shapes[i] for i in indexes] - strategy_dtypes = [dtypes[i] for i in indexes] - strategy_results = strategy.get(strategy_keys, shapes=strategy_shapes, dtypes=strategy_dtypes) - for j, i in enumerate(indexes): - results[i] = strategy_results[j] + # Work unit for 'get': handles slicing of keys, shapes, and dtypes simultaneously. + def get_task(strategy, indexes): + res = strategy.get( + [keys[i] for i in indexes], shapes=[shapes[i] for i in indexes], dtypes=[dtypes[i] for i in indexes] + ) + return res, indexes + # Dispatch the 'get' requests. Multiple backends (e.g. NPU and SSD) will fetch + # in parallel if needed. Results are merged back into the 'results' list + # according to their original positions. + results = [None] * len(keys) + for strategy_res, indexes in self._dispatch_tasks(routed_indexes, get_task): + for value, original_index in zip(strategy_res, indexes, strict=True): + results[original_index] = value return results def clear(self, keys: list[str], custom_meta=None): @@ -433,12 +437,14 @@ def clear(self, keys: list[str], custom_meta=None): routed_indexes = self._route_to_strategies( custom_meta, lambda strategy_, item_: strategy_.supports_clear(item_) ) - # Todo(dpj): Parallel clear - for strategy, indexes in routed_indexes.items(): - if not indexes: - continue - strategy_keys = [keys[i] for i in indexes] - strategy.clear(strategy_keys) + + # Cleanup work unit: Does not return values, just executes deletion. + def clear_task(strategy, indexes): + strategy.clear([keys[i] for i in indexes]) + + # Parallelize deletion across strategies. + # The 'with' context in _dispatch_tasks will wait for all deletions to finish. + self._dispatch_tasks(routed_indexes, clear_task) def _route_to_strategies( self, @@ -466,3 +472,29 @@ def _route_to_strategies( else: raise ValueError(f"No strategy supports item: {item}") return routed_indexes + + def _dispatch_tasks(self, routed_tasks: dict[StorageStrategy, list[int]], task_function: Callable) -> list[Any]: + """ + Orchestrates task execution across multiple strategies. + + Logic: + 1. If no tasks are present, return immediately. + 2. If only one strategy is active, execute synchronously in the main thread (Fast Path) + to avoid the overhead of thread creation and context switching. + 3. If multiple strategies are active, execute in parallel using a ThreadPoolExecutor. 
+ """ + active_tasks = [(strategy, indexes) for strategy, indexes in routed_tasks.items() if indexes] + + if not active_tasks: + return [] + + # Fast Path: Execute directly if only one backend is targeted. + # This significantly reduces latency for homogeneous batches (e.g., NPU-only). + if len(active_tasks) == 1: + return [task_function(*active_tasks[0])] + + # Parallel Path: Maximize throughput by overlapping NPU and CPU operations. + # The 'with' statement ensures immediate thread teardown and resource release. + with ThreadPoolExecutor(max_workers=len(active_tasks)) as executor: + futures = [executor.submit(task_function, strategy, indexes) for strategy, indexes in active_tasks] + return [f.result() for f in futures] From 39c72bb0fd1180df5645d15f3e148915db1b71d1 Mon Sep 17 00:00:00 2001 From: dpj135 <958208521@qq.com> Date: Fri, 30 Jan 2026 15:09:48 +0800 Subject: [PATCH 10/26] Added more annotation for methods Signed-off-by: dpj135 <958208521@qq.com> --- .../storage/clients/yuanrong_client.py | 124 +++++++++++------- 1 file changed, 74 insertions(+), 50 deletions(-) diff --git a/transfer_queue/storage/clients/yuanrong_client.py b/transfer_queue/storage/clients/yuanrong_client.py index 54c8357..64261d1 100644 --- a/transfer_queue/storage/clients/yuanrong_client.py +++ b/transfer_queue/storage/clients/yuanrong_client.py @@ -43,31 +43,41 @@ class StorageStrategy(ABC): @staticmethod @abstractmethod - def init(config: dict) -> Union["StorageStrategy", None]: ... + def init(config: dict) -> Union["StorageStrategy", None]: + """Initialize strategy from config; return None if not applicable.""" @abstractmethod - def custom_meta(self) -> Any: ... + def custom_meta(self) -> Any: + """Return metadata identifying this strategy (e.g., string name).""" @abstractmethod - def supports_put(self, value: Any) -> bool: ... + def supports_put(self, value: Any) -> bool: + """Check if this strategy can store the given value.""" @abstractmethod - def put(self, keys: list[str], values: list[Any]): ... + def put(self, keys: list[str], values: list[Any]): + """Store key-value pairs using this strategy.""" @abstractmethod - def supports_get(self, custom_meta: Any) -> bool: ... + def supports_get(self, custom_meta: Any) -> bool: + """Check if this strategy can retrieve data with given metadata.""" @abstractmethod - def get(self, keys: list[str], **kwargs) -> list[Optional[Any]]: ... + def get(self, keys: list[str], **kwargs) -> list[Optional[Any]]: + """Retrieve values by keys; kwargs may include shapes/dtypes.""" @abstractmethod - def supports_clear(self, custom_meta: Any) -> bool: ... + def supports_clear(self, custom_meta: Any) -> bool: + """Check if this strategy owns data identified by metadata.""" @abstractmethod - def clear(self, keys: list[str]): ... 
+ def clear(self, keys: list[str]): + """Delete keys from storage.""" class DsTensorClientAdapter(StorageStrategy): + """Adapter for YuanRong's high-performance NPU tensor storage.""" + KEYS_LIMIT: int = 10_000 def __init__(self, config: dict): @@ -83,6 +93,7 @@ def __init__(self, config: dict): @staticmethod def init(config: dict) -> Union["StorageStrategy", None]: + """Initialize only if NPU and torch_npu are available.""" torch_npu_imported: bool = True try: import torch_npu # noqa: F401 @@ -95,9 +106,11 @@ def init(config: dict) -> Union["StorageStrategy", None]: return DsTensorClientAdapter(config) def custom_meta(self) -> Any: + """Metadata tag for NPU tensor storage.""" return "DsTensorClient" def supports_put(self, value: Any) -> bool: + """Supports contiguous NPU tensors only.""" if not (isinstance(value, torch.Tensor) and value.device.type == "npu"): return False # Todo(dpj): perhaps KVClient can process uncontiguous tensor @@ -106,10 +119,11 @@ def supports_put(self, value: Any) -> bool: return True def put(self, keys: list[str], values: list[Any]): - # _npu_ds_client.dev_mset doesn't support to overwrite + """Store NPU tensors in batches; deletes before overwrite.""" for i in range(0, len(keys), self.KEYS_LIMIT): batch_keys = keys[i : i + self.KEYS_LIMIT] batch_values = values[i : i + self.KEYS_LIMIT] + # _npu_ds_client.dev_mset doesn't support to overwrite try: self._ds_client.dev_delete(batch_keys) except Exception: @@ -117,10 +131,11 @@ def put(self, keys: list[str], values: list[Any]): self._ds_client.dev_mset(batch_keys, batch_values) def supports_get(self, custom_meta: str) -> bool: + """Matches 'DsTensorClient' custom metadata.""" return isinstance(custom_meta, str) and custom_meta == self.custom_meta() def get(self, keys: list[str], **kwargs) -> list[Optional[Any]]: - # Fetch NPU tensors + """Fetch NPU tensors using pre-allocated empty buffers.""" shapes = kwargs.get("shapes", None) dtypes = kwargs.get("dtypes", None) if not shapes or not dtypes: @@ -141,9 +156,11 @@ def get(self, keys: list[str], **kwargs) -> list[Optional[Any]]: return results def supports_clear(self, custom_meta: str) -> bool: + """Matches 'DsTensorClient' metadata.""" return isinstance(custom_meta, str) and custom_meta == self.custom_meta() def clear(self, keys: list[str]): + """Delete NPU tensor keys in batches.""" for i in range(0, len(keys), self.KEYS_LIMIT): batch = keys[i : i + self.KEYS_LIMIT] # Todo(dpj): Test call clear when no (key,value) put in ds @@ -167,6 +184,11 @@ def _create_empty_npu_tensorlist(self, shapes, dtypes): class KVClientAdapter(StorageStrategy): + """ + Adapter for general-purpose KV storage with serialization. + The serialization method uses '_decoder' and '_encoder' from 'transfer_queue.utils.serial_utils'. 
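To make the contract concrete, a toy strategy satisfying the same abstract interface can be written against an in-process dict. This is purely illustrative — it is not part of the patch series, and it assumes the `StorageStrategy` base class defined above is in scope:

    from typing import Any, Optional

    class InMemoryStrategy(StorageStrategy):
        """Toy strategy backed by a local dict (illustration only)."""

        def __init__(self):
            self._store: dict[str, Any] = {}

        @staticmethod
        def init(config: dict) -> Optional["StorageStrategy"]:
            return InMemoryStrategy()  # always applicable

        def custom_meta(self) -> Any:
            return "InMemory"

        def supports_put(self, value: Any) -> bool:
            return True  # accepts any Python object

        def put(self, keys: list[str], values: list[Any]):
            self._store.update(zip(keys, values, strict=True))

        def supports_get(self, custom_meta: Any) -> bool:
            return custom_meta == "InMemory"

        def get(self, keys: list[str], **kwargs) -> list[Optional[Any]]:
            return [self._store.get(k) for k in keys]

        def supports_clear(self, custom_meta: Any) -> bool:
            return custom_meta == "InMemory"

        def clear(self, keys: list[str]):
            for k in keys:
                self._store.pop(k, None)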
+ """ + PUT_KEYS_LIMIT: int = 2_000 GET_CLEAR_KEYS_LIMIT: int = 10_000 @@ -189,24 +211,30 @@ def __init__(self, config: dict): @staticmethod def init(config: dict) -> Union["StorageStrategy", None]: + """Always enabled for general objects.""" return KVClientAdapter(config) def custom_meta(self) -> Any: + """Metadata tag for general KV storage.""" return "KVClient" def supports_put(self, value: Any) -> bool: + """Accepts any Python object.""" return True def put(self, keys: list[str], values: list[Any]): + """Store objects via zero-copy serialization in batches.""" for i in range(0, len(keys), self.PUT_KEYS_LIMIT): batch_keys = keys[i : i + self.PUT_KEYS_LIMIT] batch_vals = values[i : i + self.PUT_KEYS_LIMIT] self.mset_zero_copy(batch_keys, batch_vals) def supports_get(self, custom_meta: str) -> bool: + """Matches 'KVClient' metadata.""" return isinstance(custom_meta, str) and custom_meta == self.custom_meta() def get(self, keys: list[str], **kwargs) -> list[Optional[Any]]: + """Retrieve and deserialize objects in batches.""" results = [] for i in range(0, len(keys), self.GET_CLEAR_KEYS_LIMIT): batch_keys = keys[i : i + self.GET_CLEAR_KEYS_LIMIT] @@ -215,9 +243,11 @@ def get(self, keys: list[str], **kwargs) -> list[Optional[Any]]: return results def supports_clear(self, custom_meta: str) -> bool: + """Matches 'KVClient' metadata.""" return isinstance(custom_meta, str) and custom_meta == self.custom_meta() def clear(self, keys: list[str]): + """Delete keys in batches.""" for i in range(0, len(keys), self.GET_CLEAR_KEYS_LIMIT): batch_keys = keys[i : i + self.GET_CLEAR_KEYS_LIMIT] self._ds_client.delete(batch_keys) @@ -326,7 +356,7 @@ class YuanrongStorageClient(TransferQueueStorageKVClient): Supports storing and fetching both: - NPU tensors via DsTensorClient (for high performance). - - General objects (CPU tensors, str, bool, list, etc.) via KVClient with pickle serialization. + - General objects (CPU tensors, str, bool, list, etc.) via KVClient with serialization. """ def __init__(self, config: dict[str, Any]): @@ -368,9 +398,7 @@ def put_task(strategy, indexes): strategy.put([keys[i] for i in indexes], [values[i] for i in indexes]) return strategy.custom_meta(), indexes - # Call the orchestrator to run the tasks (Parallel or Sequential). - # We then iterate through the results to map strategy-specific metadata back - # to the original global index order. + # Dispatch tasks and map metadata back to original positions custom_meta: list[str] = [""] * len(keys) for meta_str, indexes in self._dispatch_tasks(routed_indexes, put_task): for i in indexes: @@ -391,16 +419,11 @@ def get(self, keys: list[str], shapes=None, dtypes=None, custom_meta=None) -> li Returns: List[Any]: Retrieved values in the same order as input keys. 
""" - if shapes is None or dtypes is None: - raise ValueError("YuanrongStorageClient needs Expected shapes and dtypes") - if not (len(keys) == len(shapes) == len(dtypes)): - raise ValueError("Lengths of keys, shapes, dtypes must match") + if shapes is None or dtypes is None or custom_meta is None: + raise ValueError("YuanrongStorageClient.get() needs Expected shapes, dtypes and custom_meta") - if custom_meta is None: - raise ValueError("custom_meta is required for YuanrongStorageClient.get()") - - if len(custom_meta) != len(keys): - raise ValueError("custom_meta length must match keys") + if not (len(keys) == len(shapes) == len(dtypes) == len(custom_meta)): + raise ValueError("Lengths of keys, shapes, dtypes, custom_meta must match") routed_indexes = self._route_to_strategies(custom_meta, lambda strategy_, item_: strategy_.supports_get(item_)) @@ -411,9 +434,7 @@ def get_task(strategy, indexes): ) return res, indexes - # Dispatch the 'get' requests. Multiple backends (e.g. NPU and SSD) will fetch - # in parallel if needed. Results are merged back into the 'results' list - # according to their original positions. + # Gather results and restore original order results = [None] * len(keys) for strategy_res, indexes in self._dispatch_tasks(routed_indexes, get_task): for value, original_index in zip(strategy_res, indexes, strict=True): @@ -427,10 +448,9 @@ def clear(self, keys: list[str], custom_meta=None): keys (List[str]): List of keys to remove. custom_meta (List[str]): StorageStrategy type for each key """ - if not isinstance(keys, list): - raise ValueError("keys must be a list") - if not isinstance(custom_meta, list): - raise ValueError("custom_meta must be a list if provided") + if not isinstance(keys, list) or not isinstance(custom_meta, list): + raise ValueError("keys and custom_meta must be a list") + if len(custom_meta) != len(keys): raise ValueError("custom_meta length must match keys") @@ -438,12 +458,10 @@ def clear(self, keys: list[str], custom_meta=None): custom_meta, lambda strategy_, item_: strategy_.supports_clear(item_) ) - # Cleanup work unit: Does not return values, just executes deletion. def clear_task(strategy, indexes): strategy.clear([keys[i] for i in indexes]) - # Parallelize deletion across strategies. - # The 'with' context in _dispatch_tasks will wait for all deletions to finish. + # Execute deletions (no return values needed) self._dispatch_tasks(routed_indexes, clear_task) def _route_to_strategies( @@ -451,17 +469,19 @@ def _route_to_strategies( items: list[Any], selector: Callable[[StorageStrategy, Any], bool], ) -> dict[StorageStrategy, list[int]]: - """ - Groups item indices by storage strategy. + """Groups item indices by the first strategy that supports them. + + Used to route keys/values/custom_meta to storage backends by grouped indexes. Args: - items: A list of items (e.g., values or custom_meta strings) to be dispatched. + items: A list of items (e.g., values for put, or custom_meta strings for get/clear). The order must correspond to the original keys. selector: A function that determines whether a strategy supports an item. - Signature: (strategy, item) -> bool + Signature: `(strategy: StorageStrategy, item: Any) -> bool`. Returns: - A dictionary mapping each strategy to a list of indices in `items`. + A dictionary mapping each active strategy to a list of indexes in `items` + that it should handle. Every index appears exactly once. 
""" routed_indexes: dict[StorageStrategy, list[int]] = {s: [] for s in self._strategies} for i, item in enumerate(items): @@ -473,28 +493,32 @@ def _route_to_strategies( raise ValueError(f"No strategy supports item: {item}") return routed_indexes - def _dispatch_tasks(self, routed_tasks: dict[StorageStrategy, list[int]], task_function: Callable) -> list[Any]: - """ - Orchestrates task execution across multiple strategies. + @staticmethod + def _dispatch_tasks(routed_tasks: dict[StorageStrategy, list[int]], task_function: Callable) -> list[Any]: + """Executes tasks across one or more storage strategies, optionally in parallel. + + Optimizes for common case: if only one strategy is involved, runs synchronously + to avoid thread overhead. Otherwise, uses thread pool for concurrency. - Logic: - 1. If no tasks are present, return immediately. - 2. If only one strategy is active, execute synchronously in the main thread (Fast Path) - to avoid the overhead of thread creation and context switching. - 3. If multiple strategies are active, execute in parallel using a ThreadPoolExecutor. + Args: + routed_tasks: Mapping from strategy to list of indexes it should process. + task_function: Callable accepting `(strategy, list_of_indexes)` and returning any result. + + Returns: + List of results from `task_function`, one per active strategy, in arbitrary order. + Each result typically includes data and the corresponding indices for reassembly. """ active_tasks = [(strategy, indexes) for strategy, indexes in routed_tasks.items() if indexes] if not active_tasks: return [] - # Fast Path: Execute directly if only one backend is targeted. - # This significantly reduces latency for homogeneous batches (e.g., NPU-only). + # Fast path: single strategy → avoid threading if len(active_tasks) == 1: return [task_function(*active_tasks[0])] - # Parallel Path: Maximize throughput by overlapping NPU and CPU operations. - # The 'with' statement ensures immediate thread teardown and resource release. + # Parallel path: overlap NPU and CPU operations with ThreadPoolExecutor(max_workers=len(active_tasks)) as executor: + # futures' results are from task_function futures = [executor.submit(task_function, strategy, indexes) for strategy, indexes in active_tasks] return [f.result() for f in futures] From ed07b61974e7ac275bf5e84ccf4e376844891a7c Mon Sep 17 00:00:00 2001 From: dpj135 <958208521@qq.com> Date: Fri, 30 Jan 2026 16:07:39 +0800 Subject: [PATCH 11/26] Added end-to-end test(generated by AI) for 'YuanrongStorageClient' Signed-off-by: dpj135 <958208521@qq.com> --- tests/test_yuanrong_storage_client_e2e.py | 291 ++++++++++++++++++ ...test_yuanrong_storage_client_zero_copy.py} | 0 2 files changed, 291 insertions(+) create mode 100644 tests/test_yuanrong_storage_client_e2e.py rename tests/{test_yuanrong_storage_client.py => test_yuanrong_storage_client_zero_copy.py} (100%) diff --git a/tests/test_yuanrong_storage_client_e2e.py b/tests/test_yuanrong_storage_client_e2e.py new file mode 100644 index 0000000..35bdd85 --- /dev/null +++ b/tests/test_yuanrong_storage_client_e2e.py @@ -0,0 +1,291 @@ +# Copyright 2025 Huawei Technologies Co., Ltd. All Rights Reserved. +# Copyright 2025 The TransferQueue Team +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. 
+# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import pytest +import torch +from unittest import mock +from typing import Any, Optional + +from transfer_queue.storage.clients.factory import StorageClientFactory + + +# --- Mock Backend Clients --- +class MockDsTensorClient: + def __init__(self, host, port, device_id): + self.host = host + self.port = port + self.device_id = device_id + self.storage = {} # key -> tensor + + def init(self): + pass + + def dev_mset(self, keys, values): + for k, v in zip(keys, values): + assert v.device.type == "npu" + self.storage[k] = v.clone() # simulate store + + def dev_mget(self, keys, out_tensors): + for i, k in enumerate(keys): + if k in self.storage: + out_tensors[i].copy_(self.storage[k]) + + def dev_delete(self, keys): + for k in keys: + self.storage.pop(k, None) + + +class MockKVClient: + def __init__(self, host, port): + self.host = host + self.port = port + self.storage = {} # key -> bytes + + def init(self): + pass + + def mcreate(self, keys, sizes): + # We don't actually need to return real buffers if we intercept packing + # But to keep interface, return dummy buffers + class MockBuffer: + def __init__(self, size): + self._data = bytearray(size) + self.size = size + def MutableData(self): + return memoryview(self._data) + self._current_keys = keys # HACK: remember keys for mset_buffer + return [MockBuffer(s) for s in sizes] + + def mset_buffer(self, buffers): + from transfer_queue.utils.serial_utils import _decoder, _encoder + # Reconstruct objects from buffers (simulate what pack_into did) + # But we can't easily unpack without knowing original obj... + # Instead, during test, we can assume that whatever was packed is recoverable. 
+ # But for mock, let's just store the raw bytes of the first item in buffer + for key, buf in zip(self._current_keys, buffers): + # Extract the full content of the buffer + raw_bytes = bytes(buf.MutableData()) + self.storage[key] = raw_bytes + del self._current_keys + + def get_buffers(self, keys): + results = [] + for k in keys: + raw = self.storage.get(k) + if raw is None: + results.append(None) + else: + results.append(memoryview(raw)) + return results + + def delete(self, keys): + for k in keys: + self.storage.pop(k, None) + + +# --- Patch utilities --- +def make_mock_datasystem(): + """Returns a mock module that replaces yr.datasystem""" + mock_ds = mock.MagicMock() + mock_ds.DsTensorClient = MockDsTensorClient + mock_ds.KVClient = MockKVClient + return mock_ds + + +# --- Test Fixtures --- +@pytest.fixture +def mock_yr_datasystem(): + with mock.patch.dict("sys.modules", {"yr": mock.MagicMock(), "yr.datasystem": make_mock_datasystem()}): + import yr # noqa + yield + + +@pytest.fixture +def config(): + return { + "host": "127.0.0.1", + "port": 12345, + "enable_yr_npu_optimization": True, + } + + +# --- Helper: Check tensor equality with device awareness --- +def assert_tensors_equal(a: torch.Tensor, b: torch.Tensor): + assert a.shape == b.shape + assert a.dtype == b.dtype + if a.device.type == "npu" or b.device.type == "npu": + a_cpu = a.cpu() + b_cpu = b.cpu() + assert torch.equal(a_cpu, b_cpu) + else: + assert torch.equal(a, b) + + +# --- Main Test Class --- +class TestYuanrongStorageE2E: + + @pytest.fixture(autouse=True) + def setup_client(self, mock_yr_datasystem, config): + # Re-register YuanrongStorageClient in case it was loaded before + from transfer_queue.storage.clients.yuanrong import YuanrongStorageClient # adjust import path as needed + self.client_cls = YuanrongStorageClient + self.config = config + + def _create_test_data_cpu(self): + """Pure CPU data: tensors + primitives""" + keys = ["cpu_tensor", "string", "int_val", "bool_val", "list_val"] + values = [ + torch.randn(3, 4), # CPU tensor + "hello world", + 42, + True, + [1, 2, {"a": 3}], + ] + shapes = [list(v.shape) if isinstance(v, torch.Tensor) else [] for v in values] + dtypes = [v.dtype if isinstance(v, torch.Tensor) else None for v in values] + return keys, values, shapes, dtypes + + def _create_test_data_npu(self): + """Pure NPU tensors (only if NPU available)""" + if not hasattr(torch, 'npu') or not torch.npu.is_available(): + pytest.skip("NPU not available") + keys = ["npu_tensor1", "npu_tensor2"] + values = [ + torch.randn(2, 3).npu(), + torch.tensor([1, 2, 3], dtype=torch.int64).npu(), + ] + shapes = [list(v.shape) for v in values] + dtypes = [v.dtype for v in values] + return keys, values, shapes, dtypes + + def _create_test_data_mixed(self): + """Mixed NPU + CPU""" + if not hasattr(torch, 'npu') or not torch.npu.is_available(): + pytest.skip("NPU not available") + keys = ["npu_t", "cpu_t", "str_val"] + values = [ + torch.randn(1, 2).npu(), + torch.tensor([5.0]), # CPU + "mixed", + ] + shapes = [list(v.shape) if isinstance(v, torch.Tensor) else [] for v in values] + dtypes = [v.dtype if isinstance(v, torch.Tensor) else None for v in values] + return keys, values, shapes, dtypes + + def test_put_get_clear_cpu_only(self, config): + client = self.client_cls(config) + keys, values, shapes, dtypes = self._create_test_data_cpu() + + # Put + custom_meta = client.put(keys, values) + assert len(custom_meta) == len(keys) + assert all(cm == "KVClient" for cm in custom_meta) + + # Get + retrieved = client.get(keys, 
shapes=shapes, dtypes=dtypes, custom_meta=custom_meta) + assert len(retrieved) == len(values) + for orig, ret in zip(values, retrieved): + if isinstance(orig, torch.Tensor): + assert_tensors_equal(orig, ret) + else: + assert orig == ret + + # Clear + client.clear(keys, custom_meta=custom_meta) + + # Verify cleared (optional: try get again → should be None or error) + # Since our mock returns zeros for missing NPU keys but None for KV, + # and KV mock returns None for missing keys: + after_clear = client.get(keys, shapes=shapes, dtypes=dtypes, custom_meta=custom_meta) + assert all(v is None for v in after_clear) + # Note: For simplicity, we don't deeply verify deletion; focus on no crash. + + def test_put_get_clear_npu_only(self, config): + if not (hasattr(torch, 'npu') and torch.npu.is_available()): + pytest.skip("NPU not available") + + # Force enable NPU path + config["enable_yr_npu_optimization"] = True + client = self.client_cls(config) + + keys, values, shapes, dtypes = self._create_test_data_npu() + + # Put + custom_meta = client.put(keys, values) + assert all(cm == "DsTensorClient" for cm in custom_meta) + + # Get + retrieved = client.get(keys, shapes=shapes, dtypes=dtypes, custom_meta=custom_meta) + for orig, ret in zip(values, retrieved): + assert_tensors_equal(orig, ret) + + # Clear + client.clear(keys, custom_meta=custom_meta) + + def test_put_get_clear_mixed(self, config): + if not (hasattr(torch, 'npu') and torch.npu.is_available()): + pytest.skip("NPU not available") + + config["enable_yr_npu_optimization"] = True + client = self.client_cls(config) + + keys, values, shapes, dtypes = self._create_test_data_mixed() + + # Put + custom_meta = client.put(keys, values) + # Should have both strategies + assert "DsTensorClient" in custom_meta + assert "KVClient" in custom_meta + + # Get + retrieved = client.get(keys, shapes=shapes, dtypes=dtypes, custom_meta=custom_meta) + for orig, ret in zip(values, retrieved): + if isinstance(orig, torch.Tensor): + assert_tensors_equal(orig, ret) + else: + assert orig == ret + + # Clear + client.clear(keys, custom_meta=custom_meta) + + def test_without_npu_fallback_to_kv(self, config): + """Ensure NPU tensor is serialized via KVClient when NPU optimization is disabled.""" + if not (hasattr(torch, 'npu') and torch.npu.is_available()): + pytest.skip("NPU not available") + + config = config.copy() + config["enable_yr_npu_optimization"] = False + client = self.client_cls(config) + + keys = ["fallback_tensor"] + values = [torch.randn(2).npu()] # NPU tensor + + # Should go to KVClient + custom_meta = client.put(keys, values) + assert custom_meta == ["KVClient"] + + # Prepare metadata for get + shapes = [list(values[0].shape)] # e.g., [2] + dtypes = [values[0].dtype] # e.g., torch.float32 + + retrieved = client.get(keys, shapes=shapes, dtypes=dtypes, custom_meta=custom_meta) + # Should be deserialized as CPU tensor + assert retrieved[0].device.type == "cpu" + assert torch.equal(values[0].cpu(), retrieved[0]) + + # Clear and verify + client.clear(keys, custom_meta=custom_meta) + after_clear = client.get(keys, shapes=shapes, dtypes=dtypes, custom_meta=custom_meta) + assert after_clear[0] is None \ No newline at end of file diff --git a/tests/test_yuanrong_storage_client.py b/tests/test_yuanrong_storage_client_zero_copy.py similarity index 100% rename from tests/test_yuanrong_storage_client.py rename to tests/test_yuanrong_storage_client_zero_copy.py From 91f2532ce00da16b4fd34aa7bc57bd9c42181791 Mon Sep 17 00:00:00 2001 From: dpj135 <958208521@qq.com> Date: 
Fri, 30 Jan 2026 17:26:43 +0800 Subject: [PATCH 12/26] Fixed tests about yuanrong_clint Signed-off-by: dpj135 <958208521@qq.com> --- ...y.py => test_yuanrong_client_zero_copy.py} | 14 +- tests/test_yuanrong_storage_client_e2e.py | 315 ++++++------------ 2 files changed, 113 insertions(+), 216 deletions(-) rename tests/{test_yuanrong_storage_client_zero_copy.py => test_yuanrong_client_zero_copy.py} (87%) diff --git a/tests/test_yuanrong_storage_client_zero_copy.py b/tests/test_yuanrong_client_zero_copy.py similarity index 87% rename from tests/test_yuanrong_storage_client_zero_copy.py rename to tests/test_yuanrong_client_zero_copy.py index 9433d6a..25e5637 100644 --- a/tests/test_yuanrong_storage_client_zero_copy.py +++ b/tests/test_yuanrong_client_zero_copy.py @@ -25,7 +25,7 @@ sys.path.append(str(parent_dir)) from transfer_queue.storage.clients.yuanrong_client import ( # noqa: E402 - YuanrongStorageClient, + KVClientAdapter, ) @@ -37,7 +37,7 @@ def MutableData(self): return self.data -class TestYuanrongStorageZCopy: +class TestYuanrongKVClientZCopy: @pytest.fixture def mock_kv_client(self, mocker): mock_client = MagicMock() @@ -51,7 +51,7 @@ def mock_kv_client(self, mocker): @pytest.fixture def storage_client(self, mock_kv_client): - return YuanrongStorageClient({"host": "127.0.0.1", "port": 31501}) + return KVClientAdapter({"host": "127.0.0.1", "port": 31501}) def test_mset_mget_p2p(self, storage_client, mocker): # Mock serialization/deserialization @@ -80,13 +80,13 @@ def side_effect_mcreate(keys, sizes): stored_raw_buffers.append(b.MutableData()) return buffers - storage_client._cpu_ds_client.mcreate.side_effect = side_effect_mcreate - storage_client._cpu_ds_client.get_buffers.return_value = stored_raw_buffers + storage_client._ds_client.mcreate.side_effect = side_effect_mcreate + storage_client._ds_client.get_buffers.return_value = stored_raw_buffers - storage_client.mset_zcopy( + storage_client.mset_zero_copy( ["tensor_key", "string_key"], [torch.tensor([1.0, 2.0, 3.0], dtype=torch.float32), "hello yuanrong"] ) - results = storage_client.mget_zcopy(["tensor_key", "string_key"]) + results = storage_client.mget_zero_copy(["tensor_key", "string_key"]) assert torch.allclose(results[0], torch.tensor([1.0, 2.0, 3.0], dtype=torch.float32)) assert results[1] == "hello yuanrong" diff --git a/tests/test_yuanrong_storage_client_e2e.py b/tests/test_yuanrong_storage_client_e2e.py index 35bdd85..4910c14 100644 --- a/tests/test_yuanrong_storage_client_e2e.py +++ b/tests/test_yuanrong_storage_client_e2e.py @@ -1,46 +1,35 @@ -# Copyright 2025 Huawei Technologies Co., Ltd. All Rights Reserved. -# Copyright 2025 The TransferQueue Team -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. 
+import sys +from unittest import mock import pytest import torch -from unittest import mock -from typing import Any, Optional -from transfer_queue.storage.clients.factory import StorageClientFactory +try: + import torch_npu # noqa: F401 +except ImportError: + pass + + +# --- Mock Backend Implementation --- -# --- Mock Backend Clients --- class MockDsTensorClient: def __init__(self, host, port, device_id): - self.host = host - self.port = port - self.device_id = device_id - self.storage = {} # key -> tensor + self.storage = {} def init(self): pass def dev_mset(self, keys, values): - for k, v in zip(keys, values): + for k, v in zip(keys, values, strict=True): assert v.device.type == "npu" - self.storage[k] = v.clone() # simulate store + self.storage[k] = v.clone() def dev_mget(self, keys, out_tensors): for i, k in enumerate(keys): if k in self.storage: out_tensors[i].copy_(self.storage[k]) + # Note: If key is missing, tensor remains unchanged (mock limitation) def dev_delete(self, keys): for k in keys: @@ -49,243 +38,151 @@ def dev_delete(self, keys): class MockKVClient: def __init__(self, host, port): - self.host = host - self.port = port - self.storage = {} # key -> bytes + self.storage = {} def init(self): pass def mcreate(self, keys, sizes): - # We don't actually need to return real buffers if we intercept packing - # But to keep interface, return dummy buffers class MockBuffer: def __init__(self, size): self._data = bytearray(size) - self.size = size + def MutableData(self): return memoryview(self._data) - self._current_keys = keys # HACK: remember keys for mset_buffer + + self._current_keys = keys return [MockBuffer(s) for s in sizes] def mset_buffer(self, buffers): - from transfer_queue.utils.serial_utils import _decoder, _encoder - # Reconstruct objects from buffers (simulate what pack_into did) - # But we can't easily unpack without knowing original obj... - # Instead, during test, we can assume that whatever was packed is recoverable. - # But for mock, let's just store the raw bytes of the first item in buffer - for key, buf in zip(self._current_keys, buffers): - # Extract the full content of the buffer - raw_bytes = bytes(buf.MutableData()) - self.storage[key] = raw_bytes - del self._current_keys + for key, buf in zip(self._current_keys, buffers, strict=True): + self.storage[key] = bytes(buf.MutableData()) def get_buffers(self, keys): - results = [] - for k in keys: - raw = self.storage.get(k) - if raw is None: - results.append(None) - else: - results.append(memoryview(raw)) - return results + return [memoryview(self.storage[k]) if k in self.storage else None for k in keys] def delete(self, keys): for k in keys: self.storage.pop(k, None) -# --- Patch utilities --- -def make_mock_datasystem(): - """Returns a mock module that replaces yr.datasystem""" - mock_ds = mock.MagicMock() - mock_ds.DsTensorClient = MockDsTensorClient - mock_ds.KVClient = MockKVClient - return mock_ds +# --- Fixtures --- -# --- Test Fixtures --- @pytest.fixture def mock_yr_datasystem(): - with mock.patch.dict("sys.modules", {"yr": mock.MagicMock(), "yr.datasystem": make_mock_datasystem()}): - import yr # noqa + """Wipe real 'yr' modules and inject mocks.""" + + # 1. Clean up sys.modules to force a fresh import under mock conditions + # This ensures top-level code in yuanrong_client.py is re-executed + to_delete = [k for k in sys.modules if k.startswith("yr")] + for mod in to_delete: + del sys.modules[mod] + + # 2. 
Setup Mock Objects + ds_mock = mock.MagicMock() + ds_mock.DsTensorClient = MockDsTensorClient + ds_mock.KVClient = MockKVClient + + yr_mock = mock.MagicMock(datasystem=ds_mock) + + # 3. Apply patches + # - sys.modules: Redirects 'import yr' to our mocks + # - YUANRONG_DATASYSTEM_IMPORTED: Forces the existence check to True so initialize the client successfully + # - datasystem: Direct attribute patch for the module + with ( + mock.patch.dict("sys.modules", {"yr": yr_mock, "yr.datasystem": ds_mock}), + mock.patch("transfer_queue.storage.clients.yuanrong_client.YUANRONG_DATASYSTEM_IMPORTED", True, create=True), + mock.patch("transfer_queue.storage.clients.yuanrong_client.datasystem", ds_mock), + ): yield @pytest.fixture def config(): - return { - "host": "127.0.0.1", - "port": 12345, - "enable_yr_npu_optimization": True, - } + return {"host": "127.0.0.1", "port": 12345, "enable_yr_npu_optimization": True} -# --- Helper: Check tensor equality with device awareness --- def assert_tensors_equal(a: torch.Tensor, b: torch.Tensor): - assert a.shape == b.shape - assert a.dtype == b.dtype - if a.device.type == "npu" or b.device.type == "npu": - a_cpu = a.cpu() - b_cpu = b.cpu() - assert torch.equal(a_cpu, b_cpu) - else: - assert torch.equal(a, b) + assert a.shape == b.shape and a.dtype == b.dtype + # Move to CPU for cross-device comparison + assert torch.equal(a.cpu(), b.cpu()) -# --- Main Test Class --- -class TestYuanrongStorageE2E: +# --- Test Suite --- + +class TestYuanrongStorageE2E: @pytest.fixture(autouse=True) def setup_client(self, mock_yr_datasystem, config): - # Re-register YuanrongStorageClient in case it was loaded before - from transfer_queue.storage.clients.yuanrong import YuanrongStorageClient # adjust import path as needed + # Lazy import to ensure mocks are active + from transfer_queue.storage.clients.yuanrong_client import YuanrongStorageClient + self.client_cls = YuanrongStorageClient self.config = config - def _create_test_data_cpu(self): - """Pure CPU data: tensors + primitives""" - keys = ["cpu_tensor", "string", "int_val", "bool_val", "list_val"] - values = [ - torch.randn(3, 4), # CPU tensor - "hello world", - 42, - True, - [1, 2, {"a": 3}], - ] - shapes = [list(v.shape) if isinstance(v, torch.Tensor) else [] for v in values] - dtypes = [v.dtype if isinstance(v, torch.Tensor) else None for v in values] - return keys, values, shapes, dtypes - - def _create_test_data_npu(self): - """Pure NPU tensors (only if NPU available)""" - if not hasattr(torch, 'npu') or not torch.npu.is_available(): - pytest.skip("NPU not available") - keys = ["npu_tensor1", "npu_tensor2"] - values = [ - torch.randn(2, 3).npu(), - torch.tensor([1, 2, 3], dtype=torch.int64).npu(), - ] - shapes = [list(v.shape) for v in values] - dtypes = [v.dtype for v in values] - return keys, values, shapes, dtypes - - def _create_test_data_mixed(self): - """Mixed NPU + CPU""" - if not hasattr(torch, 'npu') or not torch.npu.is_available(): - pytest.skip("NPU not available") - keys = ["npu_t", "cpu_t", "str_val"] - values = [ - torch.randn(1, 2).npu(), - torch.tensor([5.0]), # CPU - "mixed", - ] - shapes = [list(v.shape) if isinstance(v, torch.Tensor) else [] for v in values] - dtypes = [v.dtype if isinstance(v, torch.Tensor) else None for v in values] - return keys, values, shapes, dtypes - - def test_put_get_clear_cpu_only(self, config): + def _create_data(self, mode="cpu"): + if mode == "cpu": + keys = ["t", "s", "i"] + vals = [torch.randn(2), "hi", 1] + elif mode == "npu": + if not (hasattr(torch, "npu") and 
torch.npu.is_available()): + pytest.skip("NPU required") + keys = ["n1", "n2"] + vals = [torch.randn(2).npu(), torch.tensor([1]).npu()] + else: # mixed + if not (hasattr(torch, "npu") and torch.npu.is_available()): + pytest.skip("NPU required") + keys = ["n1", "c1"] + vals = [torch.randn(2).npu(), "cpu"] + + shapes = [list(v.shape) if isinstance(v, torch.Tensor) else [] for v in vals] + dtypes = [v.dtype if isinstance(v, torch.Tensor) else None for v in vals] + return keys, vals, shapes, dtypes + + def test_cpu_only_flow(self, config): client = self.client_cls(config) - keys, values, shapes, dtypes = self._create_test_data_cpu() - - # Put - custom_meta = client.put(keys, values) - assert len(custom_meta) == len(keys) - assert all(cm == "KVClient" for cm in custom_meta) - - # Get - retrieved = client.get(keys, shapes=shapes, dtypes=dtypes, custom_meta=custom_meta) - assert len(retrieved) == len(values) - for orig, ret in zip(values, retrieved): - if isinstance(orig, torch.Tensor): - assert_tensors_equal(orig, ret) - else: - assert orig == ret + keys, vals, shp, dt = self._create_data("cpu") - # Clear - client.clear(keys, custom_meta=custom_meta) + # Put & Verify Meta + meta = client.put(keys, vals) + assert all(m == "KVClient" for m in meta) - # Verify cleared (optional: try get again → should be None or error) - # Since our mock returns zeros for missing NPU keys but None for KV, - # and KV mock returns None for missing keys: - after_clear = client.get(keys, shapes=shapes, dtypes=dtypes, custom_meta=custom_meta) - assert all(v is None for v in after_clear) - # Note: For simplicity, we don't deeply verify deletion; focus on no crash. + # Get & Verify Values + ret = client.get(keys, shp, dt, meta) + for o, r in zip(vals, ret, strict=True): + if isinstance(o, torch.Tensor): + assert_tensors_equal(o, r) + else: + assert o == r - def test_put_get_clear_npu_only(self, config): - if not (hasattr(torch, 'npu') and torch.npu.is_available()): - pytest.skip("NPU not available") + # Clear & Verify + client.clear(keys, meta) + assert all(v is None for v in client.get(keys, shp, dt, meta)) - # Force enable NPU path - config["enable_yr_npu_optimization"] = True + def test_npu_only_flow(self, config): + keys, vals, shp, dt = self._create_data("npu") client = self.client_cls(config) - keys, values, shapes, dtypes = self._create_test_data_npu() - - # Put - custom_meta = client.put(keys, values) - assert all(cm == "DsTensorClient" for cm in custom_meta) + meta = client.put(keys, vals) + assert all(m == "DsTensorClient" for m in meta) - # Get - retrieved = client.get(keys, shapes=shapes, dtypes=dtypes, custom_meta=custom_meta) - for orig, ret in zip(values, retrieved): - assert_tensors_equal(orig, ret) + ret = client.get(keys, shp, dt, meta) + for o, r in zip(vals, ret, strict=True): + assert_tensors_equal(o, r) - # Clear - client.clear(keys, custom_meta=custom_meta) + client.clear(keys, meta) - def test_put_get_clear_mixed(self, config): - if not (hasattr(torch, 'npu') and torch.npu.is_available()): - pytest.skip("NPU not available") - - config["enable_yr_npu_optimization"] = True + def test_mixed_flow(self, config): + keys, vals, shp, dt = self._create_data("mixed") client = self.client_cls(config) - keys, values, shapes, dtypes = self._create_test_data_mixed() - - # Put - custom_meta = client.put(keys, values) - # Should have both strategies - assert "DsTensorClient" in custom_meta - assert "KVClient" in custom_meta + meta = client.put(keys, vals) + assert set(meta) == {"DsTensorClient", "KVClient"} - # 
Get - retrieved = client.get(keys, shapes=shapes, dtypes=dtypes, custom_meta=custom_meta) - for orig, ret in zip(values, retrieved): - if isinstance(orig, torch.Tensor): - assert_tensors_equal(orig, ret) + ret = client.get(keys, shp, dt, meta) + for o, r in zip(vals, ret, strict=True): + if isinstance(o, torch.Tensor): + assert_tensors_equal(o, r) else: - assert orig == ret - - # Clear - client.clear(keys, custom_meta=custom_meta) - - def test_without_npu_fallback_to_kv(self, config): - """Ensure NPU tensor is serialized via KVClient when NPU optimization is disabled.""" - if not (hasattr(torch, 'npu') and torch.npu.is_available()): - pytest.skip("NPU not available") - - config = config.copy() - config["enable_yr_npu_optimization"] = False - client = self.client_cls(config) - - keys = ["fallback_tensor"] - values = [torch.randn(2).npu()] # NPU tensor - - # Should go to KVClient - custom_meta = client.put(keys, values) - assert custom_meta == ["KVClient"] - - # Prepare metadata for get - shapes = [list(values[0].shape)] # e.g., [2] - dtypes = [values[0].dtype] # e.g., torch.float32 - - retrieved = client.get(keys, shapes=shapes, dtypes=dtypes, custom_meta=custom_meta) - # Should be deserialized as CPU tensor - assert retrieved[0].device.type == "cpu" - assert torch.equal(values[0].cpu(), retrieved[0]) - - # Clear and verify - client.clear(keys, custom_meta=custom_meta) - after_clear = client.get(keys, shapes=shapes, dtypes=dtypes, custom_meta=custom_meta) - assert after_clear[0] is None \ No newline at end of file + assert o == r From 2faa9eb472782c5d874d94a93f70a8258a3d8647 Mon Sep 17 00:00:00 2001 From: dpj135 <958208521@qq.com> Date: Fri, 30 Jan 2026 17:28:53 +0800 Subject: [PATCH 13/26] Added license to test_yuanrong_client Signed-off-by: dpj135 <958208521@qq.com> --- tests/test_yuanrong_storage_client_e2e.py | 15 +++++++++++++++ 1 file changed, 15 insertions(+) diff --git a/tests/test_yuanrong_storage_client_e2e.py b/tests/test_yuanrong_storage_client_e2e.py index 4910c14..ad15edb 100644 --- a/tests/test_yuanrong_storage_client_e2e.py +++ b/tests/test_yuanrong_storage_client_e2e.py @@ -1,3 +1,18 @@ +# Copyright 2025 Huawei Technologies Co., Ltd. All Rights Reserved. +# Copyright 2025 The TransferQueue Team +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
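All three end-to-end flows above exercise the same put → get → clear round trip. Condensed into a usage sketch — host, port, keys, and values are placeholders, and a reachable yuanrong-datasystem backend (or the mocks from these tests) is assumed:

    import torch

    from transfer_queue.storage.clients.yuanrong_client import YuanrongStorageClient

    client = YuanrongStorageClient({"host": "127.0.0.1", "port": 31501})

    keys = ["sample/ids", "sample/note"]
    values = [torch.arange(4), "hello"]
    shapes = [list(v.shape) if isinstance(v, torch.Tensor) else [] for v in values]
    dtypes = [v.dtype if isinstance(v, torch.Tensor) else None for v in values]

    # put() returns one custom_meta entry per key; get()/clear() need it back.
    custom_meta = client.put(keys, values)
    fetched = client.get(keys, shapes=shapes, dtypes=dtypes, custom_meta=custom_meta)
    client.clear(keys, custom_meta=custom_meta)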
+ import sys from unittest import mock From 7531714727c0383e82181f46c39c479e3d59d4bf Mon Sep 17 00:00:00 2001 From: dpj135 <958208521@qq.com> Date: Fri, 30 Jan 2026 17:35:57 +0800 Subject: [PATCH 14/26] Added method 'test_mock_can_work' to test_yuanrong_client Signed-off-by: dpj135 <958208521@qq.com> --- tests/test_yuanrong_storage_client_e2e.py | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/tests/test_yuanrong_storage_client_e2e.py b/tests/test_yuanrong_storage_client_e2e.py index ad15edb..479346b 100644 --- a/tests/test_yuanrong_storage_client_e2e.py +++ b/tests/test_yuanrong_storage_client_e2e.py @@ -155,6 +155,12 @@ def _create_data(self, mode="cpu"): dtypes = [v.dtype if isinstance(v, torch.Tensor) else None for v in vals] return keys, vals, shapes, dtypes + def test_mock_can_work(self, config): + mock_class = (MockDsTensorClient, MockKVClient) + client = self.client_cls(config) + for strategy in client._strategies: + assert isinstance(strategy._ds_client, mock_class) + def test_cpu_only_flow(self, config): client = self.client_cls(config) keys, vals, shp, dt = self._create_data("cpu") From a8293ef31b9c1b922dac17fca5e3d90603ec7231 Mon Sep 17 00:00:00 2001 From: dpj135 <958208521@qq.com> Date: Fri, 30 Jan 2026 17:37:12 +0800 Subject: [PATCH 15/26] Added an annotation to class 'StorageStrategy' Signed-off-by: dpj135 <958208521@qq.com> --- transfer_queue/storage/clients/yuanrong_client.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/transfer_queue/storage/clients/yuanrong_client.py b/transfer_queue/storage/clients/yuanrong_client.py index 64261d1..ba2189d 100644 --- a/transfer_queue/storage/clients/yuanrong_client.py +++ b/transfer_queue/storage/clients/yuanrong_client.py @@ -41,6 +41,8 @@ class StorageStrategy(ABC): + """Abstract base class for storage strategies.""" + @staticmethod @abstractmethod def init(config: dict) -> Union["StorageStrategy", None]: From e120eb223f4a401323107b1d512298942246cc34 Mon Sep 17 00:00:00 2001 From: dpj135 <958208521@qq.com> Date: Fri, 30 Jan 2026 17:42:09 +0800 Subject: [PATCH 16/26] Fixed test_yuanrong_client_zero_copy Signed-off-by: dpj135 <958208521@qq.com> --- tests/test_yuanrong_client_zero_copy.py | 1 - 1 file changed, 1 deletion(-) diff --git a/tests/test_yuanrong_client_zero_copy.py b/tests/test_yuanrong_client_zero_copy.py index 25e5637..29b1791 100644 --- a/tests/test_yuanrong_client_zero_copy.py +++ b/tests/test_yuanrong_client_zero_copy.py @@ -45,7 +45,6 @@ def mock_kv_client(self, mocker): mocker.patch("yr.datasystem.KVClient", return_value=mock_client) mocker.patch("yr.datasystem.DsTensorClient") - mocker.patch("transfer_queue.storage.clients.yuanrong_client.TORCH_NPU_IMPORTED", False) return mock_client From c0fc5364aabdb493f1c38ca73633094626df945a Mon Sep 17 00:00:00 2001 From: dpj135 <60139850+dpj135@users.noreply.github.com> Date: Mon, 2 Feb 2026 11:06:26 +0800 Subject: [PATCH 17/26] Apply suggestions from code review Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> Signed-off-by: dpj135 <958208521@qq.com> --- .../storage/clients/yuanrong_client.py | 29 ++++++++++--------- 1 file changed, 15 insertions(+), 14 deletions(-) diff --git a/transfer_queue/storage/clients/yuanrong_client.py b/transfer_queue/storage/clients/yuanrong_client.py index ba2189d..3aed8e3 100644 --- a/transfer_queue/storage/clients/yuanrong_client.py +++ b/transfer_queue/storage/clients/yuanrong_client.py @@ -45,7 +45,7 @@ class StorageStrategy(ABC): @staticmethod @abstractmethod - def init(config: dict) -> Union["StorageStrategy", 
None]: + def init(config: dict) -> Optional["StorageStrategy"]: """Initialize strategy from config; return None if not applicable.""" @abstractmethod @@ -115,10 +115,8 @@ def supports_put(self, value: Any) -> bool: """Supports contiguous NPU tensors only.""" if not (isinstance(value, torch.Tensor) and value.device.type == "npu"): return False - # Todo(dpj): perhaps KVClient can process uncontiguous tensor - if not value.is_contiguous(): - raise ValueError(f"NPU Tensor is not contiguous: {value}") - return True + # Only contiguous NPU tensors are supported by this adapter. + return value.is_contiguous() def put(self, keys: list[str], values: list[Any]): """Store NPU tensors in batches; deletes before overwrite.""" @@ -150,10 +148,7 @@ def get(self, keys: list[str], **kwargs) -> list[Optional[Any]]: batch_values = self._create_empty_npu_tensorlist(batch_shapes, batch_dtypes) self._ds_client.dev_mget(batch_keys, batch_values) - # Todo(dpj): should we check failed keys? - # failed_keys = self._ds_client.dev_mget(batch_keys, batch_values) - # if failed_keys: - # logging.warning(f"YuanrongStorageClient: Querying keys using 'DsTensorClient' failed: {failed_keys}") + # Todo(dpj): consider checking and logging keys that fail during dev_mget results.extend(batch_values) return results @@ -212,7 +207,7 @@ def __init__(self, config: dict): logger.info("YuanrongStorageClient: Create KVClient to connect with yuanrong-datasystem backend!") @staticmethod - def init(config: dict) -> Union["StorageStrategy", None]: + def init(config: dict) -> Optional["StorageStrategy"]: """Always enabled for general objects.""" return KVClientAdapter(config) @@ -374,7 +369,7 @@ def __init__(self, config: dict[str, Any]): if not self._strategies: raise RuntimeError("No storage strategy available for YuanrongStorageClient") - def put(self, keys: list[str], values: list[Any]) -> list[str]: + def put(self, keys: list[str], values: list[Any]) -> Optional[list[Any]]: """Stores multiple key-value pairs to remote storage. Automatically routes NPU tensors to high-performance tensor storage, @@ -385,7 +380,7 @@ def put(self, keys: list[str], values: list[Any]) -> list[str]: values (List[Any]): List of values to store (tensors, scalars, dicts, etc.). Returns: - List[str]: custom metadata of YuanrongStorageCilent in the same order as input keys. + List[str]: custom metadata of YuanrongStorageClient in the same order as input keys. """ if not isinstance(keys, list) or not isinstance(values, list): raise ValueError("keys and values must be lists") @@ -492,7 +487,10 @@ def _route_to_strategies( routed_indexes[strategy].append(i) break else: - raise ValueError(f"No strategy supports item: {item}") + raise ValueError( + f"No strategy supports item of type {type(item).__name__}: {item}. " + f"Available strategies: {[type(s).__name__ for s in self._strategies]}" + ) return routed_indexes @staticmethod @@ -520,7 +518,10 @@ def _dispatch_tasks(routed_tasks: dict[StorageStrategy, list[int]], task_functio return [task_function(*active_tasks[0])] # Parallel path: overlap NPU and CPU operations - with ThreadPoolExecutor(max_workers=len(active_tasks)) as executor: + # Cap the number of worker threads to avoid resource exhaustion if many + # strategies are added in the future. 
+ max_workers = min(len(active_tasks), 4) + with ThreadPoolExecutor(max_workers=max_workers) as executor: # futures' results are from task_function futures = [executor.submit(task_function, strategy, indexes) for strategy, indexes in active_tasks] return [f.result() for f in futures] From c196940c7ea60c8d3d18abefe0796946d7ded839 Mon Sep 17 00:00:00 2001 From: dpj135 <958208521@qq.com> Date: Mon, 2 Feb 2026 12:58:58 +0800 Subject: [PATCH 18/26] Renamed adapter classes & rename 'custom_meta()' to 'strategy_tag()' & adjusted annotation related to 'custom_meta()' Signed-off-by: dpj135 <958208521@qq.com> --- .../storage/clients/yuanrong_client.py | 86 +++++++++---------- 1 file changed, 43 insertions(+), 43 deletions(-) diff --git a/transfer_queue/storage/clients/yuanrong_client.py b/transfer_queue/storage/clients/yuanrong_client.py index 3aed8e3..11a49f6 100644 --- a/transfer_queue/storage/clients/yuanrong_client.py +++ b/transfer_queue/storage/clients/yuanrong_client.py @@ -49,8 +49,8 @@ def init(config: dict) -> Optional["StorageStrategy"]: """Initialize strategy from config; return None if not applicable.""" @abstractmethod - def custom_meta(self) -> Any: - """Return metadata identifying this strategy (e.g., string name).""" + def strategy_tag(self) -> Any: + """Return metadata identifying this strategy (e.g., string name, byte tag).""" @abstractmethod def supports_put(self, value: Any) -> bool: @@ -77,7 +77,7 @@ def clear(self, keys: list[str]): """Delete keys from storage.""" -class DsTensorClientAdapter(StorageStrategy): +class NPUTensorKVClientAdapter(StorageStrategy): """Adapter for YuanRong's high-performance NPU tensor storage.""" KEYS_LIMIT: int = 10_000 @@ -105,11 +105,11 @@ def init(config: dict) -> Union["StorageStrategy", None]: if not (enable and torch_npu_imported and torch.npu.is_available()): return None - return DsTensorClientAdapter(config) + return NPUTensorKVClientAdapter(config) - def custom_meta(self) -> Any: - """Metadata tag for NPU tensor storage.""" - return "DsTensorClient" + def strategy_tag(self) -> bytes: + """Strategy tag for NPU tensor storage. Using a single byte is for better performance.""" + return b"\x01" def supports_put(self, value: Any) -> bool: """Supports contiguous NPU tensors only.""" @@ -131,8 +131,8 @@ def put(self, keys: list[str], values: list[Any]): self._ds_client.dev_mset(batch_keys, batch_values) def supports_get(self, custom_meta: str) -> bool: - """Matches 'DsTensorClient' custom metadata.""" - return isinstance(custom_meta, str) and custom_meta == self.custom_meta() + """Matches 'DsTensorClient' Strategy tag.""" + return isinstance(custom_meta, bytes) and custom_meta == self.strategy_tag() def get(self, keys: list[str], **kwargs) -> list[Optional[Any]]: """Fetch NPU tensors using pre-allocated empty buffers.""" @@ -153,8 +153,8 @@ def get(self, keys: list[str], **kwargs) -> list[Optional[Any]]: return results def supports_clear(self, custom_meta: str) -> bool: - """Matches 'DsTensorClient' metadata.""" - return isinstance(custom_meta, str) and custom_meta == self.custom_meta() + """Matches 'DsTensorClient' strategy tag.""" + return isinstance(custom_meta, bytes) and custom_meta == self.strategy_tag() def clear(self, keys: list[str]): """Delete NPU tensor keys in batches.""" @@ -180,7 +180,7 @@ def _create_empty_npu_tensorlist(self, shapes, dtypes): return tensors -class KVClientAdapter(StorageStrategy): +class GeneralKVClientAdapter(StorageStrategy): """ Adapter for general-purpose KV storage with serialization. 
The serialization method uses '_decoder' and '_encoder' from 'transfer_queue.utils.serial_utils'. @@ -209,11 +209,11 @@ def __init__(self, config: dict): @staticmethod def init(config: dict) -> Optional["StorageStrategy"]: """Always enabled for general objects.""" - return KVClientAdapter(config) + return GeneralKVClientAdapter(config) - def custom_meta(self) -> Any: - """Metadata tag for general KV storage.""" - return "KVClient" + def strategy_tag(self) -> bytes: + """Strategy tag for general KV storage. Using a single byte is for better performance.""" + return b"\x02" def supports_put(self, value: Any) -> bool: """Accepts any Python object.""" @@ -227,8 +227,8 @@ def put(self, keys: list[str], values: list[Any]): self.mset_zero_copy(batch_keys, batch_vals) def supports_get(self, custom_meta: str) -> bool: - """Matches 'KVClient' metadata.""" - return isinstance(custom_meta, str) and custom_meta == self.custom_meta() + """Matches 'KVClient' strategy tag.""" + return isinstance(custom_meta, bytes) and custom_meta == self.strategy_tag() def get(self, keys: list[str], **kwargs) -> list[Optional[Any]]: """Retrieve and deserialize objects in batches.""" @@ -240,8 +240,8 @@ def get(self, keys: list[str], **kwargs) -> list[Optional[Any]]: return results def supports_clear(self, custom_meta: str) -> bool: - """Matches 'KVClient' metadata.""" - return isinstance(custom_meta, str) and custom_meta == self.custom_meta() + """Matches 'KVClient' strategy tag.""" + return isinstance(custom_meta, bytes) and custom_meta == self.strategy_tag() def clear(self, keys: list[str]): """Delete keys in batches.""" @@ -249,8 +249,8 @@ def clear(self, keys: list[str]): batch_keys = keys[i : i + self.GET_CLEAR_KEYS_LIMIT] self._ds_client.delete(batch_keys) - @staticmethod - def calc_packed_size(items: list[memoryview]) -> int: + @classmethod + def calc_packed_size(cls, items: list[memoryview]) -> int: """ Calculate the total size (in bytes) required to pack a list of memoryview items into the structured binary format used by pack_into. @@ -261,12 +261,10 @@ def calc_packed_size(items: list[memoryview]) -> int: Returns: Total buffer size in bytes. """ - return ( - KVClientAdapter.HEADER_SIZE + len(items) * KVClientAdapter.ENTRY_SIZE + sum(item.nbytes for item in items) - ) + return cls.HEADER_SIZE + len(items) * cls.ENTRY_SIZE + sum(item.nbytes for item in items) - @staticmethod - def pack_into(target: memoryview, items: list[memoryview]): + @classmethod + def pack_into(cls, target: memoryview, items: list[memoryview]): """ Pack multiple contiguous buffers into a single buffer. ┌───────────────┐ @@ -285,22 +283,22 @@ def pack_into(target: memoryview, items: list[memoryview]): Each item must support the buffer protocol and be readable as raw bytes. 
""" - struct.pack_into(KVClientAdapter.HEADER_FMT, target, 0, len(items)) + struct.pack_into(cls.HEADER_FMT, target, 0, len(items)) - entry_offset = KVClientAdapter.HEADER_SIZE - payload_offset = KVClientAdapter.HEADER_SIZE + len(items) * KVClientAdapter.ENTRY_SIZE + entry_offset = cls.HEADER_SIZE + payload_offset = cls.HEADER_SIZE + len(items) * cls.ENTRY_SIZE target_tensor = torch.frombuffer(target, dtype=torch.uint8) for item in items: - struct.pack_into(KVClientAdapter.ENTRY_FMT, target, entry_offset, payload_offset, item.nbytes) + struct.pack_into(cls.ENTRY_FMT, target, entry_offset, payload_offset, item.nbytes) src_tensor = torch.frombuffer(item, dtype=torch.uint8) target_tensor[payload_offset : payload_offset + item.nbytes].copy_(src_tensor) - entry_offset += KVClientAdapter.ENTRY_SIZE + entry_offset += cls.ENTRY_SIZE payload_offset += item.nbytes - @staticmethod - def unpack_from(source: memoryview) -> list[memoryview]: + @classmethod + def unpack_from(cls, source: memoryview) -> list[memoryview]: """ Unpack multiple contiguous buffers from a single packed buffer. Args: @@ -309,12 +307,10 @@ def unpack_from(source: memoryview) -> list[memoryview]: list[memoryview]: List of unpacked contiguous buffers. """ mv = memoryview(source) - item_count = struct.unpack_from(KVClientAdapter.HEADER_FMT, mv, 0)[0] + item_count = struct.unpack_from(cls.HEADER_FMT, mv, 0)[0] offsets = [] for i in range(item_count): - offset, length = struct.unpack_from( - KVClientAdapter.ENTRY_FMT, mv, KVClientAdapter.HEADER_SIZE + i * KVClientAdapter.ENTRY_SIZE - ) + offset, length = struct.unpack_from(cls.ENTRY_FMT, mv, cls.HEADER_SIZE + i * cls.ENTRY_SIZE) offsets.append((offset, length)) return [mv[offset : offset + length] for offset, length in offsets] @@ -351,17 +347,21 @@ class YuanrongStorageClient(TransferQueueStorageKVClient): """ Storage client for YuanRong DataSystem. + Use different storage strategies depending on the data type. Supports storing and fetching both: - - NPU tensors via DsTensorClient (for high performance). - - General objects (CPU tensors, str, bool, list, etc.) via KVClient with serialization. + - NPU tensors via NPUTensorKVClientAdapter (for high performance). + - General objects (CPU tensors, str, bool, list, etc.) via GeneralKVClientAdapter with serialization. """ def __init__(self, config: dict[str, Any]): if not YUANRONG_DATASYSTEM_IMPORTED: raise ImportError("YuanRong DataSystem not installed.") + # Storage strategies are prioritized in ascending order of list element index. + # In other words, the later in the order, the lower the priority. + storage_strategies_priority = [NPUTensorKVClientAdapter, GeneralKVClientAdapter] self._strategies: list[StorageStrategy] = [] - for strategy_cls in [DsTensorClientAdapter, KVClientAdapter]: + for strategy_cls in storage_strategies_priority: strategy = strategy_cls.init(config) if strategy is not None: self._strategies.append(strategy) @@ -380,7 +380,7 @@ def put(self, keys: list[str], values: list[Any]) -> Optional[list[Any]]: values (List[Any]): List of values to store (tensors, scalars, dicts, etc.). Returns: - List[str]: custom metadata of YuanrongStorageClient in the same order as input keys. + List[str]: storage strategy tag of YuanrongStorageClient in the same order as input keys. 
""" if not isinstance(keys, list) or not isinstance(values, list): raise ValueError("keys and values must be lists") @@ -393,7 +393,7 @@ def put(self, keys: list[str], values: list[Any]) -> Optional[list[Any]]: # The closure captures local 'keys' and 'values' for zero-overhead parameter passing. def put_task(strategy, indexes): strategy.put([keys[i] for i in indexes], [values[i] for i in indexes]) - return strategy.custom_meta(), indexes + return strategy.strategy_tag(), indexes # Dispatch tasks and map metadata back to original positions custom_meta: list[str] = [""] * len(keys) From 50c52848be69640f9fa30b885b12dabc82f7546c Mon Sep 17 00:00:00 2001 From: dpj135 <958208521@qq.com> Date: Mon, 2 Feb 2026 13:57:24 +0800 Subject: [PATCH 19/26] Fixed 'KVClientAdapter' imported error Signed-off-by: dpj135 <958208521@qq.com> --- tests/test_yuanrong_client_zero_copy.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tests/test_yuanrong_client_zero_copy.py b/tests/test_yuanrong_client_zero_copy.py index 29b1791..3048ec5 100644 --- a/tests/test_yuanrong_client_zero_copy.py +++ b/tests/test_yuanrong_client_zero_copy.py @@ -25,7 +25,7 @@ sys.path.append(str(parent_dir)) from transfer_queue.storage.clients.yuanrong_client import ( # noqa: E402 - KVClientAdapter, + GeneralKVClientAdapter, ) @@ -50,7 +50,7 @@ def mock_kv_client(self, mocker): @pytest.fixture def storage_client(self, mock_kv_client): - return KVClientAdapter({"host": "127.0.0.1", "port": 31501}) + return GeneralKVClientAdapter({"host": "127.0.0.1", "port": 31501}) def test_mset_mget_p2p(self, storage_client, mocker): # Mock serialization/deserialization From a7303cf9469fe92f4052c9a5be6c264b03f58ca0 Mon Sep 17 00:00:00 2001 From: dpj135 <958208521@qq.com> Date: Mon, 2 Feb 2026 14:21:12 +0800 Subject: [PATCH 20/26] Modified docstrings Signed-off-by: dpj135 <958208521@qq.com> --- transfer_queue/storage/clients/yuanrong_client.py | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/transfer_queue/storage/clients/yuanrong_client.py b/transfer_queue/storage/clients/yuanrong_client.py index 11a49f6..a251773 100644 --- a/transfer_queue/storage/clients/yuanrong_client.py +++ b/transfer_queue/storage/clients/yuanrong_client.py @@ -18,7 +18,7 @@ import struct from abc import ABC, abstractmethod from concurrent.futures import ThreadPoolExecutor -from typing import Any, Callable, Optional, TypeAlias, Union +from typing import Any, Callable, Optional, TypeAlias import torch from torch import Tensor @@ -94,7 +94,7 @@ def __init__(self, config: dict): logger.info("YuanrongStorageClient: Create DsTensorClient to connect with yuanrong-datasystem backend!") @staticmethod - def init(config: dict) -> Union["StorageStrategy", None]: + def init(config: dict) -> Optional["StorageStrategy"]: """Initialize only if NPU and torch_npu are available.""" torch_npu_imported: bool = True try: @@ -468,10 +468,12 @@ def _route_to_strategies( ) -> dict[StorageStrategy, list[int]]: """Groups item indices by the first strategy that supports them. - Used to route keys/values/custom_meta to storage backends by grouped indexes. + Used to route data to storage strategies by grouped indexes. Args: - items: A list of items (e.g., values for put, or custom_meta strings for get/clear). + items: A list used to distinguish which storage strategy the data is routed to. + e.g., route for put based on types of values, + or route for get/clear based on strategy_tag. The order must correspond to the original keys. 
            selector: A function that determines whether a strategy supports an item.
                Signature: `(strategy: StorageStrategy, item: Any) -> bool`.

From 1fd53f4d5e8ca8cfc3cd9e94b66678d84934a4f6 Mon Sep 17 00:00:00 2001
From: dpj135 <958208521@qq.com>
Date: Mon, 2 Feb 2026 14:29:26 +0800
Subject: [PATCH 21/26] Fixed 'test_yuanrong_storage_client_e2e.py' assertions
 for strategy_tag

Signed-off-by: dpj135 <958208521@qq.com>
---
 tests/test_yuanrong_storage_client_e2e.py | 8 +++++---
 1 file changed, 5 insertions(+), 3 deletions(-)

diff --git a/tests/test_yuanrong_storage_client_e2e.py b/tests/test_yuanrong_storage_client_e2e.py
index 479346b..7250ebc 100644
--- a/tests/test_yuanrong_storage_client_e2e.py
+++ b/tests/test_yuanrong_storage_client_e2e.py
@@ -167,7 +167,8 @@ def test_cpu_only_flow(self, config):
 
         # Put & Verify Meta
         meta = client.put(keys, vals)
-        assert all(m == "KVClient" for m in meta)
+        # b"\x02" is a tag added by YuanrongStorageClient, indicating that it is processed via General KV path.
+        assert all(m == b"\x02" for m in meta)
 
         # Get & Verify Values
         ret = client.get(keys, shp, dt, meta)
@@ -186,7 +187,8 @@ def test_npu_only_flow(self, config):
         client = self.client_cls(config)
 
         meta = client.put(keys, vals)
-        assert all(m == "DsTensorClient" for m in meta)
+        # b"\x01" is a tag added by YuanrongStorageClient, indicating that it is processed via NPU path.
+        assert all(m == b"\x01" for m in meta)
 
         ret = client.get(keys, shp, dt, meta)
         for o, r in zip(vals, ret, strict=True):
@@ -199,7 +201,7 @@ def test_mixed_flow(self, config):
         client = self.client_cls(config)
 
         meta = client.put(keys, vals)
-        assert set(meta) == {"DsTensorClient", "KVClient"}
+        assert set(meta) == {b"\x01", b"\x02"}
 
         ret = client.get(keys, shp, dt, meta)
         for o, r in zip(vals, ret, strict=True):

From 7bb83a8d41236ea04fd5153af89a34f998ad3c22 Mon Sep 17 00:00:00 2001
From: dpj135 <958208521@qq.com>
Date: Tue, 3 Feb 2026 10:15:55 +0800
Subject: [PATCH 22/26] Adjusted comments in test_yuanrong_storage_client_e2e.py

Signed-off-by: dpj135 <958208521@qq.com>
---
 tests/test_yuanrong_storage_client_e2e.py | 8 +++++---
 1 file changed, 5 insertions(+), 3 deletions(-)

diff --git a/tests/test_yuanrong_storage_client_e2e.py b/tests/test_yuanrong_storage_client_e2e.py
index 7250ebc..71ecc94 100644
--- a/tests/test_yuanrong_storage_client_e2e.py
+++ b/tests/test_yuanrong_storage_client_e2e.py
@@ -26,7 +26,9 @@
 
 
 # --- Mock Backend Implementation ---
-
+# In real scenarios, multiple DsTensorClients or KVClients share storage.
+# Here, each mockClient is implemented with independent storage using a simple dictionary,
+# and is only suitable for unit testing.
 
 class MockDsTensorClient:
     def __init__(self, host, port, device_id):
@@ -38,13 +40,13 @@ def init(self):
     def dev_mset(self, keys, values):
         for k, v in zip(keys, values, strict=True):
             assert v.device.type == "npu"
-            self.storage[k] = v.clone()
+            self.storage[k] = v
 
     def dev_mget(self, keys, out_tensors):
         for i, k in enumerate(keys):
+            # Note: If key is missing, tensor remains unchanged (mock limitation)
             if k in self.storage:
                 out_tensors[i].copy_(self.storage[k])
-            # Note: If key is missing, tensor remains unchanged (mock limitation)
 
     def dev_delete(self, keys):
         for k in keys:

From 9fac913e67a9e0de92bb8f8e732068135fa1b358 Mon Sep 17 00:00:00 2001
From: dpj135 <958208521@qq.com>
Date: Tue, 3 Feb 2026 10:47:24 +0800
Subject: [PATCH 23/26] Addressed review comments on yuanrong_client (modified
 strategy_tag, renamed custom_meta, adjusted comments ...)

Signed-off-by: dpj135 <958208521@qq.com>
---
 tests/test_yuanrong_storage_client_e2e.py         | 11 ++--
 .../storage/clients/yuanrong_client.py            | 60 ++++++++++---------
 2 files changed, 39 insertions(+), 32 deletions(-)

diff --git a/tests/test_yuanrong_storage_client_e2e.py b/tests/test_yuanrong_storage_client_e2e.py
index 71ecc94..519335f 100644
--- a/tests/test_yuanrong_storage_client_e2e.py
+++ b/tests/test_yuanrong_storage_client_e2e.py
@@ -30,6 +30,7 @@
 # Here, each mockClient is implemented with independent storage using a simple dictionary,
 # and is only suitable for unit testing.
 
+
 class MockDsTensorClient:
     def __init__(self, host, port, device_id):
         self.storage = {}
@@ -169,8 +170,8 @@ def test_cpu_only_flow(self, config):
 
         # Put & Verify Meta
         meta = client.put(keys, vals)
-        # b"\x02" is a tag added by YuanrongStorageClient, indicating that it is processed via General KV path.
-        assert all(m == b"\x02" for m in meta)
+        # "2" is a tag added by YuanrongStorageClient, indicating that it is processed via General KV path.
+        assert all(m == "2" for m in meta)
 
         # Get & Verify Values
         ret = client.get(keys, shp, dt, meta)
@@ -189,8 +190,8 @@ def test_npu_only_flow(self, config):
         client = self.client_cls(config)
 
         meta = client.put(keys, vals)
-        # b"\x01" is a tag added by YuanrongStorageClient, indicating that it is processed via NPU path.
-        assert all(m == b"\x01" for m in meta)
+        # "1" is a tag added by YuanrongStorageClient, indicating that it is processed via NPU path.
+        assert all(m == "1" for m in meta)
 
         ret = client.get(keys, shp, dt, meta)
         for o, r in zip(vals, ret, strict=True):
@@ -203,7 +204,7 @@ def test_mixed_flow(self, config):
         client = self.client_cls(config)
 
         meta = client.put(keys, vals)
-        assert set(meta) == {b"\x01", b"\x02"}
+        assert set(meta) == {"1", "2"}
 
         ret = client.get(keys, shp, dt, meta)
         for o, r in zip(vals, ret, strict=True):
diff --git a/transfer_queue/storage/clients/yuanrong_client.py b/transfer_queue/storage/clients/yuanrong_client.py
index a251773..5f5ce7e 100644
--- a/transfer_queue/storage/clients/yuanrong_client.py
+++ b/transfer_queue/storage/clients/yuanrong_client.py
@@ -61,15 +61,15 @@ def put(self, keys: list[str], values: list[Any]):
         """Store key-value pairs using this strategy."""
 
     @abstractmethod
-    def supports_get(self, custom_meta: Any) -> bool:
-        """Check if this strategy can retrieve data with given metadata."""
+    def supports_get(self, strategy_tag: Any) -> bool:
+        """Check if this strategy can retrieve data with given tag."""
 
     @abstractmethod
     def get(self, keys: list[str], **kwargs) -> list[Optional[Any]]:
         """Retrieve values by keys; kwargs may include shapes/dtypes."""
 
     @abstractmethod
-    def supports_clear(self, custom_meta: Any) -> bool:
+    def supports_clear(self, strategy_tag: Any) -> bool:
         """Check if this strategy owns data identified by metadata."""
 
     @abstractmethod
@@ -101,15 +101,15 @@ def init(config: dict) -> Optional["StorageStrategy"]:
             import torch_npu  # noqa: F401
         except ImportError:
             torch_npu_imported = False
-        enable = config.get("enable_yr_npu_optimization", True)
+        enable = config.get("enable_yr_npu_transport", True)
         if not (enable and torch_npu_imported and torch.npu.is_available()):
             return None
         return NPUTensorKVClientAdapter(config)
 
-    def strategy_tag(self) -> bytes:
+    def strategy_tag(self) -> str:
         """Strategy tag for NPU tensor storage.
Using a single byte is for better performance.""" - return b"\x01" + return "1" def supports_put(self, value: Any) -> bool: """Supports contiguous NPU tensors only.""" @@ -130,15 +130,15 @@ def put(self, keys: list[str], values: list[Any]): pass self._ds_client.dev_mset(batch_keys, batch_values) - def supports_get(self, custom_meta: str) -> bool: + def supports_get(self, strategy_tag: str) -> bool: """Matches 'DsTensorClient' Strategy tag.""" - return isinstance(custom_meta, bytes) and custom_meta == self.strategy_tag() + return isinstance(strategy_tag, str) and strategy_tag == self.strategy_tag() def get(self, keys: list[str], **kwargs) -> list[Optional[Any]]: """Fetch NPU tensors using pre-allocated empty buffers.""" shapes = kwargs.get("shapes", None) dtypes = kwargs.get("dtypes", None) - if not shapes or not dtypes: + if shapes is None or dtypes is None: raise ValueError("YuanrongStorageClient needs Expected shapes and dtypes") results = [] for i in range(0, len(keys), self.KEYS_LIMIT): @@ -152,9 +152,9 @@ def get(self, keys: list[str], **kwargs) -> list[Optional[Any]]: results.extend(batch_values) return results - def supports_clear(self, custom_meta: str) -> bool: + def supports_clear(self, strategy_tag: str) -> bool: """Matches 'DsTensorClient' strategy tag.""" - return isinstance(custom_meta, bytes) and custom_meta == self.strategy_tag() + return isinstance(strategy_tag, str) and strategy_tag == self.strategy_tag() def clear(self, keys: list[str]): """Delete NPU tensor keys in batches.""" @@ -211,9 +211,9 @@ def init(config: dict) -> Optional["StorageStrategy"]: """Always enabled for general objects.""" return GeneralKVClientAdapter(config) - def strategy_tag(self) -> bytes: + def strategy_tag(self) -> str: """Strategy tag for general KV storage. Using a single byte is for better performance.""" - return b"\x02" + return "2" def supports_put(self, value: Any) -> bool: """Accepts any Python object.""" @@ -226,9 +226,9 @@ def put(self, keys: list[str], values: list[Any]): batch_vals = values[i : i + self.PUT_KEYS_LIMIT] self.mset_zero_copy(batch_keys, batch_vals) - def supports_get(self, custom_meta: str) -> bool: + def supports_get(self, strategy_tag: str) -> bool: """Matches 'KVClient' strategy tag.""" - return isinstance(custom_meta, bytes) and custom_meta == self.strategy_tag() + return isinstance(strategy_tag, str) and strategy_tag == self.strategy_tag() def get(self, keys: list[str], **kwargs) -> list[Optional[Any]]: """Retrieve and deserialize objects in batches.""" @@ -239,9 +239,9 @@ def get(self, keys: list[str], **kwargs) -> list[Optional[Any]]: results.extend(objects) return results - def supports_clear(self, custom_meta: str) -> bool: + def supports_clear(self, strategy_tag: str) -> bool: """Matches 'KVClient' strategy tag.""" - return isinstance(custom_meta, bytes) and custom_meta == self.strategy_tag() + return isinstance(strategy_tag, str) and strategy_tag == self.strategy_tag() def clear(self, keys: list[str]): """Delete keys in batches.""" @@ -357,6 +357,8 @@ def __init__(self, config: dict[str, Any]): if not YUANRONG_DATASYSTEM_IMPORTED: raise ImportError("YuanRong DataSystem not installed.") + super().__init__(config) + # Storage strategies are prioritized in ascending order of list element index. # In other words, the later in the order, the lower the priority. 
storage_strategies_priority = [NPUTensorKVClientAdapter, GeneralKVClientAdapter] @@ -369,7 +371,7 @@ def __init__(self, config: dict[str, Any]): if not self._strategies: raise RuntimeError("No storage strategy available for YuanrongStorageClient") - def put(self, keys: list[str], values: list[Any]) -> Optional[list[Any]]: + def put(self, keys: list[str], values: list[Any]) -> list[str]: """Stores multiple key-value pairs to remote storage. Automatically routes NPU tensors to high-performance tensor storage, @@ -396,11 +398,11 @@ def put_task(strategy, indexes): return strategy.strategy_tag(), indexes # Dispatch tasks and map metadata back to original positions - custom_meta: list[str] = [""] * len(keys) - for meta_str, indexes in self._dispatch_tasks(routed_indexes, put_task): + strategy_tags: list[str] = [""] * len(keys) + for tag, indexes in self._dispatch_tasks(routed_indexes, put_task): for i in indexes: - custom_meta[i] = meta_str - return custom_meta + strategy_tags[i] = tag + return strategy_tags def get(self, keys: list[str], shapes=None, dtypes=None, custom_meta=None) -> list[Any]: """Retrieves multiple values from remote storage with expected metadata. @@ -411,7 +413,7 @@ def get(self, keys: list[str], shapes=None, dtypes=None, custom_meta=None) -> li keys (List[str]): Keys to fetch. shapes (List[List[int]]): Expected tensor shapes (use [] for scalars). dtypes (List[Optional[torch.dtype]]): Expected dtypes; use None for non-tensor data. - custom_meta (List[str]): StorageStrategy type for each key + custom_meta (List[str]): StorageStrategy tag for each key Returns: List[Any]: Retrieved values in the same order as input keys. @@ -422,7 +424,10 @@ def get(self, keys: list[str], shapes=None, dtypes=None, custom_meta=None) -> li if not (len(keys) == len(shapes) == len(dtypes) == len(custom_meta)): raise ValueError("Lengths of keys, shapes, dtypes, custom_meta must match") - routed_indexes = self._route_to_strategies(custom_meta, lambda strategy_, item_: strategy_.supports_get(item_)) + strategy_tags = custom_meta + routed_indexes = self._route_to_strategies( + strategy_tags, lambda strategy_, item_: strategy_.supports_get(item_) + ) # Work unit for 'get': handles slicing of keys, shapes, and dtypes simultaneously. def get_task(strategy, indexes): @@ -443,7 +448,7 @@ def clear(self, keys: list[str], custom_meta=None): Args: keys (List[str]): List of keys to remove. - custom_meta (List[str]): StorageStrategy type for each key + custom_meta (List[str]): StorageStrategy tag for each key """ if not isinstance(keys, list) or not isinstance(custom_meta, list): raise ValueError("keys and custom_meta must be a list") @@ -451,8 +456,9 @@ def clear(self, keys: list[str], custom_meta=None): if len(custom_meta) != len(keys): raise ValueError("custom_meta length must match keys") + strategy_tags = custom_meta routed_indexes = self._route_to_strategies( - custom_meta, lambda strategy_, item_: strategy_.supports_clear(item_) + strategy_tags, lambda strategy_, item_: strategy_.supports_clear(item_) ) def clear_task(strategy, indexes): @@ -473,7 +479,7 @@ def _route_to_strategies( Args: items: A list used to distinguish which storage strategy the data is routed to. e.g., route for put based on types of values, - or route for get/clear based on strategy_tag. + or route for get/clear based on strategy_tags. The order must correspond to the original keys. selector: A function that determines whether a strategy supports an item. Signature: `(strategy: StorageStrategy, item: Any) -> bool`. 
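For readers tracking the strategy_tag mechanism above, the routing idea can be pictured with a minimal, standalone sketch: group item indices by the first strategy that supports them, then map each strategy's tag back to the original positions. This is illustration only and not part of the patch series; FakeNpuStrategy, FakeGeneralStrategy, and the simplified route_to_strategies helper below are hypothetical stand-ins, and only the "1"/"2" tags and the first-match grouping follow the patches.

    from typing import Any, Callable

    class FakeNpuStrategy:
        def strategy_tag(self) -> str:
            return "1"

        def supports_put(self, value: Any) -> bool:
            # Stand-in check: pretend ints are NPU tensors.
            return isinstance(value, int)

    class FakeGeneralStrategy:
        def strategy_tag(self) -> str:
            return "2"

        def supports_put(self, value: Any) -> bool:
            # The general KV path accepts anything.
            return True

    def route_to_strategies(strategies, items, selector: Callable[[Any, Any], bool]):
        """Group item indices by the first strategy whose selector accepts them."""
        routed: dict[Any, list[int]] = {}
        for index, item in enumerate(items):
            for strategy in strategies:
                if selector(strategy, item):
                    routed.setdefault(strategy, []).append(index)
                    break
        return routed

    strategies = [FakeNpuStrategy(), FakeGeneralStrategy()]  # priority order
    values = [1, "hello", 2, {"k": "v"}]
    routed = route_to_strategies(strategies, values, lambda s, v: s.supports_put(v))

    # Tags are reported per original position, mirroring how put() returns them.
    tags = [""] * len(values)
    for strategy, indexes in routed.items():
        for i in indexes:
            tags[i] = strategy.strategy_tag()
    print(tags)  # ['1', '2', '1', '2']

Grouping by index keeps per-key order stable while still letting each backend receive a single batched call, which is why get() and clear() route on the returned tags instead of re-inspecting the values.
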
From 20ae39be9d1c4505ce25ba542bb4cec3c660e9a0 Mon Sep 17 00:00:00 2001 From: dpj135 <958208521@qq.com> Date: Tue, 3 Feb 2026 11:26:19 +0800 Subject: [PATCH 24/26] Rename custom_meta to custom_backend_meta Signed-off-by: dpj135 <958208521@qq.com> --- transfer_queue/storage/clients/base.py | 2 +- .../storage/clients/mooncake_client.py | 28 ++++++++-------- .../storage/clients/ray_storage_client.py | 4 +-- .../storage/clients/yuanrong_client.py | 32 +++++++++---------- transfer_queue/storage/managers/base.py | 4 +-- 5 files changed, 34 insertions(+), 36 deletions(-) diff --git a/transfer_queue/storage/clients/base.py b/transfer_queue/storage/clients/base.py index 435b7b3..a6d63f6 100644 --- a/transfer_queue/storage/clients/base.py +++ b/transfer_queue/storage/clients/base.py @@ -65,6 +65,6 @@ def get(self, keys: list[str], shapes=None, dtypes=None, custom_backend_meta=Non raise NotImplementedError("Subclasses must implement get") @abstractmethod - def clear(self, keys: list[str], custom_meta: Optional[list[Any]] = None) -> None: + def clear(self, keys: list[str], custom_backend_meta=None) -> None: """Clear key-value pairs in the storage backend.""" raise NotImplementedError("Subclasses must implement clear") diff --git a/transfer_queue/storage/clients/mooncake_client.py b/transfer_queue/storage/clients/mooncake_client.py index 5966eab..8c5270b 100644 --- a/transfer_queue/storage/clients/mooncake_client.py +++ b/transfer_queue/storage/clients/mooncake_client.py @@ -135,18 +135,18 @@ def _batch_put_bytes(self, keys: list[str], values: list[bytes]): def get(self, keys: list[str], shapes=None, dtypes=None, custom_backend_meta=None) -> list[Any]: """Get multiple key-value pairs from MooncakeStore. - Args: - keys (List[str]): Keys to fetch. - shapes (List[List[int]]): Expected tensor shapes (use [] for scalars). - dtypes (List[Optional[torch.dtype]]): Expected dtypes; use None for non-tensor data. -<<<<<<< HEAD - custom_backend_meta (List[str], optional): Device type (npu/cpu) for each key -======= - custom_meta (List[Any], optional): ... ->>>>>>> acd7686 (Added custom_meta to clear for all TransferQueueKVStorageClient) - - Returns: - List[Any]: Retrieved values in the same order as input keys. + Args: + keys (List[str]): Keys to fetch. + shapes (List[List[int]]): Expected tensor shapes (use [] for scalars). + dtypes (List[Optional[torch.dtype]]): Expected dtypes; use None for non-tensor data. + <<<<<<< HEAD + custom_backend_meta (List[str], optional): Device type (npu/cpu) for each key + ======= + custom_meta (List[Any], optional): ... + >>>>>>> acd7686 (Added custom_meta to clear for all TransferQueueKVStorageClient) + + Returns: + List[Any]: Retrieved values in the same order as input keys. """ if shapes is None or dtypes is None: @@ -220,12 +220,12 @@ def _batch_get_bytes(self, keys: list[str]) -> list[bytes]: results.extend(batch_results) return results - def clear(self, keys: list[str], custom_meta=None): + def clear(self, keys: list[str], custom_backend_meta=None): """Deletes multiple keys from MooncakeStore. Args: keys (List[str]): List of keys to remove. - custom_meta (List[Any], optional): ... + custom_backend_meta (List[Any], optional): ... 
""" for key in keys: ret = self._store.remove(key) diff --git a/transfer_queue/storage/clients/ray_storage_client.py b/transfer_queue/storage/clients/ray_storage_client.py index d515a99..c290f6f 100644 --- a/transfer_queue/storage/clients/ray_storage_client.py +++ b/transfer_queue/storage/clients/ray_storage_client.py @@ -106,11 +106,11 @@ def get(self, keys: list[str], shapes=None, dtypes=None, custom_backend_meta=Non raise RuntimeError(f"Failed to retrieve value for key '{keys}': {e}") from e return values - def clear(self, keys: list[str], custom_meta=None): + def clear(self, keys: list[str], custom_backend_meta=None): """ Delete entries from storage by keys. Args: keys (list): List of keys to delete - custom_meta (List[Any], optional): ... + custom_backend_meta (List[Any], optional): ... """ ray.get(self.storage_actor.clear_obj_ref.remote(keys)) diff --git a/transfer_queue/storage/clients/yuanrong_client.py b/transfer_queue/storage/clients/yuanrong_client.py index 5f5ce7e..3e627d7 100644 --- a/transfer_queue/storage/clients/yuanrong_client.py +++ b/transfer_queue/storage/clients/yuanrong_client.py @@ -18,7 +18,7 @@ import struct from abc import ABC, abstractmethod from concurrent.futures import ThreadPoolExecutor -from typing import Any, Callable, Optional, TypeAlias +from typing import Any, Callable, Optional import torch from torch import Tensor @@ -27,8 +27,6 @@ from transfer_queue.storage.clients.factory import StorageClientFactory from transfer_queue.utils.serial_utils import _decoder, _encoder -bytestr: TypeAlias = bytes | bytearray | memoryview - logger = logging.getLogger(__name__) logger.setLevel(os.getenv("TQ_LOGGING_LEVEL", logging.WARNING)) @@ -404,7 +402,7 @@ def put_task(strategy, indexes): strategy_tags[i] = tag return strategy_tags - def get(self, keys: list[str], shapes=None, dtypes=None, custom_meta=None) -> list[Any]: + def get(self, keys: list[str], shapes=None, dtypes=None, custom_backend_meta=None) -> list[Any]: """Retrieves multiple values from remote storage with expected metadata. Requires shape and dtype hints to reconstruct NPU tensors correctly. @@ -413,18 +411,18 @@ def get(self, keys: list[str], shapes=None, dtypes=None, custom_meta=None) -> li keys (List[str]): Keys to fetch. shapes (List[List[int]]): Expected tensor shapes (use [] for scalars). dtypes (List[Optional[torch.dtype]]): Expected dtypes; use None for non-tensor data. - custom_meta (List[str]): StorageStrategy tag for each key + custom_backend_meta (List[str]): StorageStrategy tag for each key Returns: List[Any]: Retrieved values in the same order as input keys. 
""" - if shapes is None or dtypes is None or custom_meta is None: - raise ValueError("YuanrongStorageClient.get() needs Expected shapes, dtypes and custom_meta") + if shapes is None or dtypes is None or custom_backend_meta is None: + raise ValueError("YuanrongStorageClient.get() needs Expected shapes, dtypes and custom_backend_meta") - if not (len(keys) == len(shapes) == len(dtypes) == len(custom_meta)): - raise ValueError("Lengths of keys, shapes, dtypes, custom_meta must match") + if not (len(keys) == len(shapes) == len(dtypes) == len(custom_backend_meta)): + raise ValueError("Lengths of keys, shapes, dtypes, custom_backend_meta must match") - strategy_tags = custom_meta + strategy_tags = custom_backend_meta routed_indexes = self._route_to_strategies( strategy_tags, lambda strategy_, item_: strategy_.supports_get(item_) ) @@ -443,20 +441,20 @@ def get_task(strategy, indexes): results[original_index] = value return results - def clear(self, keys: list[str], custom_meta=None): + def clear(self, keys: list[str], custom_backend_meta=None): """Deletes multiple keys from remote storage. Args: keys (List[str]): List of keys to remove. - custom_meta (List[str]): StorageStrategy tag for each key + custom_backend_meta (List[str]): StorageStrategy tag for each key """ - if not isinstance(keys, list) or not isinstance(custom_meta, list): - raise ValueError("keys and custom_meta must be a list") + if not isinstance(keys, list) or not isinstance(custom_backend_meta, list): + raise ValueError("keys and custom_backend_meta must be a list") - if len(custom_meta) != len(keys): - raise ValueError("custom_meta length must match keys") + if len(custom_backend_meta) != len(keys): + raise ValueError("custom_backend_meta length must match keys") - strategy_tags = custom_meta + strategy_tags = custom_backend_meta routed_indexes = self._route_to_strategies( strategy_tags, lambda strategy_, item_: strategy_.supports_clear(item_) ) diff --git a/transfer_queue/storage/managers/base.py b/transfer_queue/storage/managers/base.py index 9fb99ac..c07ec1f 100644 --- a/transfer_queue/storage/managers/base.py +++ b/transfer_queue/storage/managers/base.py @@ -634,5 +634,5 @@ async def clear_data(self, metadata: BatchMeta) -> None: logger.warning("Attempted to clear data, but metadata contains no fields.") return keys = self._generate_keys(metadata.field_names, metadata.global_indexes) - _, _, custom_meta = self._get_shape_type_custom_meta_list(metadata) - self.storage_client.clear(keys=keys, custom_meta=custom_meta) + _, _, custom_meta = self._get_shape_type_custom_backend_meta_list(metadata) + self.storage_client.clear(keys=keys, custom_backend_meta=custom_meta) From 8f3417c84bb7d49512d1c2b97a53f44b3907a71c Mon Sep 17 00:00:00 2001 From: dpj135 <958208521@qq.com> Date: Tue, 3 Feb 2026 15:12:56 +0800 Subject: [PATCH 25/26] Modified annotations about clients Signed-off-by: dpj135 <958208521@qq.com> --- .../storage/clients/mooncake_client.py | 20 ++++++++----------- .../storage/clients/yuanrong_client.py | 8 +++++--- 2 files changed, 13 insertions(+), 15 deletions(-) diff --git a/transfer_queue/storage/clients/mooncake_client.py b/transfer_queue/storage/clients/mooncake_client.py index 8c5270b..4b4d9a3 100644 --- a/transfer_queue/storage/clients/mooncake_client.py +++ b/transfer_queue/storage/clients/mooncake_client.py @@ -135,18 +135,14 @@ def _batch_put_bytes(self, keys: list[str], values: list[bytes]): def get(self, keys: list[str], shapes=None, dtypes=None, custom_backend_meta=None) -> list[Any]: """Get multiple key-value 
pairs from MooncakeStore. - Args: - keys (List[str]): Keys to fetch. - shapes (List[List[int]]): Expected tensor shapes (use [] for scalars). - dtypes (List[Optional[torch.dtype]]): Expected dtypes; use None for non-tensor data. - <<<<<<< HEAD - custom_backend_meta (List[str], optional): Device type (npu/cpu) for each key - ======= - custom_meta (List[Any], optional): ... - >>>>>>> acd7686 (Added custom_meta to clear for all TransferQueueKVStorageClient) - - Returns: - List[Any]: Retrieved values in the same order as input keys. + Args: + keys (List[str]): Keys to fetch. + shapes (List[List[int]]): Expected tensor shapes (use [] for scalars). + dtypes (List[Optional[torch.dtype]]): Expected dtypes; use None for non-tensor data. + custom_backend_meta (List[str], optional): ... + + Returns: + List[Any]: Retrieved values in the same order as input keys. """ if shapes is None or dtypes is None: diff --git a/transfer_queue/storage/clients/yuanrong_client.py b/transfer_queue/storage/clients/yuanrong_client.py index 3e627d7..e4a26a9 100644 --- a/transfer_queue/storage/clients/yuanrong_client.py +++ b/transfer_queue/storage/clients/yuanrong_client.py @@ -76,7 +76,9 @@ def clear(self, keys: list[str]): class NPUTensorKVClientAdapter(StorageStrategy): - """Adapter for YuanRong's high-performance NPU tensor storage.""" + """Adapter for YuanRong's high-performance NPU tensor storage. + Using yr.datasystem.DsTensorClient to connect datasystem backends. + """ KEYS_LIMIT: int = 10_000 @@ -179,8 +181,8 @@ def _create_empty_npu_tensorlist(self, shapes, dtypes): class GeneralKVClientAdapter(StorageStrategy): - """ - Adapter for general-purpose KV storage with serialization. + """Adapter for general-purpose KV storage with serialization. + Using yr.datasystem.KVClient to connect datasystem backends. The serialization method uses '_decoder' and '_encoder' from 'transfer_queue.utils.serial_utils'. """ From 4a3be0f0bf34daf04600b8e04b78283530aad03b Mon Sep 17 00:00:00 2001 From: dpj135 <958208521@qq.com> Date: Tue, 3 Feb 2026 15:42:14 +0800 Subject: [PATCH 26/26] Adjusted expression of annotations and renamed one variable Signed-off-by: dpj135 <958208521@qq.com> --- transfer_queue/storage/clients/yuanrong_client.py | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/transfer_queue/storage/clients/yuanrong_client.py b/transfer_queue/storage/clients/yuanrong_client.py index e4a26a9..41219c2 100644 --- a/transfer_queue/storage/clients/yuanrong_client.py +++ b/transfer_queue/storage/clients/yuanrong_client.py @@ -391,17 +391,17 @@ def put(self, keys: list[str], values: list[Any]) -> list[str]: routed_indexes = self._route_to_strategies(values, lambda strategy_, item_: strategy_.supports_put(item_)) - # Define the work unit: Slicing the input list and calling the backend strategy. + # Define the 'put_task': Slicing the input list and calling the backend strategy. # The closure captures local 'keys' and 'values' for zero-overhead parameter passing. 
def put_task(strategy, indexes): strategy.put([keys[i] for i in indexes], [values[i] for i in indexes]) return strategy.strategy_tag(), indexes - # Dispatch tasks and map metadata back to original positions + # Dispatch tasks and map strategy_tag back to original positions strategy_tags: list[str] = [""] * len(keys) for tag, indexes in self._dispatch_tasks(routed_indexes, put_task): - for i in indexes: - strategy_tags[i] = tag + for original_index in indexes: + strategy_tags[original_index] = tag return strategy_tags def get(self, keys: list[str], shapes=None, dtypes=None, custom_backend_meta=None) -> list[Any]: @@ -429,7 +429,7 @@ def get(self, keys: list[str], shapes=None, dtypes=None, custom_backend_meta=Non strategy_tags, lambda strategy_, item_: strategy_.supports_get(item_) ) - # Work unit for 'get': handles slicing of keys, shapes, and dtypes simultaneously. + # Define the 'get_task': handles slicing of keys, shapes, and dtypes simultaneously. def get_task(strategy, indexes): res = strategy.get( [keys[i] for i in indexes], shapes=[shapes[i] for i in indexes], dtypes=[dtypes[i] for i in indexes]