[feat] Provide user-defined custom_meta methods #21
Pull request overview
This PR introduces a user-defined metadata mechanism allowing users to attach arbitrary key-value pairs (custom_meta) to samples in BatchMeta. The implementation distinguishes between:
- custom_meta: User-defined per-sample metadata
- _custom_backend_meta: Backend-specific per-sample per-field metadata (renamed from field_custom_metas)
Changes:
- Added custom_meta field and related methods (set_custom_meta, update_custom_meta, get_all_custom_meta, clear_custom_meta) to BatchMeta
- Renamed backend metadata fields from _custom_meta/field_custom_metas to _custom_backend_meta/field_custom_backend_meta throughout the codebase
- Added set_custom_meta and get_custom_meta methods to DataPartitionStatus and TransferQueueController
- Added chunk_by_partition method to split batches by partition_id
- Added SET_CUSTOM_META request type and client methods (async_set_custom_meta, set_custom_meta)
- Implicit sync of custom_meta when calling client.put()
Reviewed changes
Copilot reviewed 8 out of 8 changed files in this pull request and generated 5 comments.
| File | Description |
|---|---|
| transfer_queue/utils/zmq_utils.py | Added SET_CUSTOM_META and SET_CUSTOM_META_RESPONSE request types |
| transfer_queue/metadata.py | Added custom_meta field and methods to BatchMeta, renamed _custom_backend_meta, added chunk_by_partition method, updated __post_init__ to filter metadata |
| transfer_queue/controller.py | Renamed field_custom_metas to field_custom_backend_meta, added custom_meta storage and methods to DataPartitionStatus and controller, updated generate_batch_meta to include both metadata types |
| transfer_queue/client.py | Added async_set_custom_meta and set_custom_meta methods, implicit sync in async_put, updated documentation for force_fetch mode |
| tests/test_metadata.py | Added comprehensive tests for custom_meta methods, reorganized test structure |
| tests/test_controller_data_partitions.py | Updated tests to use new field_custom_backend_meta naming, added tests for custom_meta methods |
| tests/test_controller.py | Added integration tests for custom_meta and custom_backend_meta functionality |
| tests/test_client.py | Added tests for set_custom_meta client interface methods |
d955b85 to 7405c1b
Copilot reviewed 16 out of 16 changed files in this pull request and generated 10 comments.
transfer_queue/metadata.py
Outdated
    for partition_id, global_index in zip(self.partition_ids, self.global_indexes, strict=False):
        grouped_global_indexes[partition_id].append(global_index)

    chunk_list = [self.select_samples(samples) for samples in grouped_global_indexes.values()]
The chunk_by_partition method uses select_samples which does not preserve custom_meta and _custom_backend_meta. When samples are selected, the custom metadata is lost. The implementation should pass custom_meta and _custom_backend_meta when creating new BatchMeta instances.
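A minimal sketch of the suggested fix, using a simplified stand-in rather than the project's actual class. `MiniBatchMeta` and its reduced field set are hypothetical, and selecting by position inside `select_samples` is an assumption made for illustration; the point is only that each chunk carries the metadata entries of the samples it contains.

```python
from collections import defaultdict
from dataclasses import dataclass, field
from typing import Any


@dataclass
class MiniBatchMeta:
    """Hypothetical, simplified stand-in for BatchMeta (illustration only)."""

    global_indexes: list[int]
    partition_ids: list[str]
    custom_meta: dict[int, dict[str, Any]] = field(default_factory=dict)
    _custom_backend_meta: dict[int, dict[str, Any]] = field(default_factory=dict)

    def select_samples(self, positions: list[int]) -> "MiniBatchMeta":
        """Select samples by position and carry their metadata along."""
        selected = [self.global_indexes[p] for p in positions]
        return MiniBatchMeta(
            global_indexes=selected,
            partition_ids=[self.partition_ids[p] for p in positions],
            # Keep only the entries that belong to the selected samples.
            custom_meta={g: self.custom_meta[g] for g in selected if g in self.custom_meta},
            _custom_backend_meta={
                g: self._custom_backend_meta[g]
                for g in selected
                if g in self._custom_backend_meta
            },
        )

    def chunk_by_partition(self) -> list["MiniBatchMeta"]:
        """Split the batch into one chunk per partition_id."""
        grouped: dict[str, list[int]] = defaultdict(list)
        for position, partition_id in enumerate(self.partition_ids):
            grouped[partition_id].append(position)
        return [self.select_samples(positions) for positions in grouped.values()]
```

Samples without an entry in either dictionary simply end up with no entry in their chunk, so a batch with partially populated metadata can still be split.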
Copilot reviewed 16 out of 16 changed files in this pull request and generated 8 comments.
Copilot reviewed 16 out of 16 changed files in this pull request and generated 5 comments.
      def __getitem__(self, item):
          if isinstance(item, int | np.integer):
              sample_meta = self.samples[item] if self.samples else []
    -         return BatchMeta(samples=[sample_meta], extra_info=self.extra_info)
    +         global_idx = self.global_indexes[item]
    +         return BatchMeta(
    +             samples=[sample_meta],
    +             extra_info=self.extra_info,
    +             custom_meta={global_idx: self.custom_meta[global_idx]},
    +             _custom_backend_meta={global_idx: self._custom_backend_meta[global_idx]},
    +         )
Missing test coverage for the __getitem__ method with custom_meta and _custom_backend_meta. Since this method was modified to handle these new fields, tests should verify that indexing works correctly when these fields are present or absent.
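A sketch of what such tests might look like, written against a hypothetical stand-in (`MiniBatchMeta`) instead of the real BatchMeta so that it runs on its own. The stand-in copies a metadata entry only when the sample has one; that behaviour is an assumption for illustration, not necessarily what the PR implements.

```python
from dataclasses import dataclass, field
from typing import Any


@dataclass
class MiniBatchMeta:
    """Hypothetical stand-in for BatchMeta, reduced to the fields under test."""

    global_indexes: list[int]
    custom_meta: dict[int, dict[str, Any]] = field(default_factory=dict)
    _custom_backend_meta: dict[int, dict[str, Any]] = field(default_factory=dict)

    def __getitem__(self, item: int) -> "MiniBatchMeta":
        global_idx = self.global_indexes[item]

        def pick(meta: dict[int, dict[str, Any]]) -> dict[int, dict[str, Any]]:
            # Copy the entry only if this sample has one (assumed behaviour).
            return {global_idx: meta[global_idx]} if global_idx in meta else {}

        return MiniBatchMeta(
            global_indexes=[global_idx],
            custom_meta=pick(self.custom_meta),
            _custom_backend_meta=pick(self._custom_backend_meta),
        )


def test_getitem_keeps_metadata_when_present() -> None:
    batch = MiniBatchMeta(
        global_indexes=[7, 8],
        custom_meta={7: {"uid": "a"}, 8: {"uid": "b"}},
        _custom_backend_meta={7: {"prompt_ids": "key-7"}, 8: {"prompt_ids": "key-8"}},
    )
    item = batch[1]
    assert item.global_indexes == [8]
    assert item.custom_meta == {8: {"uid": "b"}}
    assert item._custom_backend_meta == {8: {"prompt_ids": "key-8"}}


def test_getitem_returns_empty_metadata_when_absent() -> None:
    batch = MiniBatchMeta(global_indexes=[7, 8])
    item = batch[0]
    assert item.global_indexes == [7]
    assert item.custom_meta == {}
    assert item._custom_backend_meta == {}
```

The second test covers the "absent" case: with direct dictionary indexing on a sample that has no entry, indexing would raise a KeyError instead of returning empty metadata.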
      # construct new SampleMeta instance
    - # TODO(tianyi): move custom_meta to FieldMeta level
    + # TODO(tianyi): (maybe) move _custom_backend_meta and custom_meta to FieldMeta level?
If _custom_backend_meta is moved into FieldMeta, some details need to change. For example, if users call BatchMeta.add_fields after put, it calls _extract_field_metas and then add_fields -> _union_fields, which overwrites _custom_backend_meta, and the overwritten value cannot be recovered.
Copilot reviewed 16 out of 16 changed files in this pull request and generated 8 comments.
Should we support putting data together with metadata? For example: tq.put(data={"prompt_ids": torch.Tensor(), "response_ids": torch.Tensor(), ...}, metadata={"uid": "xxx", "session_id": "xxx", "status": "RUNNING"})
    _custom_meta: dict[int, dict[str, Any]] = dataclasses.field(default_factory=dict)

    # user-defined meta for each sample
    custom_meta: dict[int, dict[str, Any]] = dataclasses.field(default_factory=dict)
Better move custom_meta into SampleMeta?
    class SampleMeta:
        """Records the metadata of a single data sample (stored as a row in the data system)."""
        partition_id: str  # Partition id, used for data versioning
        uid: str  # uid of the original prompt
        session_id: int = 0  # A single prompt is sampled n times; each sample corresponds to one AgentLoop
        trajectory_id: int = 0  # Each AgentLoop may produce multiple outputs: prefix switching
        extra_metadata: dict[str, Any] = dataclasses.field(default_factory=dict)
        # global_index: int  # Global row index, uniquely identifies a data sample
        fields: dict[str, FieldMeta]  # Fields of interest for this sample
Great point. Actually, this is planned for a follow-up PR. https://github.com/Ascend/TransferQueue/pull/21/changes#diff-a1fde8d325ec343f7422450e654b20e60cdd34497c772d2a8d76d0874e3cf2edR140
To keep SampleMeta universal and avoid hardcoding specific fields, we are introducing a generic dictionary to store this information.
That's right. This can now be done as in the following usage example:

    custom_meta = {
        global_index: {"uid": uuid.uuid4().hex[:4], "session_id": uuid.uuid4().hex[:4], "model_version": 0}
        for global_index in batch_meta.global_indexes
    }
    batch_meta.update_custom_meta(custom_meta)
    tq_client.put(data, batch_meta)
Signed-off-by: 0oshowero0 <o0shower0o@outlook.com>
Signed-off-by: 0oshowero0 <o0shower0o@outlook.com> # Conflicts: # transfer_queue/client.py
07ed406 to c2d75ce
Background
When integrating with upstream RL frameworks, it is often necessary to maintain some sample-level metadata alongside the batch data for computation. Typical examples include:
...
This PR introduces a flexible User-Defined Metadata mechanism. Users can now attach arbitrary key-value pairs (custom_meta) to BatchMeta and synchronize them with the TransferQueueController.
Key Changes
- BatchMeta Enhancement: Added interfaces to BatchMeta for setting and retrieving user-defined custom_meta.
- custom_meta:
  - Explicit Sync: Metadata can be sent to the controller via TransferQueueClient.set_custom_meta.
  - Implicit Sync: Metadata is automatically stored when calling TransferQueueClient.put with the given batch_meta input.
- custom_meta: Users can query the stored custom_meta from the controller using get_meta with mode="force_fetch".
We refactored the previous metadata for the storage backend into _custom_backend_meta, which holds field-level information.
Related API
BatchMeta
- set_custom_meta(global_index: int, value: Any): Insert or update a single key-value pair for a specific index.
- update_custom_meta(new_meta: dict[int, Any]): Batch update custom_meta using a dictionary.
- get_all_custom_meta() -> dict[int, Any]: Retrieve all stored custom_meta.
TransferQueueClient
- (async_)set_custom_meta(metadata: BatchMeta): Explicitly send custom_meta to the TransferQueueController.
- (async_)put(data: TensorDict, metadata: BatchMeta): Implicitly send custom_meta to the controller while putting data.
- (async_)get_meta(partition_id: str, mode="force_fetch"): Query and retrieve all BatchMeta along with custom_meta stored in the controller (requires mode="force_fetch").
Usage Example
Refer to tutorial/02_metadata_concepts.py.
Other Changes
- tutorial scripts.
- TransferQueueClient
TODO
- custom_meta -> SampleMeta, _custom_backend_meta -> FieldMeta
CC: @wuxibin89 @tianyi-ge