Skip to content

[BugFix] Fix multi-node offline data parallel #19937

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 3 commits into
base: main
Choose a base branch
from

Conversation

njhill
Copy link
Member

@njhill njhill commented Jun 21, 2025

The offline data parallel example examples/data_parallel.py was broken because there's no outside synchronization tied to the engines running state, so other processes would fail once the first one finishes its requests and exits.

This PR adds logic for the engines running state to be propagated to the front-end process in the offline case and taken into account by the LLMEngine.has_unfinished_requests() method.

Fixes #17241

The offline data parallel example examples/data_parallel.py was broken because there's no outside synchronization tied to the engines running state, so other processes would fail once the first one finishes its requests and exits.

This PR adds logic for the engines running state to be propagated to the front-end process in the offline case and taken into account by the LLMEngine.has_unfinished_requests() method.

Signed-off-by: Nick Hill <[email protected]>
@njhill njhill added the bug Something isn't working label Jun 21, 2025
Copy link

👋 Hi! Thank you for contributing to the vLLM project.

💬 Join our developer Slack at https://slack.vllm.ai to discuss your PR in #pr-reviews, coordinate on features in #feat- channels, or join special interest groups in #sig- channels.

Just a reminder: PRs would not trigger full CI run by default. Instead, it would only run fastcheck CI which starts running only a small and essential subset of CI tests to quickly catch errors. You can run other CI tests on top of those by going to your fastcheck build on Buildkite UI (linked in the PR checks section) and unblock them. If you do not have permission to unblock, ping simon-mo or khluu to add you in our Buildkite org.

Once the PR is approved and ready to go, your PR reviewer(s) can run CI to test the changes comprehensively before merging.

To run CI, PR reviewers can either: Add ready label to the PR or enable auto-merge.

🚀

Copy link
Contributor

@gemini-code-assist gemini-code-assist bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Summary of Changes

Hello @njhill, I'm Gemini Code Assist1! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed!

This pull request addresses a critical bug in the multi-node offline data parallel execution, where processes would prematurely terminate due to a lack of proper synchronization of the engine's running state. The changes ensure that the LLMEngine accurately reflects whether the underlying engines are still active, preventing early exits and improving the robustness of distributed offline inference.

Highlights

  • Bug Fix for Offline Data Parallel: Implemented a mechanism to correctly propagate the engines_running state from the core engine to the frontend LLMEngine in multi-node offline data parallel setups. This prevents premature termination of processes when one rank finishes its requests before others.
  • Enhanced Engine State Management: Introduced dp_engines_running methods and state tracking within EngineCoreClient and EngineCoreClientImpl to accurately reflect the activity of data parallel engines, ensuring LLMEngine.has_unfinished_requests() provides a reliable status.
  • Improved Test Coverage: Added a new multi-node test case for the examples/offline_inference/data_parallel.py example to the CI pipeline, validating the fix and ensuring future regressions are caught.
  • Minor UI Improvement: Added a refresh call for the tqdm progress bar in _run_engine to ensure it updates correctly upon completion of all requests.
Using Gemini Code Assist

The full guide for Gemini Code Assist can be found on our documentation page, here are some quick tips.

Invoking Gemini

You can request assistance from Gemini at any point in your pull request via creating an issue comment (i.e. comment on the pull request page) using either /gemini <command> or @gemini-code-assist <command>. Below is a summary of the supported commands.

Feature Command Description
Code Review /gemini review Performs a code review for the current pull request in its current state.
Pull Request Summary /gemini summary Provides a summary of the current pull request in its current state.
Comment @gemini-code-assist Responds in comments when explicitly tagged, both in issue comments and review comments.
Help /gemini help Displays a list of available commands.

Customization

To customize Gemini Code Assist for GitHub experience, repository maintainers can create a configuration file and/or provide a custom code review style guide (such as PEP-8 for Python) by creating and adding files to a .gemini/ folder in the base of the repository. Detailed instructions can be found here.

Limitations & Feedback

Gemini Code Assist is currently in preview and may make mistakes. Please leave feedback on any instances where its feedback is incorrect or counter productive. You can react with 👍 and 👎 on @gemini-code-assist comments to provide feedback.

You can also get AI-powered code generation, chat, as well as code reviews directly in the IDE at no cost with the Gemini Code Assist IDE Extension.

Footnotes

  1. Review the Privacy Notices, Generative AI Prohibited Use Policy, Terms of Service, and learn how to configure Gemini Code Assist in GitHub here. Gemini can make mistakes, so double check it and use code with caution.

Copy link
Contributor

@gemini-code-assist gemini-code-assist bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Code Review

The pull request addresses a bug in the offline data parallel example where processes would fail due to lack of synchronization. The changes propagate the engine's running state to the front-end process and incorporate it into the LLMEngine.has_unfinished_requests() method. The review comments suggest improvements to code readability through additional comments and docstrings.

Comment on lines +887 to 890
client_index = -1 if self.has_coordinator else 0
self.output_queue.put_nowait(
(-1,
(client_index,
EngineCoreOutputs(wave_complete=self.current_wave)))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

Consider adding a comment explaining why client_index is set to -1 when has_coordinator is true, and 0 otherwise. This will improve code readability.

Suggested change
client_index = -1 if self.has_coordinator else 0
self.output_queue.put_nowait(
(-1,
(client_index,
EngineCoreOutputs(wave_complete=self.current_wave)))
client_index = -1 if self.has_coordinator else 0
# In the coordinator case, dp rank 0 sends updates to the
# coordinator. Otherwise (offline spmd case), each rank
# sends the update to its colocated front-end process.

Comment on lines 158 to 159
def dp_engines_running(self) -> bool:
raise NotImplementedError
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

This method should have a docstring explaining its purpose and return value.

Suggested change
def dp_engines_running(self) -> bool:
raise NotImplementedError
def dp_engines_running(self) -> bool:
"""Returns True if the data parallel engines are running."""
raise NotImplementedError

Comment on lines +288 to +289
def dp_engines_running(self) -> bool:
return False
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

This method should have a docstring explaining its purpose and return value.

Suggested change
def dp_engines_running(self) -> bool:
return False
def dp_engines_running(self) -> bool:
"""Returns True if the data parallel engines are running."""
return False

Comment on lines +551 to +552
def dp_engines_running(self) -> bool:
return self.engines_running
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

This method should have a docstring explaining its purpose and return value.

Suggested change
def dp_engines_running(self) -> bool:
return self.engines_running
def dp_engines_running(self) -> bool:
"""Returns True if the data parallel engines are running."""
return self.engines_running

Comment on lines +639 to +640
if outputs.wave_complete is not None:
self.engines_running = False
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

Consider adding a comment to explain why self.engines_running is set to False when outputs.wave_complete is not None.

Suggested change
if outputs.wave_complete is not None:
self.engines_running = False
if outputs.wave_complete is not None:
self.engines_running = False # No more requests to process

Comment on lines +668 to +669
if self.is_dp:
self.engines_running = True
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

Consider adding a comment to explain why self.engines_running is set to True when a request is added in data parallel mode.

Suggested change
if self.is_dp:
self.engines_running = True
if self.is_dp:
self.engines_running = True # New requests to process

@@ -160,7 +160,7 @@ def get_num_unfinished_requests(self) -> int:
def has_unfinished_requests(self) -> bool:
has_unfinished = self.output_processor.has_unfinished_requests()
if self.dp_group is None:
return has_unfinished
return has_unfinished or self.engine_core.dp_engines_running()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

Consider adding a comment to explain why dp_engines_running() is called in this condition.

Suggested change
return has_unfinished or self.engine_core.dp_engines_running()
return has_unfinished or self.engine_core.dp_engines_running() # Check if other DP ranks are running

njhill added 2 commits June 21, 2025 16:53
Signed-off-by: Nick Hill <[email protected]>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
bug Something isn't working ci/build frontend v1
Projects
None yet
Development

Successfully merging this pull request may close these issues.

[Bug]: One node exits unexpectedly when run DP on 2 nodes.
2 participants