Merged
52 commits
a92a942
Support storage unit in TransferQueue
FightingZhen Sep 23, 2025
bae27bb
Fix importance error
FightingZhen Sep 23, 2025
0c03e14
Support controller in TransferQueue (#2)
LLLLxmmm Sep 24, 2025
64de012
expose TransferQueueClient (#3)
ji-huazhong Sep 24, 2025
54e1889
Add copyright and license information
0oshowero0 Sep 24, 2025
b79c2ab
update client docstring (#5)
0oshowero0 Sep 25, 2025
d67890e
merge TransferQueue utils (#4)
zhabuye Sep 25, 2025
6a54445
[fix] Fix n_sample related problems (#8)
0oshowero0 Sep 25, 2025
fec6303
expose TransferQueue client/controller UT (#6)
zhabuye Sep 25, 2025
092d7c0
Add metadata.py and test_simple_storage_unit.py (#9)
jianjunzhong Sep 26, 2025
7bf97ed
Add reorder function to BatchMeta (#13)
LLLLxmmm Sep 28, 2025
64d49d4
Merge remote-tracking branch 'upstream/main' into main_tq_submodule
0oshowero0 Sep 28, 2025
6ec7ca9
Add TransferQueue as submodule under experimental/transfer_queue
0oshowero0 Sep 28, 2025
035d7a7
update requirements for transfer_queue
0oshowero0 Sep 28, 2025
5cfa5f6
update doc
0oshowero0 Sep 28, 2025
acbd595
[recipe] feat: Integrate TransferQueue into RayTrainer (#14)
LLLLxmmm Sep 29, 2025
02af787
fix chinese comments & add TODO (#15)
0oshowero0 Sep 29, 2025
fac98d0
update transferqueue submodule branch
0oshowero0 Sep 29, 2025
f0dbeb6
update transferqueue submodule to latest commit
0oshowero0 Sep 29, 2025
d0add3f
update transferqueue submodule to latest commit
0oshowero0 Sep 29, 2025
31814a3
simplify import
0oshowero0 Sep 29, 2025
0abcacf
fix
0oshowero0 Sep 29, 2025
be9b19c
[recipe]feat: register tq server info for each workgroup (#18)
ji-huazhong Sep 29, 2025
667bb3d
expose more API
0oshowero0 Sep 29, 2025
6eb1692
expose all API
0oshowero0 Sep 29, 2025
9d92f0a
expose all APIs
0oshowero0 Sep 29, 2025
ec91b86
expose all APIs
0oshowero0 Sep 29, 2025
7f89443
fix
0oshowero0 Sep 29, 2025
eb31070
[data] feat: Provide general decorator for DataProto <-> BatchMeta (#21)
0oshowero0 Sep 29, 2025
d802f0c
feat: Support conversion between dataproto and batchmeta (#24)
ji-huazhong Sep 30, 2025
68c28a7
remove unnecessary codes (#25)
0oshowero0 Sep 30, 2025
8af95bc
fix (#26)
baymax591 Oct 9, 2025
206fbc3
support transferqueue in agent loop (#27)
baymax591 Oct 10, 2025
39c83a3
feat: support grpo with transfer queue (#22)
jianjunzhong Oct 10, 2025
8f9773b
fix: correct batch size assignment in _dataproto_to_tensordict functi…
jianjunzhong Oct 11, 2025
af3189c
Merge pull request #31 from volcengine/main
0oshowero0 Oct 13, 2025
db0fb3e
feat: Support validating with TransferQueue (#29)
LLLLxmmm Oct 13, 2025
f2072f5
simplify the implementation of DataProto and BatchMeta conversion (#32)
ji-huazhong Oct 13, 2025
4efd166
fix: make batchmeta dataproto converter compatible with server mode (…
ji-huazhong Oct 14, 2025
136c82a
update TransferQueue submodule
0oshowero0 Oct 14, 2025
7db97f8
fix validate in agent loop (#34)
baymax591 Oct 14, 2025
7c927e5
replace submodule with pip dependencies (#35)
ji-huazhong Oct 14, 2025
d5603be
add TODOs for TransferQueue (#37)
LLLLxmmm Oct 15, 2025
340bd40
apply review suggestions (#38)
ji-huazhong Oct 15, 2025
50a1bbc
make ci happy (#39)
baymax591 Oct 16, 2025
6e3b949
more bugfix
ji-huazhong Oct 16, 2025
6462883
fix
baymax591 Oct 16, 2025
7a7a0c1
Merge pull request #40 from baymax591/main_tq_submodule
baymax591 Oct 16, 2025
9013472
Adapt for multimodal data format (#42)
LLLLxmmm Oct 17, 2025
a60ade4
Merge branch 'main' into main_tq_submodule
ji-huazhong Oct 18, 2025
77c9a0e
apply review suggestions
ji-huazhong Oct 18, 2025
20d0f98
simpify the implementation of main_ppo in recipe/transfer_queue
ji-huazhong Oct 20, 2025
128 changes: 128 additions & 0 deletions docs/data/transfer_queue.md
@@ -0,0 +1,128 @@
# TransferQueue Data System

Last updated: 09/28/2025.

This doc introduces [TransferQueue](https://github.com/TransferQueue/TransferQueue), an asynchronous streaming data management system for efficient post-training.


<h2 id="overview"> Overview</h2>

TransferQueue is a high-performance data storage and transfer system with panoramic data visibility and streaming scheduling capabilities, optimized for efficient dataflow in post-training workflows.

<p align="center">
<img src="https://cdn.nlark.com/yuque/0/2025/png/23208217/1758696193102-a5654375-65a1-4e06-9c63-142b59df90b8.png" width="70%">
</p>


TransferQueue offers **fine-grained, sample-level** data management capabilities, serving as a data gateway that decouples explicit data dependencies across computational tasks. This enables a divide-and-conquer approach, significantly simplifying the design of the algorithm controller.


<p align="center">
<img src="https://cdn.nlark.com/yuque/0/2025/png/23208217/1758696791245-fa7baf96-46af-4c19-8606-28ffadc4556c.png" width="70%">
</p>




<h2 id="components"> Components</h2>



### Control Plane: Panoramic Data Management

In the control plane, `TransferQueueController` tracks the **production status** and **consumption status** of each training sample as metadata. When all the required data fields are ready (i.e., written to the `TransferQueueStorage`), we know that this data sample can be consumed by downstream tasks.

For consumption status, we record a consumption record for each computational task (e.g., `generate_sequences`, `compute_log_prob`, etc.). Therefore, even if different computational tasks require the same data fields, they can consume the data independently without interfering with each other.


<p align="center">
<img src="https://cdn.nlark.com/yuque/0/2025/png/23208217/1758696820173-456c1784-42ba-40c8-a292-2ff1401f49c5.png" width="70%">
</p>
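The bookkeeping described above can be sketched in a few lines of pure Python. This is a toy stand-in, not the actual `TransferQueueController` API; the class and method names (`ToyControllerMeta`, `mark_produced`, `ready_rows`) are hypothetical:

```python
class ToyControllerMeta:
    """Toy sketch of per-sample production/consumption metadata."""

    def __init__(self, num_samples: int):
        # production status: which data fields are ready for each sample (row)
        self.produced: list[set[str]] = [set() for _ in range(num_samples)]
        # consumption status: per computational task, which rows it has consumed
        self.consumed: dict[str, set[int]] = {}

    def mark_produced(self, row: int, field: str) -> None:
        self.produced[row].add(field)

    def ready_rows(self, fields: list[str], task_name: str) -> list[int]:
        """Rows whose required fields are all produced and that this task has
        not consumed yet. Each task keeps its own consumption record, so tasks
        that need the same fields do not interfere with each other."""
        seen = self.consumed.setdefault(task_name, set())
        ready = [i for i, f in enumerate(self.produced) if set(fields) <= f and i not in seen]
        seen.update(ready)
        return ready


meta = ToyControllerMeta(num_samples=4)
for row in (0, 2):
    meta.mark_produced(row, "prompts")
    meta.mark_produced(row, "responses")

# two different tasks consume the same ready rows independently
print(meta.ready_rows(["prompts", "responses"], "compute_log_prob"))  # [0, 2]
print(meta.ready_rows(["prompts", "responses"], "compute_values"))    # [0, 2]
print(meta.ready_rows(["prompts", "responses"], "compute_log_prob"))  # []
```

The key design point this mirrors is that readiness is tracked per row and consumption per (task, row) pair, which is what makes sample-level streaming dispatch possible.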


> In the future, we plan to support **load-balancing** and **dynamic batching** capabilities in the control plane. In addition, we will support data management for disaggregated frameworks, where each rank retrieves its own data rather than being coordinated by a single controller.

### Data Plane: Distributed Data Storage

In the data plane, `TransferQueueStorageSimpleUnit` serves as a naive storage unit based on CPU memory, responsible for the actual storage and retrieval of data. Each storage unit can be deployed on a separate node, allowing for distributed data management.

`TransferQueueStorageSimpleUnit` employs a 2D data structure as follows:

- Each row corresponds to a training sample, assigned a unique index within the corresponding global batch.
- Each column represents the input/output data fields for computational tasks.

This data structure design is motivated by the computational characteristics of the post-training process, where each training sample is generated in a relayed manner across task pipelines. It provides an accurate addressing capability, which allows fine-grained, concurrent data read/write operations in a streaming manner.

<p align="center">
<img src="https://cdn.nlark.com/yuque/0/2025/png/23208217/1758696805154-3817011f-84e6-40d0-a80c-58b7e3e5f6a7.png" width="70%">
</p>
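The 2D layout above can be pictured with a minimal in-memory stand-in (a toy, not the real `TransferQueueStorageSimpleUnit`, which adds networking and concurrency control; `ToyStorageUnit` and its methods are hypothetical names):

```python
class ToyStorageUnit:
    """Toy 2D store: rows are sample indices within a global batch, columns are
    named data fields. Each cell is addressed independently, so different tasks
    can read/write different fields of the same row concurrently."""

    def __init__(self):
        self.cells: dict[tuple[int, str], object] = {}

    def put(self, row: int, field: str, value) -> None:
        self.cells[(row, field)] = value

    def get(self, rows: list[int], fields: list[str]) -> dict[str, list]:
        # field-wise retrieval: one list of cell values per requested field
        return {f: [self.cells[(r, f)] for r in rows] for f in fields}


store = ToyStorageUnit()
store.put(0, "prompts", "p0")
store.put(1, "prompts", "p1")
store.put(0, "responses", "r0")  # written later by another task, same row

print(store.get([0, 1], ["prompts"]))  # {'prompts': ['p0', 'p1']}
```

Because the unit of addressing is a (row, field) cell rather than a whole batch, a downstream task can start reading a sample's outputs as soon as its own required columns are filled.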


> In the future, we plan to implement a **general storage abstraction layer** to support various storage backends. Through this abstraction, we hope to integrate high-performance storage solutions such as [MoonCakeStore](https://github.com/kvcache-ai/Mooncake) to support device-to-device data transfer through RDMA, further enhancing data transfer efficiency for large-scale data.


### User Interface: Asynchronous & Synchronous Client


The interaction workflow of the TransferQueue system is as follows:

1. A process sends a read request to the `TransferQueueController`.
2. `TransferQueueController` scans the production and consumption metadata for each sample (row) and dynamically assembles micro-batch metadata according to the load-balancing policy. This mechanism enables sample-level data scheduling.
3. The process retrieves the actual data from distributed storage units using the metadata provided by the controller.

To simplify the usage of TransferQueue, we have encapsulated this process into `AsyncTransferQueueClient` and `TransferQueueClient`. These clients provide asynchronous and synchronous interfaces for data transfer, allowing users to easily integrate TransferQueue into their frameworks.


> In the future, we will provide a `StreamingDataLoader` interface for disaggregated frameworks as discussed in [RFC#2662](https://github.com/volcengine/verl/discussions/2662). Leveraging this abstraction, each rank can automatically get its own data like `DataLoader` in PyTorch. The TransferQueue system will handle the underlying data scheduling and transfer logic caused by different parallelism strategies, significantly simplifying the design of disaggregated frameworks.


<h2 id="show-cases"> Show Cases</h2>

### General Usage

The primary interaction points are `AsyncTransferQueueClient` and `TransferQueueClient`, serving as the communication interface with the TransferQueue system.

Core interfaces:

- `(async_)get_meta(data_fields: list[str], batch_size: int, global_step: int, get_n_samples: bool, task_name: str) -> BatchMeta`
- `(async_)get_data(metadata: BatchMeta) -> TensorDict`
- `(async_)put(data: TensorDict, metadata: BatchMeta, global_step)`
- `(async_)clear(global_step: int)`
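As a hedged sketch of how these calls compose, the following toy in-memory client mirrors the synchronous signatures listed above while collapsing the controller and storage into local dictionaries (the real clients talk to remote processes, and a plain dict stands in for `TensorDict` here; `ToySyncClient` is a hypothetical name):

```python
class ToySyncClient:
    """Toy stand-in for TransferQueueClient: data is bucketed by global_step,
    and 'metadata' is reduced to (fields, step)."""

    def __init__(self):
        self.steps: dict[int, dict[str, list]] = {}

    def put(self, data: dict, metadata=None, global_step: int = 0) -> None:
        self.steps.setdefault(global_step, {}).update(data)

    def get_meta(self, data_fields, batch_size, global_step, get_n_samples=False, task_name=""):
        # a real controller would scan production/consumption status here
        # and assemble a load-balanced micro-batch; the toy just echoes the request
        return (data_fields, global_step)

    def get_data(self, metadata) -> dict:
        fields, step = metadata
        return {f: self.steps[step][f] for f in fields}

    def clear(self, global_step: int) -> None:
        self.steps.pop(global_step, None)


client = ToySyncClient()
client.put({"prompts": ["p0", "p1"]}, global_step=0)
meta = client.get_meta(["prompts"], batch_size=2, global_step=0, task_name="generate_sequences")
print(client.get_data(meta))  # {'prompts': ['p0', 'p1']}
client.clear(0)
```

The two-phase shape (`get_meta` first, then `get_data`) is the point: lightweight metadata flows through the scheduler while bulk data moves directly between the caller and storage.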


We will soon release a detailed tutorial and API documentation.


### verl Example


The primary motivation for integrating TransferQueue into verl is to **alleviate the data transfer bottleneck of the single-controller `RayPPOTrainer`**. Currently, all `DataProto` objects must be routed through `RayPPOTrainer`, creating a single-point bottleneck for the whole post-training system.

![verl_dataflow_DataProto](https://cdn.nlark.com/yuque/0/2025/jpeg/23208217/1758704289414-bcc54228-716b-4d4a-ad3b-f9ace6d10fcf.jpeg)

Leveraging TransferQueue, we separate experience data transfer from metadata dispatch by:

- Replacing `DataProto` with `BatchMeta` (metadata) and `TensorDict` (actual data) structures
- Preserving verl's original Dispatch/Collect logic via BatchMeta (maintaining single-controller debuggability)
- Accelerating data transfer by TransferQueue's distributed storage units

![verl_dataflow_TransferQueue](https://cdn.nlark.com/yuque/0/2025/jpeg/23208217/1758704301666-0807dc06-766c-4a2d-9cde-889a6bb56b34.jpeg)
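The dataflow change can be pictured with a toy conversion layer: a decorator that lets a worker method be written against actual data while only metadata travels through the controller. This is an illustrative sketch, not verl's actual decorator or the real `BatchMeta`/`TensorDict` types; all names here (`with_data_fetch`, `_ToyClient`) are hypothetical:

```python
from functools import wraps


class _ToyClient:
    """Minimal in-memory stand-in for a TransferQueue client."""

    def __init__(self):
        self.store: dict[str, list] = {}

    def get_data(self, metadata):
        # metadata is just a list of field names in this toy
        return {f: self.store[f] for f in metadata}

    def put(self, data, metadata=None):
        self.store.update(data)


def with_data_fetch(client):
    """Toy decorator: the controller dispatches only metadata; the worker-side
    wrapper pulls bulk data from storage, runs the task, and writes results
    back, so only lightweight metadata flows back to the controller."""

    def deco(fn):
        @wraps(fn)
        def wrapper(metadata):
            data = client.get_data(metadata)  # fetch actual data by metadata
            client.put(fn(data))              # write task outputs back to storage
            return metadata                   # only metadata returns upstream
        return wrapper

    return deco


client = _ToyClient()
client.put({"prompts": ["p0", "p1"]})


@with_data_fetch(client)
def generate_sequences(data):
    return {"responses": [p + "-resp" for p in data["prompts"]]}


generate_sequences(["prompts"])
print(client.store["responses"])  # ['p0-resp', 'p1-resp']
```

Because the wrapped function keeps its original "data in, data out" signature, the controller-side Dispatch/Collect logic can stay unchanged while bulk tensors bypass it entirely.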


You may refer to the [recipe](https://github.com/TransferQueue/TransferQueue/tree/dev/recipe/simple_use_case), where we mimic the verl usage in both async & sync scenarios.





<h2 id="citation"> Citation</h2>
Please kindly cite our paper if you find this repo useful:

```bibtex
@article{han2025asyncflow,
  title={AsyncFlow: An Asynchronous Streaming RL Framework for Efficient LLM Post-Training},
  author={Han, Zhenyu and You, Ansheng and Wang, Haibo and Luo, Kui and Yang, Guang and Shi, Wenqi and Chen, Menglong and Zhang, Sicheng and Lan, Zeshun and Deng, Chunshi and others},
  journal={arXiv preprint arXiv:2507.01663},
  year={2025}
}
```
1 change: 1 addition & 0 deletions docs/index.rst
@@ -125,6 +125,7 @@ verl is fast with:
advance/one_step_off
advance/agent_loop
advance/fully_async
data/transfer_queue.md

.. toctree::
:maxdepth: 1
76 changes: 76 additions & 0 deletions recipe/transfer_queue/agent_loop.py
@@ -0,0 +1,76 @@
# Copyright 2025 Bytedance Ltd. and/or its affiliates
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import numpy as np
import ray
from transfer_queue import BatchMeta

import verl.experimental.agent_loop.agent_loop as agent_loop


class AgentLoopManager(agent_loop.AgentLoopManager):
    def generate_sequences(self, prompts: BatchMeta) -> BatchMeta:
        """Split input batch and dispatch to agent loop workers.

        Args:
            prompts (BatchMeta): Input batch.

        Returns:
            BatchMeta: Output batch metadata.
        """

        if self.rm_micro_batch_size and len(prompts) % self.rm_micro_batch_size != 0:
            raise ValueError(
                f"The length of prompts {len(prompts)} is not divisible by "
                f"rm_micro_batch_size {self.rm_micro_batch_size}"
            )
        if self.config.actor_rollout_ref.rollout.free_cache_engine:
            self.wake_up()
        chunks = prompts.chunk(len(self.agent_loop_workers))
        outputs = ray.get(
            [
                worker.generate_sequences.remote(chunk)
                for worker, chunk in zip(self.agent_loop_workers, chunks, strict=True)
            ]
        )
        output = BatchMeta.concat(outputs)
        if self.config.actor_rollout_ref.rollout.free_cache_engine:
            self.sleep()

        # calculate performance metrics
        metrics = [o.extra_info.pop("metrics") for o in outputs]  # list[list[dict[str, str]]]
        timing = self._performance_metrics(metrics, output)

        output.set_extra_info("timing", timing)
        return output

    def _performance_metrics(self, metrics: list[list[dict[str, str]]], output: BatchMeta) -> dict[str, float]:
        timing = {}
        t_generate_sequences = np.array([metric["generate_sequences"] for chunk in metrics for metric in chunk])
        t_tool_calls = np.array([metric["tool_calls"] for chunk in metrics for metric in chunk])
        timing["agent_loop/generate_sequences/min"] = t_generate_sequences.min()
        timing["agent_loop/generate_sequences/max"] = t_generate_sequences.max()
        timing["agent_loop/generate_sequences/mean"] = t_generate_sequences.mean()
        timing["agent_loop/tool_calls/min"] = t_tool_calls.min()
        timing["agent_loop/tool_calls/max"] = t_tool_calls.max()
        timing["agent_loop/tool_calls/mean"] = t_tool_calls.mean()

        return timing

    def create_transferqueue_client(self, controller_infos, storage_infos, role):
        ray.get(
            [
                worker.create_transferqueue_client.remote(controller_infos, storage_infos, role)
                for worker in self.agent_loop_workers
            ]
        )
11 changes: 11 additions & 0 deletions recipe/transfer_queue/config/transfer_queue_ppo_trainer.yaml
@@ -0,0 +1,11 @@
hydra:
  searchpath:
    - file://verl/trainer/config

defaults:
  - ppo_trainer
  - _self_

# config for TransferQueue
transfer_queue:
  enable: True