Skip to content

[refactor] Refactor yuanrong_client#18

Merged
0oshowero0 merged 26 commits intoAscend:mainfrom
dpj135:refactor_yuanrongclient
Feb 3, 2026
Merged

[refactor] Refactor yuanrong_client#18
0oshowero0 merged 26 commits intoAscend:mainfrom
dpj135:refactor_yuanrongclient

Conversation

@dpj135
Copy link
Contributor

@dpj135 dpj135 commented Jan 28, 2026

Background:

  • Currently, the code structure of yuanrong_client.py is complex. The put/get operations of npu_ds_client and cpu_ds_client depend on different tool functions, global constants, and a large number of external dependencies.
  • In addition, yuanrong_client.py may be optimized in the future, or the interfaces of the data system may be updated or adjusted. As a result, the yuanrong_client.YuanrongStorageClient will be modified in a shotgun manner and changes will be divergent.

Description

Refactor using the Adapter and Strategy patterns.

image
  • The interface StorageStrategy is added, which provides a series of abstract methods for encapsulating the yr.datasystem interface.
  • The two storage paths of the original YuanrongStorageClient are extracted into two new adapter classes: DsTensorClientAdapter and KVClientAdapter.
  • YuanrongStorageClient is now responsible for dynamically routes data and schedules DsTensorClientAdapter and KVClientAdapter using the strategy pattern.

Todo:

  • Implement interface StorageStartegy.
  • Implement adapter class DsTensorClientAdapter and KVClientAdapter.
  • Add a parameter 'custom_meta' for All TransferQueueStorageClient's clear.
  • Implement parallelism of methods YuanrongStorageClient::put , YuanrongStorageClient::get and YuanrongStorageClient::clear.
  • Add a unit test.

@dpj135 dpj135 force-pushed the refactor_yuanrongclient branch from 1533adb to d3720fd Compare January 29, 2026 06:42
@dpj135 dpj135 marked this pull request as ready for review January 30, 2026 05:03
@dpj135 dpj135 force-pushed the refactor_yuanrongclient branch from 143251e to 4db6344 Compare January 30, 2026 06:16
@0oshowero0 0oshowero0 requested a review from Copilot January 30, 2026 10:34
Copy link
Contributor

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

This PR refactors the yuanrong_client.py module using the Adapter and Strategy design patterns to improve code organization, maintainability, and enable parallel operations across different storage backends (NPU tensor storage and general KV storage).

Changes:

  • Introduced StorageStrategy abstract base class with two concrete implementations: DsTensorClientAdapter for high-performance NPU tensor storage and KVClientAdapter for general-purpose serialized object storage
  • Added custom_meta parameter to all storage client clear() methods to enable proper routing of delete operations based on storage backend type
  • Implemented parallel execution of put/get/clear operations when both NPU and CPU strategies are active, using ThreadPoolExecutor

Reviewed changes

Copilot reviewed 7 out of 7 changed files in this pull request and generated 20 comments.

Show a summary per file
File Description
transfer_queue/storage/clients/yuanrong_client.py Major refactor introducing Strategy pattern with adapters for DsTensorClient and KVClient; consolidated batching logic; added parallel dispatch mechanism
transfer_queue/storage/clients/base.py Updated get() and clear() method signatures to include custom_meta parameter with Optional type annotation
transfer_queue/storage/managers/base.py Modified clear_data() to extract and pass custom_meta to storage clients; added comment clarifying put operation
transfer_queue/storage/clients/ray_storage_client.py Updated clear() signature to accept custom_meta parameter (for interface compliance)
transfer_queue/storage/clients/mooncake_client.py Updated get() and clear() signatures to accept custom_meta parameter (for interface compliance)
tests/test_yuanrong_storage_client_e2e.py New comprehensive E2E test suite with mocked backends testing CPU-only, NPU-only, and mixed data flows
tests/test_yuanrong_client_zero_copy.py Updated to test KVClientAdapter directly instead of the full YuanrongStorageClient

💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.

