Skip to content

[feat] Support async_reset_consumption to reuse data#25

Merged
0oshowero0 merged 1 commit intoAscend:mainfrom
rednote-ai:feat/rednote-ai/support_reset_consumption
Feb 3, 2026
Merged

[feat] Support async_reset_consumption to reuse data#25
0oshowero0 merged 1 commit intoAscend:mainfrom
rednote-ai:feat/rednote-ai/support_reset_consumption

Conversation

@Aurelius84
Copy link
Contributor

Title

Add reset_consumption Interface for Reusing Data in TransferQueue

Description

Overview

This PR adds the reset_consumption interface to TransferQueue, allowing users to reset the consumption status of data samples without clearing the actual data. This is particularly useful for debugging scenarios where the same rollout data needs to be processed multiple times without regenerating it.

Changes Made

1. Core Controller Implementation (transfer_queue/controller.py)

  • Added reset_consumption() method in DataPartitionStatus class to reset consumption status for specific tasks or all tasks
  • Added reset_consumption() method in TransferQueueController class to handle partition-level consumption reset
  • Added request handling for RESET_CONSUMPTION and RESET_CONSUMPTION_RESPONSE message types in the controller's request processing logic

2. Client API (transfer_queue/client.py)

  • Added async_reset_consumption() async method for resetting consumption status via async operations
  • Added reset_consumption() synchronous wrapper method for convenience
  • Both methods support resetting consumption for a specific task or all tasks in a partition

3. Protocol Extensions (transfer_queue/utils/zmq_utils.py)

  • Added RESET_CONSUMPTION and RESET_CONSUMPTION_RESPONSE request types to ZMQRequestType enum

4. Comprehensive Testing

In tests/test_client.py:

  • Added MockController support for RESET_CONSUMPTION requests
  • Added test_reset_consumption() - Tests synchronous reset with specific task name
  • Added test_reset_consumption_all_tasks() - Tests synchronous reset for all tasks
  • Added test_async_reset_consumption() - Tests async reset with specific task name
  • Added test_async_reset_consumption_all_tasks() - Tests async reset for all tasks

In tests/test_controller.py:

  • Added test_controller_reset_consumption() - Comprehensive integration test that verifies:
    • Consumption status before and after data consumption
    • Reset functionality for specific tasks
    • Reset functionality for all tasks (task_name=None)
    • Multi-task consumption scenarios

Usage Examples

Synchronous API:

from transfer_queue import TransferQueueClient

client = TransferQueueClient(client_id="client_0", controller_info=controller_info)

# Reset consumption for a specific task
success = client.reset_consumption(partition_id="train_0", task_name="generate_sequences")

# Reset consumption for all tasks in a partition
success = client.reset_consumption(partition_id="train_0")

Asynchronous API:

import asyncio

# Reset consumption for a specific task
success = await client.async_reset_consumption(partition_id="train_0", task_name="generate_sequences")

# Reset consumption for all tasks
success = await client.async_reset_consumption(partition_id="train_0")

Use Case Example:

# Scenario: Re-train on the same rollout data without regeneration

# Step 1: Fetch data and train
batch_meta = client.get_meta(
    data_fields=["prompts", "attention_mask"],
    batch_size=32,
    partition_id="train_0",
    mode="fetch",
    task_name="training"
)
batch_data = client.get_data(batch_meta)
# ... perform training ...

# Step 2: Reset consumption to allow re-training on same data
client.reset_consumption(partition_id="train_0", task_name="training")

# Step 3: Fetch and train again on the same data
batch_meta = client.get_meta(
    data_fields=["prompts", "attention_mask"],
    batch_size=32,
    partition_id="train_0",
    mode="fetch",
    task_name="training"
)
batch_data = client.get_data(batch_meta)
# ... perform training again ...

@Aurelius84 Aurelius84 changed the title [feat] Supprot async_reset_consumpation enable reuse same batch data [feat] Support async_reset_consumpation enable reuse same batch data Feb 3, 2026
@0oshowero0
Copy link
Collaborator

Please use git commit -s -m to pass DCO check and properly tracks your contribution~

@0oshowero0 0oshowero0 changed the title [feat] Support async_reset_consumpation enable reuse same batch data [feat] Support async_reset_consumption to reuse data Feb 3, 2026
@Aurelius84 Aurelius84 requested a review from 0oshowero0 February 3, 2026 13:02
Signed-off-by: yuetian <zhangliujie@xiaohongshu.com>
@Aurelius84 Aurelius84 force-pushed the feat/rednote-ai/support_reset_consumption branch from 1a27366 to 810ed3a Compare February 3, 2026 13:18
@0oshowero0 0oshowero0 merged commit c2bb0fa into Ascend:main Feb 3, 2026
5 checks passed
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants