@0oshowero0 0oshowero0 commented Sep 30, 2025

What does this PR do?

This PR introduces the TransferQueue data management module to verl, aiming to accelerate experience data transfer and address performance bottlenecks in post-training systems. Detailed design rationale is available in our RFC (#2662).

This PR adds TransferQueue as a git submodule under verl/experimental/transfer_queue. In addition, it provides end-to-end scripts that integrate verl with TransferQueue.

TransferQueue is a high-performance data storage and transfer module with panoramic data visibility and streaming scheduling capabilities, optimized for efficient dataflow in post-training workflows (in progress).

The system will introduce the following core components:

  • TransferQueueClient: Deployed on each Worker; manages communication with the TransferQueue system via simple put/get semantics.

  • TransferQueueController: Centralized dataflow scheduler that tracks the production and consumption status of training samples.

  • TransferQueueStorage: Distributed storage units that hold the actual experience data.

The primary motivation for integrating TransferQueue into verl now is to alleviate the data transfer bottleneck of the single-controller RayPPOTrainer. Currently, all DataProto objects must be routed through RayPPOTrainer, creating a single-point bottleneck for the whole post-training system.

(Figure: verl dataflow with DataProto)

Leveraging TransferQueue, we separate experience data transfer from metadata dispatch by

  • Replacing DataProto with BatchMeta (metadata) and TensorDict (actual data) structures
  • Preserving verl's original Dispatch/Collect logic via BatchMeta (maintaining single-controller debuggability)
  • Accelerating data transfer by TransferQueue's distributed storage units
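The metadata/data separation above can be sketched as follows. This is an illustrative stand-in only: `BatchMeta` here is a simplified dataclass, and a plain dict of lists stands in for the actual TensorDict-backed distributed storage.

```python
from dataclasses import dataclass

@dataclass
class BatchMeta:
    """Lightweight metadata routed through the single controller (illustrative)."""
    global_indexes: list[int]   # which samples in the global batch
    fields: list[str]           # which columns (e.g. "input_ids", "responses")

# The heavy experience data stays in distributed storage, keyed by field and
# sample index; a plain dict of lists stands in for a TensorDict here.
storage = {
    "input_ids": [[1, 2], [3, 4], [5, 6]],
    "responses": [[7], [8], [9]],
}

def get_data(meta: BatchMeta) -> dict:
    """Workers fetch actual tensors directly from storage using only metadata."""
    return {f: [storage[f][i] for i in meta.global_indexes] for f in meta.fields}

meta = BatchMeta(global_indexes=[0, 2], fields=["input_ids"])
print(get_data(meta))  # {'input_ids': [[1, 2], [5, 6]]}
```

The controller only ever dispatches `BatchMeta`-sized messages, while the bulky tensors travel directly between workers and storage.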

(Figure: verl dataflow with TransferQueue)

For the WorkerGroup class, we hide the above translation process behind a decorator. For AgentLoop-related classes, we perform the adaptation explicitly in AgentLoopBase.

Checklist Before Starting

  • Search for similar PRs. Paste at least one query link here: ...
  • Format the PR title as [{modules}] {type}: {description} (This will be checked by the CI)
    • {modules} include fsdp, megatron, sglang, vllm, rollout, trainer, ci, training_utils, recipe, hardware, deployment, ray, worker, single_controller, misc, perf, model, algo, env, tool, ckpt, doc, data
    • If this PR involves multiple modules, separate them with , like [megatron, fsdp, doc]
    • {type} is in feat, fix, refactor, chore, test
    • If this PR breaks any API (CLI arguments, config, function signature, etc.), add [BREAKING] to the beginning of the title.
    • Example: [BREAKING][fsdp, megatron] feat: dynamic batching

Test

We've validated TransferQueue functionality through

  • Unit test of (Async)TransferQueueClient, TransferQueueController, and TransferQueueSimpleUnit
  • End-to-end demo that mimics the usage in verl

API and Usage Example

The primary interaction points are AsyncTransferQueueClient and TransferQueueClient, serving as the communication interface with the TransferQueue system.

Core client interfaces:

  • (async_)get_meta(data_fields: list[str], batch_size: int, global_step: int, get_n_samples: bool, task_name: str) -> BatchMeta
  • (async_)get_data(metadata: BatchMeta) -> TensorDict
  • (async_)put(data: TensorDict, metadata: BatchMeta, global_step: int)
  • (async_)clear(global_step: int)

You may refer to the example here, which mimics verl usage in both async and sync scenarios:
https://github.com/TransferQueue/TransferQueue/tree/dev/recipe/simple_use_case.
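A typical get_meta / get_data / put round trip looks roughly like the sketch below. `StubAsyncClient`, its internal store, and the field names are stand-ins invented for illustration; only the call shape mirrors the interfaces listed above.

```python
import asyncio

class StubAsyncClient:
    """Stand-in mimicking the AsyncTransferQueueClient call shape (illustrative)."""
    def __init__(self):
        self._data = {"prompts": ["p0", "p1"]}

    async def async_get_meta(self, data_fields, batch_size, global_step, task_name):
        # Real controller would track sample status; here we just echo a metadata dict.
        return {"fields": data_fields, "size": batch_size, "step": global_step}

    async def async_get_data(self, metadata):
        return {f: self._data[f][: metadata["size"]] for f in metadata["fields"]}

    async def async_put(self, data, metadata, global_step):
        self._data.update(data)

async def rollout_step(client, step):
    # 1) ask for metadata, 2) fetch the batch, 3) compute, 4) put results back
    meta = await client.async_get_meta(["prompts"], batch_size=2,
                                       global_step=step, task_name="rollout")
    batch = await client.async_get_data(meta)
    responses = {"responses": [p + "_resp" for p in batch["prompts"]]}
    await client.async_put(responses, meta, global_step=step)
    return responses

client = StubAsyncClient()
out = asyncio.run(rollout_step(client, step=0))
print(out)  # {'responses': ['p0_resp', 'p1_resp']}
```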


Please run pip install ".[transferqueue]" to install TransferQueue for verl.

Then you can try our recipe as follows, which is adapted from run_qwen3-8b.sh with async rollout mode enabled using the sglang backend. For more recipes and the accuracy comparison, refer to [this doc](https://www.yuque.com/haomingzi-lfse7/hlx5g0/bqm536cgc52kv2gk?singleDoc#).

# Tested successfully on the hiyouga/verl:ngc-th2.6.0-cu126-vllm0.8.4-flashinfer0.2.2-cxx11abi0 image.
# It outperforms the Qwen2 7B base model by two percentage points on the test set of GSM8K.

set -x

MODEL_PATH="/home/xxx/models/Qwen3-8B"

TRAIN_FILE="/home/xxx/data/DAPO-Math-17k/data/dapo-math-17k.parquet"
TEST_FILE="/home/xxx/data/DAPO-Math-17k/data/dapo-math-17k.parquet"

log_dir="./logs"
mkdir -p ${log_dir}
timestamp=$(date +"%Y%m%d%H%M%S")
log_file="${log_dir}/qwen3-8b_tq_${timestamp}.log"

rollout_mode="async"
rollout_name="sglang" # sglang or vllm
if [ "$rollout_mode" = "async" ]; then
    export VLLM_USE_V1=1
    return_raw_chat="True"
fi

python3 -m recipe.transfer_queue.main_ppo \
    --config-name='transfer_queue_ppo_trainer' \
    algorithm.adv_estimator=grpo \
    data.train_files=${TRAIN_FILE} \
    data.val_files=${TEST_FILE} \
    data.return_raw_chat=$return_raw_chat \
    data.train_batch_size=128 \
    data.max_prompt_length=2048 \
    data.max_response_length=8192 \
    data.filter_overlong_prompts_workers=128 \
    data.filter_overlong_prompts=True \
    data.truncation='error' \
    actor_rollout_ref.model.path=${MODEL_PATH} \
    actor_rollout_ref.actor.optim.lr=1e-6 \
    actor_rollout_ref.model.use_remove_padding=True \
    actor_rollout_ref.actor.ppo_mini_batch_size=32 \
    actor_rollout_ref.actor.ppo_micro_batch_size_per_gpu=4 \
    actor_rollout_ref.actor.use_kl_loss=True \
    actor_rollout_ref.actor.kl_loss_coef=0.001 \
    actor_rollout_ref.actor.kl_loss_type=low_var_kl \
    actor_rollout_ref.actor.entropy_coeff=0 \
    actor_rollout_ref.model.enable_gradient_checkpointing=True \
    actor_rollout_ref.actor.fsdp_config.param_offload=True \
    actor_rollout_ref.actor.fsdp_config.optimizer_offload=True \
    actor_rollout_ref.rollout.log_prob_micro_batch_size_per_gpu=4 \
    actor_rollout_ref.rollout.tensor_model_parallel_size=4 \
    actor_rollout_ref.rollout.max_num_batched_tokens=10240 \
    actor_rollout_ref.rollout.name=$rollout_name \
    actor_rollout_ref.rollout.mode=$rollout_mode \
    actor_rollout_ref.rollout.gpu_memory_utilization=0.8 \
    actor_rollout_ref.rollout.n=5 \
    actor_rollout_ref.ref.log_prob_micro_batch_size_per_gpu=8 \
    actor_rollout_ref.ref.fsdp_config.param_offload=True \
    algorithm.use_kl_in_reward=False \
    trainer.critic_warmup=0 \
    trainer.logger=console \
    trainer.project_name='verl_grpo_example_gsm8k' \
    trainer.experiment_name='qwen3_8b_function_rm' \
    trainer.n_gpus_per_node=8 \
    trainer.nnodes=1 \
    trainer.save_freq=-1 \
    trainer.test_freq=1000 \
    trainer.total_epochs=15 \
    trainer.total_training_steps=50 \
    trainer.val_before_train=False \
    +trainer.num_global_batch=1 \
    +trainer.num_data_storage_units=8 \
    +trainer.num_data_controllers=8 \
    2>&1 | tee "$log_file"
echo "Finished, log is saved in: $log_file"

Accuracy comparison:
(Figures: accuracy-comparison curves omitted; see the linked doc above.)

Design & Code Changes

Refer to our Paper, RFC, and Zhihu post :)

Checklist Before Submitting

Important

Please check all the following items before requesting a review, otherwise the reviewer might deprioritize this PR for review.

A reviewer (Collaborator) commented on the client-initialization code:

    global _TRANSFER_QUEUE_CLIENT
    global _VAL_TRANSFER_QUEUE_CLIENT
    if "val" in client_id:
        _VAL_TRANSFER_QUEUE_CLIENT = AsyncTransferQueueClient(client_id, controller_infos, storage_infos)

"Why do we have a separate client for val? One client should be used for both train and val."
@wuxibin89 wuxibin89 merged commit ae4b5fe into volcengine:main Oct 21, 2025
74 of 76 checks passed