def _batch_put(self, keys: list[str], values: list[Any]):
"""Stores a batch of key-value pairs to remote storage, splitting by device type.
@StorageClientFactory.register("YuanrongStorageClient")
class YuanrongStorageClient(TransferQueueStorageKVClient):
Copy link

Copilot AI Jan 30, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This class does not call TransferQueueStorageKVClient.init during initialization. (YuanrongStorageClient.init may be missing a call to a base class init)

Copilot uses AI. Check for mistakes.
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

TransferQueueStorgaeKVClient is an abstract class, it has no Actual data. So ignore it here

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Better add this for future compatibility

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it's a python convention

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

Args:
items: List of memoryview objects to be packed.
@abstractmethod
def custom_meta(self) -> Any:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

maybe strategy_tag?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Now I'm working on the refactor of custom_meta. After the refactor #21 , custom_meta refer to the sample-level information that explicitly provided by the users; while the custom_backend_meta refer to field-level metadata that automatically set by storage backends.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

for meta_str, indexes in self._dispatch_tasks(routed_indexes, put_task):
for i in indexes:
custom_meta[i] = meta_str
return custom_meta
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

shorter tag string improves performance

in my opinion, custom_meta is a general concept from the perspective of metadata. here we can focus on what it really means in yuanrong client (strategy tag or device type, etc)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

"""Check if NPU client is available."""
return self._npu_ds_client is not None
torch_npu_imported = False
enable = config.get("enable_yr_npu_optimization", True)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this option is to allow users to disable ds tensor client even if in npu environment? which users will want this?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's convenient for some users who dont want to configure a complex environment, and for developper to test.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would recommend enable_yr_npu_transport

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done


return DsTensorClientAdapter(config)

def custom_meta(self) -> Any:
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

See above comments

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done


def mset_zcopy(self, keys: list[str], objs: list[Any]):

class KVClientAdapter(StorageStrategy):
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

From user side, it's a little bit hard to understand the difference between these two Adapters. Maybe GeneralKVClientAdapter and NPUTensorKVClientAdapter?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

make sense

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

for i, item in enumerate(items):
for strategy in self._strategies:
if selector(strategy, item):
routed_indexes[strategy].append(i)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It seems that a single sample can have multiple routed_indexes? Need to make sure only one strategy is actually executed according to some priority setting.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

'if ... break' proves one-to-one for sample_index and storage_strategy

"""
self._batch_clear(keys)
routed_indexes: dict[StorageStrategy, list[int]] = {s: [] for s in self._strategies}
for i, item in enumerate(items):
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Try to prevent $O(m\times n)$ loop?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

len(_strategies) is 2 now. In fact, this loop is O(m)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

len(self._strategies) is a constant.

Used to route keys/values/custom_meta to storage backends by grouped indexes.

Args:
items: A list of items (e.g., values for put, or custom_meta strings for get/clear).
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Better to explicitly distinguish the use case of custom_meta input and ordinary value input

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

@dpj135 dpj135 force-pushed the refactor_yuanrongclient branch from 3901b02 to 2aa3955 Compare February 2, 2026 04:59
@dpj135
Copy link
Contributor Author

dpj135 commented Feb 2, 2026

@0oshowero0 @tianyi-ge Comments are addressed. Please take a look. Than you!

Copy link
Contributor

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

Copilot reviewed 7 out of 7 changed files in this pull request and generated 19 comments.


💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.

results[idx] = obj
return results
# Dispatch tasks and map metadata back to original positions
custom_meta: list[str] = [""] * len(keys)
Copy link

Copilot AI Feb 2, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The type annotation list[str] is inconsistent with the actual data being stored. The custom_meta list stores bytes values (from strategy.strategy_tag() which returns bytes), not strings. The type annotation should be list[bytes] to match the actual type being stored.

Copilot uses AI. Check for mistakes.
if not isinstance(keys, list) or not isinstance(values, list):
raise ValueError("keys and values must be lists")
if len(keys) != len(values):
raise ValueError("Number of keys must match number of values")
Copy link

Copilot AI Feb 2, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The put method should validate that the keys list is not empty. Currently, it checks if keys and values are lists and if their lengths match, but doesn't check for empty lists. Calling put with empty lists would result in returning an empty custom_meta list, which may not be the intended behavior. Consider adding validation or documenting that empty lists are allowed.

Suggested change
raise ValueError("Number of keys must match number of values")
raise ValueError("Number of keys must match number of values")
if not keys:
raise ValueError("keys and values must be non-empty lists")

Copilot uses AI. Check for mistakes.
offset, length = struct.unpack_from(ENTRY_FMT, mv, HEADER_SIZE + i * ENTRY_SIZE)
offsets.append((offset, length))
return [mv[offset : offset + length] for offset, length in offsets]
KEYS_LIMIT: int = 10_000
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

according to datasystem doc, key>64 can lead to significant performance degradation. please test it for both cpu/npu

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Based on current simple test, I find that the performance of DsTensorClient is minimally affected by KEYS_LIMIT, and KVClient is greatly affected by KEYS_LIMIT. From the results, it generally seems that the bigger, the better.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I doubt that the performance degradation of KVClient(adapter class) may be due to thread overhead.

