Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 15 additions & 0 deletions e2e_test/sink/sink_into_table/alter_column.slt
Original file line number Diff line number Diff line change
Expand Up @@ -395,3 +395,18 @@ drop table t1;

statement ok
drop table t2;

statement ok
create table t1 (v1 int primary key);

statement ok
create sink s1 into t1 as select 1;

statement ok
create materialized view mv1 as select * from t1;

statement ok
alter table t1 add column v2 int default 100;

statement ok
drop table t1 cascade;
21 changes: 0 additions & 21 deletions src/stream/src/executor/exchange/input.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,10 @@
use std::pin::Pin;
use std::task::{Context, Poll};

use anyhow::anyhow;
use either::Either;
use local_input::LocalInputStreamInner;
use pin_project::pin_project;
use risingwave_common::util::addr::{HostAddr, is_local_address};
use tokio::sync::mpsc;

use super::permit::Receiver;
use crate::executor::prelude::*;
Expand Down Expand Up @@ -89,25 +87,6 @@ pub(crate) fn apply_dispatcher_barrier(
.extend(dispatcher_barrier.passed_actors);
}

pub(crate) async fn process_dispatcher_msg(
dispatcher_msg: DispatcherMessage,
barrier_rx: &mut mpsc::UnboundedReceiver<Barrier>,
) -> StreamExecutorResult<Message> {
let msg = match dispatcher_msg {
DispatcherMessage::Chunk(chunk) => Message::Chunk(chunk),
DispatcherMessage::Barrier(barrier) => {
let mut recv_barrier = barrier_rx
.recv()
.await
.ok_or_else(|| anyhow!("end of barrier recv"))?;
apply_dispatcher_barrier(&mut recv_barrier, barrier);
Message::Barrier(recv_barrier)
}
DispatcherMessage::Watermark(watermark) => Message::Watermark(watermark),
};
Ok(msg)
}

impl LocalInput {
pub fn new(channel: Receiver, upstream_actor_id: ActorId) -> Self {
Self {
Expand Down
66 changes: 26 additions & 40 deletions src/stream/src/executor/merge.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,18 +16,13 @@ use std::collections::VecDeque;
use std::pin::Pin;
use std::task::{Context, Poll};

use anyhow::Context as _;
use futures::future::try_join_all;
use risingwave_common::array::StreamChunkBuilder;
use risingwave_common::config::MetricLevel;
use tokio::sync::mpsc;
use tokio::time::Instant;

use super::exchange::input::BoxedActorInput;
use super::*;
use crate::executor::exchange::input::{
assert_equal_dispatcher_barrier, new_input, process_dispatcher_msg,
};
use crate::executor::prelude::*;
use crate::task::LocalBarrierManager;

Expand Down Expand Up @@ -242,27 +237,37 @@ impl MergeExecutor {
// Channels that're blocked by the barrier to align.
let mut start_time = Instant::now();
pin_mut!(select_all);
while let Some(msg) = select_all.next().await {

let mut barrier_buffer = DispatchBarrierBuffer::new(
self.barrier_rx,
actor_id,
self.upstream_fragment_id,
self.local_barrier_manager,
self.metrics.clone(),
self.fragment_id,
);

loop {
let msg = barrier_buffer.await_next_message(&mut select_all).await?;
metrics
.actor_input_buffer_blocking_duration_ns
.inc_by(start_time.elapsed().as_nanos() as u64);
let msg: DispatcherMessage = msg?;
let mut msg: Message = process_dispatcher_msg(msg, &mut self.barrier_rx).await?;

match &mut msg {
Message::Watermark(_) => {
// Do nothing.
}
Message::Chunk(chunk) => {
let msg = match msg {
DispatcherMessage::Watermark(watermark) => Message::Watermark(watermark),
DispatcherMessage::Chunk(chunk) => {
metrics.actor_in_record_cnt.inc_by(chunk.cardinality() as _);
Message::Chunk(chunk)
}
Message::Barrier(barrier) => {
DispatcherMessage::Barrier(barrier) => {
tracing::debug!(
target: "events::stream::barrier::path",
actor_id = actor_id,
"receiver receives barrier from path: {:?}",
barrier.passed_actors
);
let (mut barrier, new_inputs) =
barrier_buffer.pop_barrier_with_inputs(barrier).await?;
barrier.passed_actors.push(actor_id);

if let Some(Mutation::Update(UpdateMutation { dispatchers, .. })) =
Expand Down Expand Up @@ -291,30 +296,7 @@ impl MergeExecutor {
// `Watermark` of upstream may become stale after upstream scaling.
select_all.flush_buffered_watermarks();

if !update.added_upstream_actors.is_empty() {
// Create new upstreams receivers.
let mut new_upstreams: Vec<_> = try_join_all(
update.added_upstream_actors.iter().map(|upstream_actor| {
new_input(
&self.local_barrier_manager,
self.metrics.clone(),
self.actor_context.id,
self.fragment_id,
upstream_actor,
new_upstream_fragment_id,
)
}),
)
.await
.context("failed to create upstream receivers")?;

// Poll the first barrier from the new upstreams. It must be the same as
// the one we polled from original upstreams.
for upstream in &mut new_upstreams {
let new_barrier = expect_first_barrier(upstream).await?;
assert_equal_dispatcher_barrier(barrier, &new_barrier);
}

if let Some(new_upstreams) = new_inputs {
// Add the new upstreams to select.
select_all.add_upstreams_from(new_upstreams);
}
Expand All @@ -332,12 +314,16 @@ impl MergeExecutor {
);
}

if barrier.is_stop(actor_id) {
let is_stop = barrier.is_stop(actor_id);
let msg = Message::Barrier(barrier);
if is_stop {
yield msg;
break;
}

msg
}
}
};

yield msg;
start_time = Instant::now();
Expand Down
Loading
Loading