[WIP] [single_controller] feat: PyTorch Monarch integration #3713
base: main
Conversation
Code Review
This pull request introduces a significant integration with PyTorch Monarch, including a new PPO trainer and foundational classes for Monarch workers. The changes are extensive and, as noted in the description, are a work in progress. My review focuses on critical and high-severity issues that could cause runtime failures or limit the code's portability. I've identified several instances of hardcoded values that tie the implementation to specific hardware setups, a couple of bugs that would lead to crashes, a missing await in an async function, and a Python version compatibility issue. I have not commented on the cleanup items already listed in the pull request description.
```python
local_world_size = 8
local_rank = rank % local_world_size
```
The local_world_size is hardcoded to 8. This assumes that every node has 8 GPUs, which makes the code brittle and not portable to different hardware configurations. This value should be derived from the ProcMesh or environment configuration rather than being hardcoded.
Similar hardcoded values are found elsewhere in this file:
- Line 149: `super().__init__([4], 1)` in `MonarchResourcePool`
- Lines 382-383: `rank // 8` and `rank % 8` in `_execute_one_rank`

These should all be parameterized or determined dynamically to ensure portability across different hardware setups.
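One way to avoid the hardcoded value is to derive it at runtime. A minimal sketch, assuming the standard `LOCAL_WORLD_SIZE` environment variable set by `torchrun`-style launchers; the helper name and fallback order here are illustrative, not verl's actual API:

```python
import os


def get_local_world_size(default: int = 8) -> int:
    """Derive GPUs-per-node instead of hardcoding 8.

    Prefers the LOCAL_WORLD_SIZE env var (set by torchrun and most
    launchers), then falls back to counting visible CUDA devices,
    then to a caller-supplied default.
    """
    if "LOCAL_WORLD_SIZE" in os.environ:
        return int(os.environ["LOCAL_WORLD_SIZE"])
    try:
        import torch

        if torch.cuda.is_available():
            return torch.cuda.device_count()
    except ImportError:
        pass
    return default


local_world_size = get_local_world_size()
rank = 11  # example global rank
local_rank = rank % local_world_size   # position within the node
node_rank = rank // local_world_size   # which node this rank lives on
```

The same helper would replace the `rank // 8` / `rank % 8` arithmetic in `_execute_one_rank`.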
```python
num_hosts: int = 16,  # TODO: get the task_group size from MAST
num_gpus: int = 8,
```
```python
    gpus=num_gpus,
)

alloc = allocator.allocate(spec)
```
The create_mast_proc_mesh function is async, which suggests that allocator.allocate may be a coroutine. If so, it must be awaited: alloc = await allocator.allocate(spec). Without the await, alloc is a coroutine object that never executes, and any downstream code expecting an allocation will fail at runtime.
```diff
- alloc = allocator.allocate(spec)
+ alloc = await allocator.allocate(spec)
```
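A self-contained illustration of the failure mode (the `allocate` stand-in below is hypothetical, not Monarch's allocator): calling an `async def` function without `await` returns a coroutine object rather than the result.

```python
import asyncio


async def allocate(spec):
    # stand-in for an async allocator.allocate; `spec` is hypothetical
    await asyncio.sleep(0)
    return {"spec": spec, "status": "allocated"}


# without await: a coroutine object, not an allocation
pending = allocate("gpu_spec")
print(type(pending).__name__)  # "coroutine"
pending.close()  # avoid a "never awaited" warning

# with await (here via asyncio.run): the actual result
result = asyncio.run(allocate("gpu_spec"))
print(result["status"])  # "allocated"
```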
| "val_before_train", True | ||
| ): | ||
| val_metrics = self._validate() | ||
| assert val_metrics, f"{val_metrics=}" |
The _validate method can return an empty dictionary (e.g., on line 813), which will cause this assertion to fail and crash the training process. The assertion should be removed or the logic should be changed to handle an empty val_metrics dictionary gracefully, for example by skipping logging if it's empty.
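A minimal sketch of the graceful alternative: guard the logging call instead of asserting. The `logger_fn` and `step` parameters are hypothetical stand-ins for the trainer's tracking logger and global step counter.

```python
def log_validation_metrics(val_metrics, logger_fn, step):
    """Skip logging when validation produced no metrics, instead of asserting."""
    if not val_metrics:  # _validate may legitimately return {}
        return False
    logger_fn(data=val_metrics, step=step)
    return True


logged = []
log_validation_metrics({}, lambda data, step: logged.append(data), step=0)
log_validation_metrics({"val/reward": 0.5}, lambda data, step: logged.append(data), step=0)
# logged now contains only the non-empty metrics dict
```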
```python
if self.config.reward_model.launch_reward_fn_async:
    future_reward = compute_reward_async.remote(
        batch, self.config, self.tokenizer
    )
else:
    reward_tensor, reward_extra_infos_dict = compute_reward(
        batch, self.reward_fn
    )
```
When self.config.reward_model.launch_reward_fn_async is true, future_reward is created but never awaited. The code then proceeds to use reward_tensor and reward_extra_infos_dict, which are only defined in the else block. This will lead to a NameError. The async path seems incomplete and will crash. Although a NotImplementedError is raised later on line 1502 for this path, the crash will happen before that.
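The crash pattern can be reproduced in isolation (all names below are simplified stand-ins, not verl's actual code): a variable bound only in the `else` branch is unbound when the `if` branch runs, and Python raises `UnboundLocalError` (a subclass of `NameError`).

```python
def compute_reward(batch):
    # stand-in for the synchronous reward path
    return [1.0] * len(batch), {}


def process_batch(batch, launch_async):
    if launch_async:
        future_reward = object()  # stand-in for compute_reward_async.remote(...)
    else:
        reward_tensor, reward_extra_infos_dict = compute_reward(batch)
    # reward_tensor is unbound when launch_async is True
    return reward_tensor


process_batch([1, 2], launch_async=False)  # fine
try:
    process_batch([1, 2], launch_async=True)
except NameError:
    pass  # crashes exactly as described in the review
```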
```python
# only valid when Python >= 3.9
original_method_name = method_name.removeprefix(prefix)
```
The removeprefix string method was introduced in Python 3.9. While the comment acknowledges this, it's better to use a backward-compatible alternative to avoid runtime errors on older Python versions. You can replace it with string slicing.
```diff
- # only valid when Python >= 3.9
- original_method_name = method_name.removeprefix(prefix)
+ original_method_name = method_name[len(prefix):]
```
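One caveat with the bare slice: `str.removeprefix` strips the prefix only when it is actually present, while `s[len(prefix):]` truncates unconditionally. If callers do not guarantee the prefix, a small helper preserves the 3.9 semantics on older Pythons (the helper name here is illustrative):

```python
def remove_prefix(s: str, prefix: str) -> str:
    """Backward-compatible str.removeprefix for Python < 3.9.

    Unlike bare slicing, this strips the prefix only when it is
    actually present, matching str.removeprefix semantics.
    """
    return s[len(prefix):] if s.startswith(prefix) else s


# matches removeprefix when the prefix is present...
assert remove_prefix("monarch_train_step", "monarch_") == "train_step"
# ...and, unlike s[len(prefix):], leaves the string intact otherwise
assert remove_prefix("train_step", "monarch_") == "train_step"
```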
What does this PR do?
Rough initial transfer of internal monarch integration.
Cleanup items:
- `create_mast_proc_mesh` needs to be removed
- `apply_kl_penalty` should be moved to a shared util module

Test
Experimental results are available from post-training Qwen-2.5-7B on H200 GPUs using Megatron-LM; the experimental data is pending wider release.
API and Usage Example
Design & Code Changes
TODO
Checklist Before Submitting
Important
Please check all the following items before requesting a review, otherwise the reviewer might deprioritize this PR for review.
- `pre-commit install && pre-commit run --all-files --show-diff-on-failure --color=always`
- Request CI via the `ci-request` channel in the `verl` Slack workspace. (If not accessible, please try the Feishu group (飞书群).)