Skip to content

Conversation

cmcgee1024
Copy link
Member

@cmcgee1024 cmcgee1024 commented Aug 15, 2025

The runPipe() function helps to create easy multi-process pipelines in a
similar method as the run() function that accepts a Configuration and
returns a CollectedResult.

let finalJson = try await run(
    Configuration(executable: .path("/bin/echo"), arguments: [#"{"abc":"def"}"#]),
    Configuration(executable: .name("jq")),
    output: .string(limit: 1024*1024)
)
print("\(finalJson.standardOutput)")

TODO:

  • Check that all OutputProtocol types are sharable via error to all of the processes
  • Implement an ExecutionResult variant
  • Generalize so that Swift Tasks in the current process can take part in sections of the pipeline
  • Consider generalizing the run() functions that take a Configuration to automatically set up pipelines with a variadic
  • Test cases
  • Verify that Windows works with this


return try result.get()
} else {
Task {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It would be best to use a task group here, rather than unstructured tasks. That way the subtasks will inherit cancellation & priority properly.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've converted this to use a task group, which collects the results, and failures.

}

if idx == pipeline.count - 1 {
let result = await Task {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The task here looks redundant; this:

let result = await Task { try await (...) }.result
return try result.get()

is equivalent to:

return try await (...)

input: Input = .none,
output: Output,
error: Error = .discarded
) async throws -> CollectedResult<Output, Error> {
Copy link
Contributor

@jakepetroules jakepetroules Aug 17, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do shells like bash start every process in a pipeline in parallel regardless of how many there are? Should there be a maxConcurrency parameter to limit it? People probably won't set up 1000+-process pipelines in practice, but maybe there are actually some use cases for that?

Another approach could be an optional parameter like an execution pool, defined like:

protocol ExecutionPool {
    func execute<ReturnValue>(_ work: () async throws -> sending ReturnValue) async throws -> ReturnValue
}

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've changed this to a variadic function, which should help to reduce the probability of very large process bombs.

Change runPipe into another variant of run using variadic parameters for the configuration
@jakepetroules
Copy link
Contributor

Would a feature like this need a higher level DSL to be able to express all the things you might want to be able to address? Redirecting stderr to stout, discarding intermediate stderr, "pipefail" type behavior where the first failing command exits the pipeline? etc...

I think a single runPipe method with a single set of defined semantics might be too limiting to cover common use cases.

@cmcgee1024
Copy link
Member Author

Pipefail is a good point, and something that I can add as a parameter.

In terms of individually controlling, or redirecting elements in the pipeline I think that would be difficult to achieve with a single function call. I think that the trouble is that the existing Configuration structure isn't really a configuration in the sense that we're talking about here because it doesn't do anything to specify input/output/error. If it did, then this could become multiple run function calls, and the type checker could verify that the next in the pipe has a standard out fd (redirected, or not) that can be the next function's input.

I'll see if I can get an experiment going that yields some ergonomic results.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants