
[P/D] Asynchronously do _nixl_handshake #19836


Open
wants to merge 12 commits into main

Conversation

Collaborator

@lk-chen lk-chen commented Jun 19, 2025

Essential Elements of an Effective PR Description Checklist

  • The purpose of the PR, such as "Fix some issue (link existing issues this PR will resolve)".
  • The test plan, such as providing test command.
  • The test results, such as pasting the results comparison before and after, or e2e results
  • (Optional) The necessary documentation update, such as updating supported_models.md and examples for a new model.

Purpose

Split from #19447, this PR keeps using zmq for nixl metadata transfer, but performs _nixl_handshake in the background.

see issue #19777

Test Plan

Unit test

Test Result

(Optional) Documentation Update

Co-authored-by: Will Eaton [email protected]
Co-authored-by: Nick Hill [email protected]


👋 Hi! Thank you for contributing to the vLLM project.

💬 Join our developer Slack at https://slack.vllm.ai to discuss your PR in #pr-reviews, coordinate on features in #feat- channels, or join special interest groups in #sig- channels.

Just a reminder: PRs do not trigger a full CI run by default; they only run the fastcheck CI, which runs a small and essential subset of tests to catch errors quickly. You can run other CI tests on top of those by going to your fastcheck build on the Buildkite UI (linked in the PR checks section) and unblocking them. If you do not have permission to unblock, ping simon-mo or khluu to add you to our Buildkite org.

Once the PR is approved and ready to go, your PR reviewer(s) can run CI to test the changes comprehensively before merging.

To run CI, PR reviewers can either add the ready label to the PR or enable auto-merge.

🚀

Contributor

@gemini-code-assist gemini-code-assist bot left a comment

Summary of Changes

Hello @lk-chen, I'm Gemini Code Assist! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed!

This pull request significantly refactors the NIXL KV transfer mechanism by introducing asynchronous handling for the _nixl_handshake operation. By offloading this potentially blocking task to a background thread pool, the change aims to enhance the overall responsiveness and efficiency of KV cache transfers, particularly when new connections to remote engines are required. It also includes improvements to type safety and resource management.

Highlights

  • Asynchronous NIXL Handshake: The _nixl_handshake operation, which can be time-consuming, is now executed asynchronously in a ThreadPoolExecutor. This prevents it from blocking the main thread, improving responsiveness during KV cache transfers, especially when establishing new connections with remote engines.
  • Request Queuing and Processing: Requests that require a handshake with a remote engine are now queued (_pending_requests) and only processed after the handshake successfully completes. A callback mechanism moves these requests to a 'ready' queue (_ready_requests) for subsequent processing, ensuring operations proceed only when connections are established (see the sketch after this list).
  • Improved Type Safety and Readability: New type aliases, EngineId and ReqId, have been introduced and applied consistently across relevant data structures and method signatures within nixl_connector.py. This enhances code clarity and maintainability.
  • Robust Transfer Completion Logic: The logic for _pop_done_transfers has been refined to ensure that all individual transfer handles associated with a request are marked as 'DONE' before the entire request is considered complete and removed from tracking. This prevents premature cleanup of in-progress transfers.
  • Resource Management: A __del__ method has been added to NixlConnectorWorker to ensure the graceful shutdown of the ThreadPoolExecutor and proper joining of the handshake listener thread when the object is destroyed, preventing resource leaks.
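
Taken together, the queuing pattern described in these highlights can be sketched roughly as follows. This is a minimal illustration, not the PR's actual code: the class name and add_request are invented, while _pending_requests, _ready_requests, EngineId, and ReqId follow the summary above.

    from collections import defaultdict
    from concurrent.futures import Future, ThreadPoolExecutor

    EngineId = str
    ReqId = str

    class AsyncHandshakeSketch:
        def __init__(self) -> None:
            self._executor = ThreadPoolExecutor(
                max_workers=1, thread_name_prefix="vllm-nixl-handshake")
            self._pending_requests: dict[EngineId, list[ReqId]] = defaultdict(list)
            self._ready_requests: list[ReqId] = []

        def add_request(self, req_id: ReqId, engine_id: EngineId) -> None:
            # Only the first request for an engine kicks off a handshake;
            # later requests just queue up behind it.
            needs_handshake = engine_id not in self._pending_requests
            self._pending_requests[engine_id].append(req_id)
            if needs_handshake:
                future = self._executor.submit(self._nixl_handshake, engine_id)
                future.add_done_callback(
                    lambda f, eid=engine_id: self._on_handshake_done(eid, f))

        def _nixl_handshake(self, engine_id: EngineId) -> None:
            ...  # fetch remote agent metadata over zmq, register it with NIXL

        def _on_handshake_done(self, engine_id: EngineId, fut: Future) -> None:
            # On success, move the queued requests to the ready queue for
            # processing on the next step.
            if fut.exception() is None:
                self._ready_requests.extend(
                    self._pending_requests.pop(engine_id, []))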

Contributor

@gemini-code-assist gemini-code-assist bot left a comment

Code Review

This PR introduces asynchronous NIXL handshaking, which is a positive change for performance. The core mechanism involves a ThreadPoolExecutor (with max_workers=1), state tracking for handshakes, and a callback system to process requests post-handshake.

Key areas for consideration and potential improvement include:

  • The implications of max_workers=1 for the handshake executor.
  • The clarity of logging messages, particularly around error handling in handshake completion.
  • Ensuring the robustness of the cleanup mechanism in __del__.

The change to _pop_done_transfers significantly improves correctness for multi-part transfers.

Given this is a WIP and unit tests are pending, thorough testing of concurrency, error scenarios, and resource management will be critical.
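
The refined _pop_done_transfers check mentioned above can be sketched as follows. This is a hedged approximation: it assumes a check_xfer_state(handle) call that returns "DONE" for finished transfers, which may not match the connector's exact API.

    def _pop_done_transfers(
            self, transfers: dict[str, list[object]]) -> set[str]:
        # A request counts as complete only when every one of its transfer
        # handles reports DONE; otherwise keep the remaining handles for
        # the next polling round.
        done_req_ids: set[str] = set()
        for req_id, handles in list(transfers.items()):
            in_progress = [
                h for h in handles
                if self.nixl_wrapper.check_xfer_state(h) != "DONE"
            ]
            if not in_progress:
                done_req_ids.add(req_id)
                del transfers[req_id]
            else:
                transfers[req_id] = in_progress
        return done_req_ids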

Comment on lines 139 to 148
def bind_connector_metadata(
        self, connector_metadata: KVConnectorMetadata) -> None:
    super().bind_connector_metadata(connector_metadata)
    # start_load_kv() doesn't use the forward_context, so we can pass a dummy one.
    dummy_ctx = ForwardContext(
        no_compile_layers={},
        attn_metadata={},
        virtual_engine=0,
    )
    self.start_load_kv(dummy_ctx)
Contributor

medium

The bind_connector_metadata method now calls self.start_load_kv(dummy_ctx). The comment explains that start_load_kv() doesn't use the forward_context. This is a crucial assumption that could be fragile if NixlConnector.start_load_kv's implementation changes in the future to depend on forward_context.

Consider adding an assertion within NixlConnector.start_load_kv or a more prominent note about this dependency if this method is specifically intended to be callable without a fully-formed ForwardContext in certain scenarios like this one.
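
A hypothetical sketch of the "more prominent note" option (illustrative only; the method body is elided and this is not the PR's actual code):

    def start_load_kv(self, forward_context: "ForwardContext") -> None:
        # NOTE: forward_context is intentionally not read here; callers such
        # as bind_connector_metadata pass a dummy instance. If this method
        # ever starts depending on it, update those call sites first.
        ...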

Comment on lines 404 to 405
self._executor = ThreadPoolExecutor(
    max_workers=1, thread_name_prefix="vllm-nixl-handshake")
Contributor

medium

Using ThreadPoolExecutor(max_workers=1) serializes all NIXL handshake operations. If handshakes with multiple new remote engines are needed simultaneously, this could become a bottleneck. If concurrent handshakes are feasible and beneficial in scenarios with many new engine connections, consider increasing max_workers and ensuring thread-safety within _nixl_handshake and its interactions with NIXL.

Member

Agree with gemini we should confirm that NIXL is threadsafe and then increase this (to e.g. 8?) so that more than one handshake can happen in parallel.

Collaborator Author

NIXL cannot confirm this, and Robert mentioned it's likely not thread-safe.

Also, in this test, max_workers doesn't seem to be taking effect. With 100 remote engines added and the timeout set to 2.5s, I expected the test to fail (if the 100 engines were added in sequence), but it passed.

So in this commit I removed the ThreadPoolExecutor and instead manage the task queue myself. Now the test behaves as I expected: it raises an exception when the timeout is too short, unless I scale the timeout linearly with the number of remote agents.

Member

Don't the handshakes complete instantly though since you're returning "DONE" from the dummy nixl?

Collaborator Author

The latency gap this PR tries to resolve is from _nixl_handshake, where Robert found prep_xfer_dlist could take significant time.

And since doing the handshake in parallel isn't confirmed to be thread-safe, I changed to use a single thread instead of a thread pool to perform the handshake for each remote engine.

Comment on lines 439 to 433
def __del__(self):
    """Cleanup background threads on destruction."""
    self._executor.shutdown(wait=False)
    if self._nixl_handshake_listener_t:
        self._nixl_handshake_listener_t.join(timeout=0)
Contributor

medium

The __del__ method provides best-effort cleanup for the executor and the listener thread. Ensure that _nixl_handshake and the listener thread's loop are designed to handle potential interruptions gracefully (e.g., releasing ZMQ sockets in finally blocks if applicable).
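
For instance, the listener loop could guard its ZMQ resources with a finally block along these lines (an illustrative sketch, not the connector's actual listener):

    import zmq

    def _nixl_handshake_listener(path: str) -> None:
        ctx = zmq.Context()
        sock = ctx.socket(zmq.ROUTER)
        try:
            sock.bind(path)
            while True:
                identity, _, _ = sock.recv_multipart()
                # Reply with this worker's serialized agent metadata (elided).
                sock.send_multipart([identity, b"", b"<agent metadata>"])
        finally:
            # Release ZMQ resources even if the thread is interrupted.
            sock.close(linger=0)
            ctx.term()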

Comment on lines 508 to 513
failed_reqs = self._pending_requests.pop(engine_id, None)
if failed_reqs is not None:
    logger.warning(
        "Handshake failed for engine %s, leaving"
        "%d requests pending for scheduler retry",
        engine_id, len(failed_reqs))
Contributor

medium

The log message might be slightly misleading. Since failed_reqs = self._pending_requests.pop(engine_id, None) is called just before this log, the requests are removed from self._pending_requests. They are not "left pending" in this worker's internal queue anymore.

Consider rephrasing for clarity, e.g., "Handshake failed for engine %s. %d associated requests were removed from the pending queue and will require scheduler retry."

if failed_reqs is not None:
    logger.warning(
        "Handshake failed for engine %s. %d associated requests were removed "
        "from the pending queue and will require scheduler retry.",
        engine_id, len(failed_reqs))

@lk-chen lk-chen changed the title WIP [P/D] Asynchronously do _nixl_handshake [P/D] Asynchronously do _nixl_handshake Jun 19, 2025
@mergify mergify bot added the v1 label Jun 19, 2025
@lk-chen lk-chen added the ready ONLY add when PR is ready to merge/full CI is needed label Jun 19, 2025
Member

@njhill njhill left a comment

Thanks @lk-chen, and especially for adding the test/mocks!

Comment on lines 404 to 405
self._executor = ThreadPoolExecutor(
    max_workers=1, thread_name_prefix="vllm-nixl-handshake")
Member

Agree with gemini we should confirm that NIXL is threadsafe and then increase this (to e.g. 8?) so that more than one handshake can happen in parallel.

def _nixl_handshake_in_sequence():
    while True:
        req_id, meta = self._pending_handshake.get()
        self._nixl_handshake(meta.remote_host, meta.remote_port)
Member

I don't think this will work quite right in the case where there are concurrent requests with the same remote engine id.

We can add a check to no-op the handshake if it's already done, but this will still have the downside that requests could be held up behind unrelated handshakes when their own engine's handshake is already complete.

FWIW, I made an update to my version; I realized the locking wasn't quite right: main...njhill:vllm:async_nixl_handshake

Collaborator Author

no-op the handshake if it's already done

I feel that may introduce more bugs: the base_addr might be updated as queries come and finish, so it's safer to always update the metadata for each incoming query. And even if we want to optimize this, it's worth a separate PR.

Member

I don't quite follow this. There's a single handshake done per P/D pair, which includes registering the remote agent metadata with NIXL. I don't think we want to do it twice.

The approach implemented here with futures is intended to handle this properly (concurrent requests with the same remote engine will all block on a single handshake, and will all proceed as soon as that's done, even if other handshakes are started in between or interleaved). Is there any reason not to use that (just setting max_workers=1 if NIXL thread-safety is a concern)?
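
That futures-based deduplication can be sketched as follows (assumed names, not the branch's actual code; the point is that concurrent requests for one engine share a single in-flight Future):

    import threading
    from concurrent.futures import Future, ThreadPoolExecutor

    class HandshakeFutures:
        def __init__(self) -> None:
            self._executor = ThreadPoolExecutor(max_workers=1)
            self._futures: dict[str, Future] = {}
            self._lock = threading.Lock()

        def get_or_start(self, engine_id: str, do_handshake) -> Future:
            # Start at most one handshake per engine id; every concurrent
            # request for that engine receives the same Future to wait on.
            with self._lock:
                fut = self._futures.get(engine_id)
                if fut is None:
                    fut = self._executor.submit(do_handshake, engine_id)
                    self._futures[engine_id] = fut
                return fut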

Collaborator Author

Sounds good, I've adopted your change.

@lk-chen lk-chen force-pushed the pd_parallel_handshake_slim branch from a417c27 to 8dbeb5b Compare June 23, 2025 02:34

mergify bot commented Jun 23, 2025

This pull request has merge conflicts that must be resolved before it can be
merged. Please rebase the PR, @lk-chen.

https://docs.github.com/en/pull-requests/collaborating-with-pull-requests/working-with-forks/syncing-a-fork

@mergify mergify bot added the needs-rebase label Jun 23, 2025
@lk-chen lk-chen force-pushed the pd_parallel_handshake_slim branch from 8dbeb5b to 578d529 Compare June 23, 2025 06:00
@mergify mergify bot removed the needs-rebase label Jun 23, 2025
@lk-chen lk-chen requested a review from njhill June 23, 2025 17:57

mergify bot commented Jun 23, 2025

This pull request has merge conflicts that must be resolved before it can be
merged. Please rebase the PR, @lk-chen.

https://docs.github.com/en/pull-requests/collaborating-with-pull-requests/working-with-forks/syncing-a-fork

@mergify mergify bot added the needs-rebase label Jun 23, 2025
lk-chen added 8 commits June 23, 2025 13:03
Signed-off-by: Linkun Chen <[email protected]>
Signed-off-by: Linkun Chen <[email protected]>
Signed-off-by: Linkun Chen <[email protected]>
Signed-off-by: Linkun Chen <[email protected]>
Signed-off-by: Linkun Chen <[email protected]>
Signed-off-by: Linkun Chen <[email protected]>
Signed-off-by: Linkun Chen <[email protected]>
Signed-off-by: Linkun Chen <[email protected]>
@lk-chen lk-chen force-pushed the pd_parallel_handshake_slim branch from 08aed64 to 32b895e Compare June 23, 2025 20:03
@mergify mergify bot removed the needs-rebase label Jun 23, 2025
lk-chen and others added 3 commits June 23, 2025 14:18
Signed-off-by: Linkun Chen <[email protected]>
Signed-off-by: Linkun Chen <[email protected]>
Signed-off-by: Nick Hill <[email protected]>
Member

@njhill njhill left a comment

Thanks @lk-chen LGTM

@lk-chen lk-chen added the ready-for-merge Indicate this PR is ready to be merged by the maintainers, used by reviewers without merge access. label Jun 24, 2025
Labels
ready, ready-for-merge, v1

3 participants