[Roadmap] A general storage abstraction layer #72
Background
In our previous implementation, we provided a naive TransferQueueStorageSimpleUnit class as the storage layer, which is tightly coupled with other components such as TransferQueueController and TransferQueueClient. This introduces extra code logic that may not be universal, making it difficult to support high-performance storage backends such as MoonCakeStore and Ray Direct Transport.
For instance,
- In TransferQueueController: The controller manages local_indexes for SimpleUnit, which is not universal information for all storage backends.
- In TransferQueueClient: The client directly manages the distributed SimpleUnit, making it difficult to adapt to other backends without major refactoring.
Solution
In PR66, we propose a general abstraction class TransferQueueStorageManager for storage backends. It defines the following interfaces:
@abstractmethod
async def put_data(self, data: TensorDict, metadata: BatchMeta) -> None:
    raise NotImplementedError("Subclasses must implement put_data")

@abstractmethod
async def get_data(self, metadata: BatchMeta) -> TensorDict:
    raise NotImplementedError("Subclasses must implement get_data")

@abstractmethod
async def clear_data(self, metadata: BatchMeta) -> None:
    raise NotImplementedError("Subclasses must implement clear_data")

For customized storage backends, you only need to inherit from TransferQueueStorageManager and implement these functions according to the API of your storage backend.
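As a rough illustration of what such a subclass might look like, here is a minimal, self-contained sketch. It does not import TransferQueue: the base class below is a stand-in that only mirrors the three abstract methods, a plain `dict` of lists replaces `TensorDict`, and the `BatchMeta` dataclass with its `sample_ids` field is a hypothetical simplification, not the real metadata class. `InMemoryStorageManager` is likewise an invented name.

```python
import asyncio
from abc import ABC, abstractmethod
from dataclasses import dataclass


@dataclass
class BatchMeta:
    """Hypothetical stand-in; the real BatchMeta is richer than a list of ids."""
    sample_ids: list[int]


class TransferQueueStorageManager(ABC):
    """Stand-in that mirrors only the three abstract methods from this issue."""

    @abstractmethod
    async def put_data(self, data: dict, metadata: BatchMeta) -> None: ...

    @abstractmethod
    async def get_data(self, metadata: BatchMeta) -> dict: ...

    @abstractmethod
    async def clear_data(self, metadata: BatchMeta) -> None: ...


class InMemoryStorageManager(TransferQueueStorageManager):
    """Toy backend (assumption): one row per sample id in a local dict."""

    def __init__(self) -> None:
        self._rows: dict[int, dict] = {}

    async def put_data(self, data: dict, metadata: BatchMeta) -> None:
        # `data` maps field name -> list of values, ordered like metadata.sample_ids.
        for pos, sample_id in enumerate(metadata.sample_ids):
            row = self._rows.setdefault(sample_id, {})
            for field, column in data.items():
                row[field] = column[pos]

    async def get_data(self, metadata: BatchMeta) -> dict:
        # Raises KeyError if a requested sample was never stored or was cleared.
        rows = [self._rows[sample_id] for sample_id in metadata.sample_ids]
        fields = list(rows[0]) if rows else []
        return {field: [row[field] for row in rows] for field in fields}

    async def clear_data(self, metadata: BatchMeta) -> None:
        for sample_id in metadata.sample_ids:
            self._rows.pop(sample_id, None)


async def demo() -> dict:
    manager = InMemoryStorageManager()
    meta = BatchMeta(sample_ids=[0, 1])
    await manager.put_data({"prompt": ["a", "b"]}, meta)
    fetched = await manager.get_data(meta)
    await manager.clear_data(meta)
    return fetched


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

The point of the sketch is that the caller only ever sees `put_data`, `get_data`, and `clear_data`; how rows are laid out stays private to the backend.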
For general functions used in TransferQueue, such as _do_handshake_with_controller and notify_data_update, we also include them in this Manager class.
We will provide a KVStorageManager for general KV based storage as a reference implementation, where we can integrate different backends through simple configuration.
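One possible shape for a configuration-driven KV manager is sketched below. This is not the planned KVStorageManager implementation: the `KVBackend` protocol, the `DictBackend` class, the `BACKENDS` registry, the `"backend"` and `"options"` config keys, and the `sample_id/field` key layout are all assumptions made for illustration, and a plain `dict` of lists again stands in for `TensorDict`.

```python
import asyncio
from dataclasses import dataclass
from typing import Any, Protocol


@dataclass
class BatchMeta:
    """Hypothetical stand-in for the real metadata class."""
    sample_ids: list[int]
    fields: list[str]


class KVBackend(Protocol):
    """Assumed minimal contract a key-value backend adapter would satisfy."""

    def put(self, key: str, value: Any) -> None: ...
    def get(self, key: str) -> Any: ...
    def delete(self, key: str) -> None: ...


class DictBackend:
    """In-process backend; an adapter for a real store would expose the same calls."""

    def __init__(self, **_options: Any) -> None:
        self._store: dict[str, Any] = {}

    def put(self, key: str, value: Any) -> None:
        self._store[key] = value

    def get(self, key: str) -> Any:
        return self._store[key]

    def delete(self, key: str) -> None:
        self._store.pop(key, None)


# Hypothetical registry: the config names a backend, the manager looks it up here.
BACKENDS: dict[str, type] = {"dict": DictBackend}


class KVStorageManager:
    def __init__(self, config: dict[str, Any]) -> None:
        backend_cls = BACKENDS[config["backend"]]
        self._backend: KVBackend = backend_cls(**config.get("options", {}))

    @staticmethod
    def _key(sample_id: int, field: str) -> str:
        # One key per (sample, field) pair; an assumed layout, not a spec.
        return f"{sample_id}/{field}"

    async def put_data(self, data: dict, metadata: BatchMeta) -> None:
        for pos, sample_id in enumerate(metadata.sample_ids):
            for field in metadata.fields:
                self._backend.put(self._key(sample_id, field), data[field][pos])

    async def get_data(self, metadata: BatchMeta) -> dict:
        return {
            field: [self._backend.get(self._key(s, field)) for s in metadata.sample_ids]
            for field in metadata.fields
        }

    async def clear_data(self, metadata: BatchMeta) -> None:
        for sample_id in metadata.sample_ids:
            for field in metadata.fields:
                self._backend.delete(self._key(sample_id, field))
```

Under these assumptions, switching stores would mean registering another adapter in `BACKENDS` and changing `config["backend"]`, with no change to the manager or its callers.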
In this PR, we have rewritten the TransferQueueStorageSimpleUnit backend according to the above logic. We implement AsyncSimpleStorageManager to hide the complex logic of manipulating distributed storage units from the user interface. Now in TransferQueueClient, the core function for async_get_data looks like this:
# In class AsyncTransferQueueClient
async def async_get_data(self, metadata: BatchMeta) -> TensorDict:
    if not metadata or metadata.size == 0:
        return TensorDict({}, batch_size=0)
    results = await self.storage_manager.get_data(metadata)
    return results

The self.storage_manager is initialized by a factory:
# In class AsyncTransferQueueClient
def initialize_storage_manager(
    self,
    manager_type: str,
    config: dict[str, Any],
):
    self.storage_manager = TransferQueueStorageManagerFactory.create(manager_type, config)

Progress
We have completed the initial refactoring and provided an E2E script in PR66, which is now available on the dev branch. You can try it by running the following command:
python recipe/simple_use_case/async_demo.py

TODO
- Code optimization & docstrings @LLLLxmmm @jianjunzhong
- Provide a general dynamic socket function for both Client & Storage @ji-huazhong
- Fix all UTs @LLLLxmmm @jianjunzhong @0oshowero0
- Provide sync_demo @LLLLxmmm @jianjunzhong (in a following PR)
- Provide a KVStorageManager implementation, where we can select different backend storages like Mooncake, Redis, etc. (PR#96)
- Provide a general data system initialization utility function (in a following PR)
- Better folder structure