
Conversation

Contributor

@Chiro11 Chiro11 commented Sep 22, 2025

I hereby agree to the terms of the RisingWave Labs, Inc. Contributor License Agreement.

What's changed and what's your intention?

Based on #23221 (comment)

Decouple barrier mutation handling from upstream modifications, to resolve a deadlock.

  • For a newly created table, we must create new executors, then create the corresponding dispatcher for its downstream, and only then can we call execute().
  • Creating the new dispatcher requires waiting for the downstream job to register the new output.
  • The Merge operator in the downstream job must receive a barrier from the old table's fragment graph before it can obtain the mutation, create the new upstream, and register the new output.
  • This means the old job must process the barrier and send it downstream normally before the new job is created.
  • This in turn means the upstream job's dispatcher must also be able to send the barrier normally.
  • However, when the upstream job's dispatcher receives the barrier, it must wait for the new downstream table to register its output before it can send the barrier to both the old and new downstream tables.

So, this becomes:

create-new-job -> downstream-receive-barrier -> old-job-process-barrier -> upstream-dispatch-barrier -> create-new-job

which forms a cycle and causes a deadlock.

See comments in DispatchBarrierBuffer for details.

Close #23064

Checklist

  • I have written necessary rustdoc comments.
  • I have added necessary unit tests and integration tests.
  • I have added test labels as necessary.
  • I have added fuzzing tests or opened an issue to track them.
  • My PR contains breaking changes.
  • My PR changes performance-critical code, so I will run (micro) benchmarks and present the results.
  • I have checked the Release Timeline and Currently Supported Versions to determine which release branches I need to cherry-pick this PR into.

Documentation

  • My PR needs documentation updates.
Release note

@github-actions github-actions bot added the type/refactor Type: Refactoring. label Sep 22, 2025
Contributor

@Copilot Copilot AI left a comment


Pull Request Overview

This PR refactors the barrier alignment and mutation handling in merge-type executors to decouple these operations. The change addresses a dependency-chain issue where creating a new upstream actor requires downstream merge updates, which in turn require the old actors' processing to complete.

  • Splits barrier mutation application into two phases: partial application (parsing and caching changes) and full application (updating upstream sets)
  • Introduces caching mechanisms to handle asynchronous barrier reception from different sources
  • Refactors MergeExecutor, ReceiverExecutor, and UpstreamSinkUnionExecutor to use the new two-phase approach
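
A minimal self-contained sketch of the two-phase idea above, with placeholder types (Barrier, Upstream) and illustrative names (pre_apply, apply_next), not the PR's actual signatures:

use std::collections::VecDeque;

struct Barrier {
    adds_upstream: bool, // stands in for the real mutation payload
}

struct Upstream; // placeholder for a boxed input stream

struct MergeSketch {
    upstreams: Vec<Upstream>,
    // Phase-1 output: parsed barriers cached together with any pre-built input.
    cached: VecDeque<(Barrier, Option<Upstream>)>,
}

impl MergeSketch {
    // Phase 1 (partial application): parse the mutation and cache it,
    // pre-building the new input, without touching `upstreams` yet.
    fn pre_apply(&mut self, barrier: Barrier) {
        let new_input = barrier.adds_upstream.then(|| Upstream);
        self.cached.push_back((barrier, new_input));
    }

    // Phase 2 (full application): once the same barrier has been aligned
    // from all current upstreams, apply the cached change to the upstream set.
    fn apply_next(&mut self) {
        if let Some((_barrier, new_input)) = self.cached.pop_front() {
            if let Some(input) = new_input {
                self.upstreams.push(input);
            }
        }
    }
}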

Reviewed Changes

Copilot reviewed 5 out of 5 changed files in this pull request and generated 2 comments.

Summary per file:

  • src/stream/src/executor/merge.rs: Implements two-phase barrier handling with caching and async coordination for merge operations
  • src/stream/src/executor/receiver.rs: Refactors the receiver executor to use the cached-barrier approach and new message-processing logic
  • src/stream/src/executor/upstream_sink_union.rs: Updates the upstream sink union executor with a similar two-phase barrier-handling pattern
  • src/stream/src/executor/mod.rs: Adds early-return optimizations and watermark flushing for empty-upstream operations
  • src/stream/src/executor/exchange/input.rs: Removes the deprecated process_dispatcher_msg function as part of the refactor

@wenym1
Contributor

wenym1 commented Sep 22, 2025

After this PR, do we still need #23221?

@Chiro11
Contributor Author

Chiro11 commented Sep 22, 2025

After this PR, do we still need #23221?

The deadlock will be handled. But considering that the Merge operator now also builds the channel with its upstream when the actor is created (not when it is executed), the changes in the UpstreamSinkUnion executor can be retained for consistency.

@Chiro11 Chiro11 force-pushed the chiro/decouple_merge_mutation branch from 0c378c7 to 984dd59 Compare September 22, 2025 15:34
@Chiro11 Chiro11 force-pushed the chiro/decouple_merge_mutation branch from 984dd59 to 50b9ade Compare September 22, 2025 16:00
Contributor

@wenym1 wenym1 left a comment


Rest LGTM

.flatten();
}

barrier = pending_on_none(self.barrier_rx.recv()) => {
Contributor


pre_apply_barrier may block stream.next(). Can we possibly make them concurrent?

The concurrency should look like:

select!(
    stream.next(),
    async {
        loop {
            let barrier = self.barrier_rx.recv().await;
            self.pre_apply_barrier(barrier).await?;
            ...
        }
    }
)

Note that we should handle cancellation safety properly. While we are pending on self.pre_apply_barrier(barrier).await, if stream.next() receives a new message, the select will return and the future in the other arm will be dropped. If we don't handle cancellation safety properly, we may lose the barrier that we just received but whose pre_apply_barrier has not finished yet.

Besides, it should return an error on None instead of staying pending.

Contributor Author

@Chiro11 Chiro11 Sep 23, 2025


I'm thinking that, essentially, pre_apply_barrier only blocks on the new_inputs call, which waits to build the stream RPC with the upstream node (for remote inputs). If the loop is written inside, it could be canceled, and the next time it would have to rebuild the connection, forcing all the previous work to be redone. Is it really necessary, just to prioritize handling upstream messages while we add a new upstream to Merge? Not to mention that this will make the implementation more complex.

Also, I think it should stay pending when barrier_rx is closed, because barrier_rx may receive multiple barriers first, and we can't really close it until the real upstream has completely processed all chunks and barriers.

Contributor


Is it really necessary, just to prioritize handling upstream messages while we add a new upstream to Merge?

If we don't do upstream.next() while creating the new inputs, we may incur back-pressure on the upstream, and the upstream won't be polled. If creating new inputs depends on some progress in the upstream, there will be a deadlock (though in the current implementation there is none yet).

the next time it would have to rebuild the connection, forcing all the previous work to be redone.

This isn't necessarily true. A proper implementation should not discard the previous progress when the future used in select is dropped, recreating a new future to redo everything from zero; instead, it should be able to resume the previous progress. That's what I meant by handling cancellation safety properly. One implementation I have in mind is an enum representing the current state, stored in the struct.

enum BarrierReceiverState {
    ReceivingBarrier,
    CreatingNewInput(BoxFuture<'static, NewInput>, Barrier),
}

We only call self.barrier_rx.recv() when we are in BarrierReceiverState::ReceivingBarrier. When we are in CreatingNewInput, we continue awaiting the stored future; when it finishes, we push the barrier and the new input to the queue and switch back to ReceivingBarrier. We don't need to store the future of self.barrier_rx.recv() because it's already cancellation safe.
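
A minimal self-contained sketch of this cancellation-safe loop, assuming tokio and futures; NewInput and make_input_future are hypothetical placeholders, not RisingWave's actual API:

use std::collections::VecDeque;

use futures::future::BoxFuture;
use tokio::sync::mpsc;

struct Barrier;  // placeholder for the real barrier type
struct NewInput; // placeholder for the created input

enum BarrierReceiverState {
    ReceivingBarrier,
    CreatingNewInput(BoxFuture<'static, NewInput>, Barrier),
}

struct BarrierReceiver {
    barrier_rx: mpsc::UnboundedReceiver<Barrier>,
    state: BarrierReceiverState,
    queue: VecDeque<(Barrier, Option<NewInput>)>,
}

impl BarrierReceiver {
    // Cancellation safe: if this future is dropped mid-await, the in-flight
    // input-creation future and its barrier stay in `self.state` and are
    // resumed on the next call, so neither the barrier nor progress is lost.
    async fn fetch_one(&mut self) -> Option<()> {
        loop {
            match &mut self.state {
                BarrierReceiverState::ReceivingBarrier => {
                    // `recv()` is itself cancellation safe; nothing to store.
                    let barrier = self.barrier_rx.recv().await?;
                    match make_input_future(&barrier) {
                        Some(fut) => {
                            self.state = BarrierReceiverState::CreatingNewInput(fut, barrier);
                        }
                        None => {
                            self.queue.push_back((barrier, None));
                            return Some(());
                        }
                    }
                }
                BarrierReceiverState::CreatingNewInput(fut, _) => {
                    // Resume the stored future; there is no await point between
                    // its completion and pushing to the queue.
                    let input = fut.await;
                    let BarrierReceiverState::CreatingNewInput(_, barrier) = std::mem::replace(
                        &mut self.state,
                        BarrierReceiverState::ReceivingBarrier,
                    ) else {
                        unreachable!()
                    };
                    self.queue.push_back((barrier, Some(input)));
                    return Some(());
                }
            }
        }
    }
}

// Hypothetical: returns a future only if the barrier requires creating a new input.
fn make_input_future(_barrier: &Barrier) -> Option<BoxFuture<'static, NewInput>> {
    None
}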

Also, I think it should stay pending when barrier_rx is closed, because barrier_rx may receive multiple barriers first, and we can't really close it until the real upstream has completely processed all chunks and barriers.

Got it. Agree.

Contributor Author

@Chiro11 Chiro11 Sep 23, 2025


I see. My previous idea was to put only the barrier in the state machine, and in each select we may need to redo the entire process. That's really amazing!

pin_mut!(upstreams);

let mut current_barrier = None;
let mut cached_barriers = VecDeque::new();
Contributor


IIUC, the current implementation decouples the management of the upstream inputs from the barrier tx to the upstreams.

If so, we can have a helper struct to extract the logic of managing the barrier tx and forwarding barriers, to make the code more structured.

struct UpstreamSinkBarrierManager {
    local_barrier_manager: LocalBarrierManager,
    barrier_tx: HashMap<FragmentId, Sender<Barrier>>,
    pending_barriers: VecDeque<(Barrier, Option<BoxedSinkInput>)>
}

The current partially_apply_barrier along with cached_barriers.push_back((barrier, new_sink_input)) will be its core logic.

Besides, we can make the current select_once a method of it and rename it properly. This method should only include the logic of concurrently receiving messages from the upstreams and receiving barriers, and then return the message. The logic of handling the message (adding/removing upstreams) can be moved to the for-loop body, just like in other executors; see the signature sketch below.
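
For illustration, the extracted method might have a shape like this (a signature-only sketch reusing the types from the diff; the name next_message is illustrative):

impl UpstreamSinkBarrierManager {
    // Concurrently receive from the current upstreams and from `barrier_rx`,
    // returning the next message; adding/removing upstreams stays in the
    // caller's for-loop body, like in other executors.
    async fn next_message(
        &mut self,
        upstreams: &mut DynamicReceivers<FragmentId, BarrierMutationType>,
    ) -> StreamExecutorResult<Message> {
        // core logic: partially_apply_barrier + pending_barriers.push_back(...)
        todo!()
    }
}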

@wenym1
Contributor

wenym1 commented Sep 23, 2025

After this PR, do we still need #23221?

The deadlock will be handled. But considering that the Merge operator now also builds the channel with its upstream when the actor is created (not when it is executed), the changes in the UpstreamSinkUnion executor can be retained for consistency.

The code in #23221 is a trivial code move. Let's merge its code change into this PR to review together; its tests can also be moved here to verify whether this PR has already resolved #23064.

let new_barrier = expect_first_barrier(upstream).await?;
assert_equal_dispatcher_barrier(barrier, &new_barrier);
let new_barriers =
    try_join_all(new_upstreams.iter_mut().map(expect_first_barrier))
Contributor


I'm even thinking about including expect_first_barrier and assert_equal_dispatcher_barrier in the try_join_all of DispatchBarrierBuffer.

async fn try_fetch_barrier_rx(&mut self, pending_on_end: bool) -> StreamExecutorResult<()> {
    match &mut self.recv_state {
        BarrierReceiverState::ReceivingBarrier => {
            let Some(barrier) = self.barrier_rx.recv().await else {
Contributor


I think currently we can assume that the corresponding barrier_tx is always in the actor state of LocalBarrierWorker (at least in the current implementation, we clean up the actor state only after the stop barrier has been collected from the actors to be dropped). Therefore we can simply treat the end of barrier_rx as an unexpected error to simplify the handling logic here.
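
That is, roughly (a sketch; assuming an anyhow-style error that converts into the executor's error type):

let barrier = self
    .barrier_rx
    .recv()
    .await
    .ok_or_else(|| anyhow::anyhow!("barrier_rx closed unexpectedly"))?;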

Contributor Author


Considering that UpstreamSinkUnion also holds the Merge executor, and barrier_rx is actually forwarded from the upstream's barrier_tx_map, this may not be true.

// loop. Otherwise, we need to forward the barrier to the current upstreams, cache this barrier, and
// then wait in the first branch until the upstreams have processed the barrier.
let barrier = barrier.context("Failed to receive barrier from barrier_rx")?;
let pending_barrier = self.partially_apply_barrier(barrier, executor).await?;
Contributor


Seems that we will have the same cancellation-safety issue here. If we get pending on partially_apply_barrier, and then an upstream message arrives, the barrier will be lost.

Contributor Author


Here we await in the select handler instead of in the branch condition. Shouldn't the entire handler be guaranteed to run to completion once its branch is chosen?
Besides, I did not write a state machine like Merge's, because I think that is too complex and unnecessary. After all, creating a sink is a rare operation. If the sink creation fails, it is normal for this to cause back pressure on the upstream; we always have back pressure when aligning barriers.

Contributor


Indeed. Can we store the dropped barrier_tx in removed_upstream_fragment_ids? In this way we can still satisfy this assumption, and it doesn't increase complexity.

Contributor Author


It seems to me that reaching the end of barrier_rx implicitly depends on the upstream always destroying barrier_tx later than the output channel. This dependency isn't obvious.

Contributor


It seems to me that reaching the end of barrier_rx implicitly depends on the upstream always destroying barrier_tx later than the output channel.

I didn't get it. Can you elaborate on what you mean?

To me, storing the barrier_tx until collecting the stop barrier from merge is similar to how we handle the stop barrier in the local barrier manager, which stores the barrier_tx in the actor state and only removes the state after we have collected the stop barrier from the actor.

Contributor Author


What I mean here is that the system will become abnormal, so it can only rely on the actor-creating side to detect errors, or on manual recovery. In any case, whether we write a state machine or not, we need to ensure that the input is successfully established; otherwise, we can only recover the whole system.
Besides, we will not wait for the upstream actor to actually be created, only for the RPC to be established at the process level, IIUC.

Contributor


Agree that we shouldn't implement the state machine twice. But I think there is still a chance to reuse the state machine in both cases.

The general logic of the state machine is: we receive a barrier with a mutation from barrier_rx; if we see from the barrier that we should create a new input (of any type), we create a future for it and switch to another state that polls the future; when the input is created, we switch back to the receiving state and enqueue the barrier and the optional input. The input type can be any general input type, not necessarily the input stream of Merge.

The general logic can be represented via a trait like

trait HandleBarrier {
    type Input;
    fn apply_barrier(&mut self, barrier: &Barrier) -> Option<BoxFuture<'static, Self::Input>>;
}

The state machine becomes

enum BarrierReceiverState<Input> {
    ReceivingBarrier,
    CreatingNewInput(Barrier, BoxFuture<'static, Input>),
}

And DispatchBarrierBuffer becomes a generic struct that can be reused for both cases.

pub(crate) struct BarrierBuffer<H: HandleBarrier> {
    buffer: VecDeque<(Barrier, Option<Vec<H::Input>>)>,
    barrier_rx: mpsc::UnboundedReceiver<Barrier>,
    recv_state: BarrierReceiverState<H::Input>,
    handle: H,
}

Merge and UpstreamSinkUnion can have their own HandleBarrier.

You can give it a try. If there is any unforeseen blocking point that prevents us from reusing the state machine, we can give up and keep the current blocking implementation.
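
For illustration, a Merge-side implementor might look roughly like this (added_upstream and create_input_future are hypothetical helpers, not the actual API):

impl HandleBarrier for MergeBarrierHandler {
    type Input = BoxedInput;

    fn apply_barrier(&mut self, barrier: &Barrier) -> Option<BoxFuture<'static, Self::Input>> {
        // Return a future only when the mutation adds a new upstream for this
        // actor; `None` means the barrier can be enqueued immediately.
        self.added_upstream(barrier)
            .map(|upstream| self.create_input_future(upstream))
    }
}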

Contributor


What I mean here is that the system will become abnormal, so it can only rely on the actor-creating side to detect errors, or on manual recovery. In any case, whether we write a state machine or not, we need to ensure that the input is successfully established; otherwise, we can only recover the whole system. Besides, we will not wait for the upstream actor to actually be created, only for the RPC to be established at the process level, IIUC.

I still don't get why errors in creating sink_input would cause barrier-alignment blocking anyway. Why not just return the error to trigger recovery, instead of still waiting for barrier alignment? Recovering the whole system is the standard way to handle any error in the streaming pipeline.

Contributor Author


I still don't get why errors in creating sink_input would cause barrier-alignment blocking anyway. Why not just return the error to trigger recovery, instead of still waiting for barrier alignment? Recovering the whole system is the standard way to handle any error in the streaming pipeline.

Not a big deal. Anyway, in the current implementation, if creating the input fails, it relies on the RPC to return an error, and it should not block on anything.

Contributor Author

@Chiro11 Chiro11 Sep 25, 2025


This might be a bit difficult.
I think this is because UpstreamSinkUnion and Merge/Receiver have some fundamental differences.

  1. The upstream set of UpstreamSinkUnion may be empty (initially empty, and it may become empty again after some create/drop operations), while Merge always has non-empty upstreams.
  2. Merge cannot know whether it will receive a barrier from barrier_rx or from the upstream first, while UpstreamSinkUnion always receives barriers from barrier_rx first.

Perhaps we could find a way to handle both cases correctly, but this would lead to unclear semantics (a code reader might not notice those differences).
Furthermore, we might not be able to perform certain checks in shared code, or we might have to move them outside of it.
Some interfaces might also differ (for Merge, we need to support a single input, so we use the Stream trait as its argument; for UpstreamSinkUnion, we need to check whether DynamicReceivers is empty; this could theoretically be moved outside, but it may cause redundancy).

Overall, I don't think it's a good idea to unify them.

    .get_upstream_fragment_id();
let barrier_rx = self.subscribe_local_barrier(upstream_fragment_id);
let new_input = executor
    .new_sink_input(new_upstream_sink, barrier_rx)
Contributor


Seems that new_sink_input is always paired with subscribe_local_barrier. Can we possibly include the logic of subscribe_local_barrier in new_sink_input and return (new_sink_input, barrier_tx)?

Contributor Author


Although they are always paired, they belong to two different classes. According to your suggestion, UpstreamSinkBarrierManager is responsible for managing and forwarding barriers to multiple upstreams, while creating the sink_input depends on some info in the executor itself, so it seems reasonable to separate them.

Contributor


But now the UpstreamSinkBarrierManager is already responsible for creating new sink inputs. The executor itself is only responsible for managing (receiving, adding, dropping) the created sink inputs. The current code structure is weird: UpstreamSinkUnionExecutor calls UpstreamSinkBarrierManager, while inside UpstreamSinkBarrierManager it calls back into UpstreamSinkUnionExecutor. A cleaner implementation is one where UpstreamSinkBarrierManager can inherently handle all its work without depending on external state.

Contributor Author

@Chiro11 Chiro11 Sep 24, 2025


Yes, but as another comment said, it's a bit complex. I might check back later to see if it's easy to fix.

async fn await_next_message(
    &mut self,
    upstreams: &mut DynamicReceivers<FragmentId, BarrierMutationType>,
    executor: &mut UpstreamSinkUnionExecutor,
Contributor


IIUC, we make the field upstream_sink_barrier_mgr an Option in UpstreamSinkUnionExecutor because we want to take ownership of upstream_sink_barrier_mgr while keeping the ownership of UpstreamSinkUnionExecutor intact, so that its mutable reference can be passed here.

However, it seems unnecessary to pass &mut UpstreamSinkUnionExecutor now. Previously we needed the mutable reference because we also modified its barrier_tx_map when creating a new sink input. Now that we have moved the barrier_tx_map to UpstreamSinkBarrierManager, there is no need to pass &mut UpstreamSinkUnionExecutor.

We can actually make the methods that create a new sink input methods of UpstreamSinkBarrierManager rather than of UpstreamSinkUnionExecutor, moving some fields from UpstreamSinkUnionExecutor to UpstreamSinkBarrierManager along the way.

Contributor Author


I've considered it, but it would involve too many changes, even including testing. I think it's a bit troublesome.

Contributor


I've considered it, but it would involve too many changes, even including testing. I think it's a bit troublesome.

Where do these changes come from? I think the core logic is unchanged, and these changes are mostly code movement. If we want to change the owner of these methods from UpstreamSinkUnionExecutor to UpstreamSinkBarrierManager while minimizing the code changes, we can just keep the method body in its original place and change the method owner by starting a new impl block.

impl UpstreamSinkUnionExecutor {
    ...
    #[cfg(test)]
    pub fn for_test(
        ...
    ) -> Self {
        ...
    }
}

impl UpstreamSinkUnionExecutor {
    async fn new_sink_input(
    ...

The current logic of new_sink_input is also copied from the previous handle_update. We can also reduce the code changes in the same way.

As for tests, they shouldn't be a consideration.

After the code review, before we merge it, we can reorganize the code.

Labels
type/refactor Type: Refactoring.
Development

Successfully merging this pull request may close these issues.

Unable to add column to table that has both upstream sink and downstream dependency
2 participants