def mset_zero_copy(self, keys: list[str], objs: list[Any]):
        """Store multiple objects in zero-copy mode using parallel serialization and buffer packing.

        Args:
            keys (list[str]): List of string keys under which the objects will be stored.
            objs (list[Any]): List of Python objects to store (e.g., tensors, strings).
        """
        items_list = [[memoryview(b) for b in _encoder.encode(obj)] for obj in objs]
        packed_sizes = [self.calc_packed_size(items) for items in items_list]
        buffers = self._ds_client.mcreate(keys, packed_sizes)
        tasks = [(target.MutableData(), item) for target, item in zip(buffers, items_list, strict=True)]
        with ThreadPoolExecutor(max_workers=self.DS_MAX_WORKERS) as executor:
            list(executor.map(lambda p: self.pack_into(*p), tasks))
        self._ds_client.mset_buffer(buffers)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we could consider first using multi-threading to execute pack_into, and then set them in batches. @Evelynn-V

"""Check if NPU client is available."""
return self._npu_ds_client is not None
torch_npu_imported = False
enable = config.get("enable_yr_npu_optimization", True)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would recommend enable_yr_npu_transport


def supports_clear(self, custom_meta: str) -> bool:
"""Matches 'DsTensorClient' strategy tag."""
return isinstance(custom_meta, bytes) and custom_meta == self.strategy_tag()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

since you need to compare strategy tag with custom meta, just use "0" and "1" instead of b"\x01" and b"\x02"

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

def _batch_put(self, keys: list[str], values: list[Any]):
"""Stores a batch of key-value pairs to remote storage, splitting by device type.
@StorageClientFactory.register("YuanrongStorageClient")
class YuanrongStorageClient(TransferQueueStorageKVClient):
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it's a python convention

results[idx] = obj
return results
# Dispatch tasks and map metadata back to original positions
custom_meta: list[str] = [""] * len(keys)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

#21 is renaming custom_meta to custom_backend_meta, and custom_meta has other meanings. I recommend to rename all the custom_meta here to strategy_tags so that 1. we focus on what it really means 2. other developers won't be confused

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I renamed most of the custom names. However, considering the responsibility of this PR is to refactor yuanrong_client, I did not change the function signatures of get and clear in YuanrongStorageClient to remain consistent with the abstract interface.

dpj135 added 10 commits February 3, 2026 11:07
…client.py'

Signed-off-by: dpj135 <958208521@qq.com>
Signed-off-by: dpj135 <958208521@qq.com>
Signed-off-by: dpj135 <958208521@qq.com>
Signed-off-by: dpj135 <958208521@qq.com>
Signed-off-by: dpj135 <958208521@qq.com>
…he order of classes

Signed-off-by: dpj135 <958208521@qq.com>
…ong_client can execute correctly)

Signed-off-by: dpj135 <958208521@qq.com>
…geClient'

Signed-off-by: dpj135 <958208521@qq.com>
Signed-off-by: dpj135 <958208521@qq.com>
dpj135 and others added 14 commits February 3, 2026 11:19
Signed-off-by: dpj135 <958208521@qq.com>
Signed-off-by: dpj135 <958208521@qq.com>
Signed-off-by: dpj135 <958208521@qq.com>
Signed-off-by: dpj135 <958208521@qq.com>
Signed-off-by: dpj135 <958208521@qq.com>
Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
Signed-off-by: dpj135 <958208521@qq.com>
…& adjusted annotation related to 'custom_meta()'

Signed-off-by: dpj135 <958208521@qq.com>
Signed-off-by: dpj135 <958208521@qq.com>
Signed-off-by: dpj135 <958208521@qq.com>
Signed-off-by: dpj135 <958208521@qq.com>
Signed-off-by: dpj135 <958208521@qq.com>
…tom_name, adjusted annotations ...)

Signed-off-by: dpj135 <958208521@qq.com>
Signed-off-by: dpj135 <958208521@qq.com>
@dpj135 dpj135 force-pushed the refactor_yuanrongclient branch from ddcae5e to 20ae39b Compare February 3, 2026 03:26
@tianyi-ge
Copy link
Contributor

look good to merge

Signed-off-by: dpj135 <958208521@qq.com>
@dpj135 dpj135 force-pushed the refactor_yuanrongclient branch from b499c07 to 8f3417c Compare February 3, 2026 07:16
Signed-off-by: dpj135 <958208521@qq.com>
@0oshowero0 0oshowero0 merged commit fbdb58e into Ascend:main Feb 3, 2026
5 checks passed
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants