Skip to content

Conversation

@Ol1ver0413
Copy link
Collaborator

@Ol1ver0413 Ol1ver0413 commented Sep 18, 2025

Description

Add pipeline running mode in workforce. #1663

Checklist

Go over all the following points, and put an x in all the boxes that apply.

  • I have read the CONTRIBUTION guide (required)
  • I have linked this PR to an issue using the Development section on the right sidebar or by adding Fixes #issue-number in the PR description (required)
  • I have checked if any dependencies need to be added or updated in pyproject.toml and uv lock
  • I have updated the tests accordingly (required for a bug fix or a new feature)
  • I have updated the documentation if needed:
  • I have added examples if this is a new feature

If you are unsure about any of these, don't hesitate to ask. We are here to help!

@coderabbitai
Copy link
Contributor

coderabbitai bot commented Sep 18, 2025

Important

Review skipped

Auto reviews are disabled on this repository.

Please check the settings in the CodeRabbit UI or the .coderabbit.yaml file in this repository. To trigger a single review, invoke the @coderabbitai review command.

You can disable this status message by setting the reviews.review_status to false in the CodeRabbit configuration file.

✨ Finishing touches
🧪 Generate unit tests (beta)
  • Create PR with unit tests
  • Post copyable unit tests in a comment
  • Commit unit tests in branch workforce_pipeline

Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out.

❤️ Share

Comment @coderabbitai help to get the list of available commands and usage tips.

@Ol1ver0413 Ol1ver0413 self-assigned this Sep 18, 2025
@Ol1ver0413
Copy link
Collaborator Author

graph TD
    A[Task: Generate 5 AI/ML Papers] --> B[Literature Researcher]
    B --> C{Fork: 5 Parallel Tasks}
    
    C --> D1[Summary Specialist 1<br/>Summarize Paper 1]
    C --> D2[Summary Specialist 2<br/>Summarize Paper 2]
    C --> D3[Summary Specialist 3<br/>Summarize Paper 3]
    C --> D4[Summary Specialist 4<br/>Summarize Paper 4]
    C --> D5[Summary Specialist 5<br/>Summarize Paper 5]
    
    D1 --> E{Join: Collect Summaries}
    D2 --> E
    D3 --> E
    D4 --> E
    D5 --> E
    
    E --> F[Research Synthesizer<br/>Analyze AI/ML Trends]
    F --> G[Final Result]
    
    style A fill:#e1f5fe
    style B fill:#fff3e0
    style D1 fill:#f3e5f5
    style D2 fill:#f3e5f5
    style D3 fill:#f3e5f5
    style D4 fill:#f3e5f5
    style D5 fill:#f3e5f5
    style F fill:#e8f5e8
    style G fill:#fff9c4
Loading

@Ol1ver0413
Copy link
Collaborator Author

Ol1ver0413 commented Sep 21, 2025

graph TD
    A[Task: Generate 4 RESTful API Types] --> B[API Researcher]
    B --> C{Fork: 4 Parallel Tasks}
    
    C --> D1[API Analyst 1<br/>Analyze API 1]
    C --> D2[API Analyst 2<br/>Analyze API 2]
    C --> D3[API Analyst 3<br/>Analyze API 3]
    C --> D4[API Analyst 4<br/>Analyze API 4]
    
    D1 --> E{Join: Collect Analyses}
    D2 --> E
    D3 --> E
    D4 --> E
    
    E --> F[Documentation Writer<br/>Generate API Usage Guide]
    F --> G[Final Result]
    
    style A fill:#e1f5fe
    style B fill:#fff3e0
    style D1 fill:#f3e5f5
    style D2 fill:#f3e5f5
    style D3 fill:#f3e5f5
    style D4 fill:#f3e5f5
    style F fill:#e8f5e8
    style G fill:#fff9c4
Loading

@Ol1ver0413
Copy link
Collaborator Author

Ol1ver0413 commented Sep 21, 2025

graph TD
    A[Task: Code Review Analysis] --> B[Code Scanner]
    B --> C{Fork: 3 Parallel Tasks}
    
    C --> D1[Code Reviewer 1<br/>Review File 1]
    C --> D2[Code Reviewer 2<br/>Review File 2]
    C --> D3[Code Reviewer 3<br/>Review File 3]
    
    D1 --> E{Join: Collect Reviews}
    D2 --> E
    D3 --> E
    
    E --> F[Review Summarizer<br/>Generate Comprehensive Report]
    F --> G[Final Result]
    
    style A fill:#e1f5fe
    style B fill:#fff3e0
    style D1 fill:#f3e5f5
    style D2 fill:#f3e5f5
    style D3 fill:#f3e5f5
    style F fill:#e8f5e8
    style G fill:#fff9c4
Loading

@Ol1ver0413 Ol1ver0413 marked this pull request as ready for review September 21, 2025 15:01
@Ol1ver0413
Copy link
Collaborator Author

Hey @Wendong-Fan @fengju0213 ! I've already add pipeline mode in workforce, and maybe we can try more cases to test this mode.

@Wendong-Fan
Copy link
Member

Hey @Wendong-Fan @fengju0213 ! I've already add pipeline mode in workforce, and maybe we can try more cases to test this mode.

thanks @Ol1ver0413 !

@Wendong-Fan Wendong-Fan added the Review Required PR need to be reviewed label Sep 29, 2025
@Wendong-Fan Wendong-Fan added this to the Sprint 38 milestone Sep 29, 2025
@Wendong-Fan Wendong-Fan linked an issue Sep 29, 2025 that may be closed by this pull request
2 tasks
Copy link
Collaborator

@hesamsheikh hesamsheikh left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the comprehensive PR @Ol1ver0413. I added a comment. Also I think the method namings could be a bit more consistent and remembrable.

  # before
  workforce.add_pipeline_task()
  workforce.fork_pipeline()
  workforce.add_parallel_pipeline_tasks()
  builder.fork()  
  builder.add_parallel_tasks() 

  # after
  workforce.pipeline_add()
  workforce.pipeline_fork()
  workforce.pipeline_join()
  workforce.pipeline_build()
  builder.add()
  builder.fork()
  builder.join()
  builder.build()

Comment on lines 1533 to 1540
expected_task_ids = {task.id for task in self._pending_tasks}
expected_task_ids.update(task.id for task in self._completed_tasks)

completed_successful_ids = {
task.id for task in self._completed_tasks
if task.state == TaskState.DONE
}

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

doesn't this create a situation if which if any of the branched tasks fails, the whole pipeline is marked as failed and the join task never executes?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

doesn't this create a situation if which if any of the branched tasks fails, the whole pipeline is marked as failed and the join task never executes?

Thanks @hesamsheikh ! Maybe I need to add some branch task failure handling mechanisms into the pipeline.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hi, @hesamsheikh. Since the original design follows a pipeline pattern, I didn’t perform task restructuring or decomposition as in the decompose mode. Therefore, I decided that if a task fails, it would still be considered successful, and the error message would be passed to the join stage. We can discuss whether there might be a better approach.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Well this method seems a bit confusing and hard to understand anyway. Could you maybe simplify it, or explain it a bit in the comments? I know how it works and what it does, but not the intent of it.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

doesn't this create a situation if which if any of the branched tasks fails, the whole pipeline is marked as failed and the join task never executes?

Yeah, I'll add more comments.

Comment on lines 702 to 707

# Clear existing tasks and dependencies
self._pending_tasks.clear()
self._task_dependencies.clear()
self._assignees.clear()

Copy link
Collaborator

@a7m-1st a7m-1st Oct 2, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@Ol1ver0413 , thanks for the PR, but as of now, Is the pipeline designed that it could interrupt running tasks or is it only with predefined tasks only? i.e. before workforce.start()

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@Ol1ver0413 , thanks for the PR, but as of now, Is the pipeline designed that it could interrupt running tasks or is it only with predefined tasks only? i.e. before workforce.start()

Thanks @a7m-1st. The pipeline mode was designed for sequential predefined tasks, which cannot be interrupted by human. But maybe we can have a further discussion for it. Thanks again!

@Wendong-Fan Wendong-Fan added Waiting for Update PR has been reviewed, need to be updated based on review comment and removed Review Required PR need to be reviewed labels Oct 19, 2025
@Wendong-Fan
Copy link
Member

hi @Ol1ver0413

Hope you're doing well!

Just wanted to check in on this PR. It looks like there's some feedback that needs to be addressed, along with some failing in pre-commit checks.

Please let us know if you have any questions or need a hand with anything. We're looking forward to getting this merged!

Cheers

@Ol1ver0413
Copy link
Collaborator Author

Ol1ver0413 commented Oct 19, 2025

hi @Ol1ver0413

Hope you're doing well!

Just wanted to check in on this PR. It looks like there's some feedback that needs to be addressed, along with some failing in pre-commit checks.

Please let us know if you have any questions or need a hand with anything. We're looking forward to getting this merged!

Cheers

Hi, @Wendong-Fan!
I’ve been handling some work transitions lately, which may have caused me to overlook enhancing the running mode in the workforce. I’ll make sure to complete it soon.
Thanks again.

@review-notebook-app
Copy link

Check out this pull request on  ReviewNB

See visual diffs & provide feedback on Jupyter Notebooks.


Powered by ReviewNB

Copy link
Collaborator

@hesamsheikh hesamsheikh left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the new changes @Ol1ver0413. Aside from the comments I previously mentioned, which don't seem resolved yet, and a lack of test cases, I have nothing more to add.

@Ol1ver0413
Copy link
Collaborator Author

Thanks for the new changes @Ol1ver0413. Aside from the comments I previously mentioned, which don't seem resolved yet, and a lack of test cases, I have nothing more to add.

Thanks for the feedback! I’ll review your previous comments again and make sure to address them. I’ll also add the missing test cases soon.

Copy link
Collaborator

@hesamsheikh hesamsheikh left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the changes @Ol1ver0413
I added a minor comment, also make sure to fix pre-commit issues.
everything else seems fine to me.


# Pipeline building state
self._pipeline_builder: Optional[PipelineTaskBuilder] = None
self._pipeline_tasks_need_assignment: bool = False
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

unused variable

Copy link
Collaborator

@a7m-1st a7m-1st left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks like all of my concerns has been solved. Thanks a lot @Ol1ver0413.
LGTM 👍

@Ol1ver0413
Copy link
Collaborator Author

Ol1ver0413 commented Nov 11, 2025

Thanks @a7m-1st @hesamsheikh a lot for such a long time reviewing! You guys' feedback helped me improve the PR a lot. Love U guys ❤️.

@hesamsheikh
Copy link
Collaborator

You're welcome @Ol1ver0413
Please fix the pre-commit issues so the checks are passed and this is ready for merge.

@hesamsheikh
Copy link
Collaborator

Hey @Ol1ver0413
Is this ready for the final review?

@Ol1ver0413

This comment was marked as abuse.

Copy link
Collaborator

@fengju0213 fengju0213 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@Ol1ver0413 thanks for this amazing pr!

@fengju0213 fengju0213 merged commit 6cbeee0 into master Nov 20, 2025
12 of 13 checks passed
@fengju0213 fengju0213 deleted the workforce_pipeline branch November 20, 2025 08:49
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

Review Required PR need to be reviewed

Projects

Status: No status

Development

Successfully merging this pull request may close these issues.

[Feature Request] Support more running mode in workforce

6 participants