Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 15 additions & 0 deletions e2e_test/sink/sink_into_table/alter_column.slt
Original file line number Diff line number Diff line change
Expand Up @@ -395,3 +395,18 @@ drop table t1;

statement ok
drop table t2;

statement ok
create table t1 (v1 int primary key);

statement ok
create sink s1 into t1 as select 1;

statement ok
create materialized view mv1 as select * from t1;

statement ok
alter table t1 add column v2 int default 100;

statement ok
drop table t1 cascade;
62 changes: 29 additions & 33 deletions src/stream/src/executor/upstream_sink_union.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ use std::task::{Context, Poll};

use anyhow::Context as _;
use futures::future::try_join_all;
use itertools::Itertools;
use pin_project::pin_project;
use risingwave_common::catalog::Field;
use risingwave_expr::expr::{EvalErrorReport, NonStrictExpression, build_non_strict_from_prost};
Expand Down Expand Up @@ -178,7 +179,7 @@ pub struct UpstreamSinkUnionExecutor {
chunk_size: usize,

/// The initial inputs to the executor.
initial_upstream_infos: Vec<UpstreamFragmentInfo>,
initial_inputs: Vec<BoxedSinkInput>,

/// The error report for evaluation errors.
eval_error_report: ActorEvalErrorReport,
Expand All @@ -193,7 +194,10 @@ pub struct UpstreamSinkUnionExecutor {
impl Debug for UpstreamSinkUnionExecutor {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
f.debug_struct("UpstreamSinkUnionExecutor")
.field("initial_upstream_infos", &self.initial_upstream_infos)
.field(
"initial_upstream_fragments",
&self.initial_inputs.iter().map(|i| i.id()).collect_vec(),
)
.finish()
}
}
Expand All @@ -205,25 +209,42 @@ impl Execute for UpstreamSinkUnionExecutor {
}

impl UpstreamSinkUnionExecutor {
pub fn new(
// Need to wait for establishing stream-connections to upstream actors, so async.
pub async fn new(
Copy link
Member

@BugenZhao BugenZhao Sep 17, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you add some comments to show that when this function might be blocking (i.e. not immediately ready)? Seems like the origin is RemoteInput::new.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, here we need to wait for establishing stream-rpc with the upstream actor.

ctx: ActorContextRef,
local_barrier_manager: LocalBarrierManager,
executor_stats: Arc<StreamingMetrics>,
chunk_size: usize,
initial_upstream_infos: Vec<UpstreamFragmentInfo>,
eval_error_report: ActorEvalErrorReport,
) -> Self {
) -> StreamExecutorResult<Self> {
let barrier_rx = local_barrier_manager.subscribe_barrier(ctx.id);
Self {
let mut executor = Self {
actor_context: ctx,
local_barrier_manager,
executor_stats,
chunk_size,
initial_upstream_infos,
initial_inputs: Default::default(),
eval_error_report,
barrier_rx,
barrier_tx_map: Default::default(),
};
executor.init_with_upstreams(initial_upstream_infos).await?;

Ok(executor)
}

async fn init_with_upstreams(
&mut self,
initial_upstream_infos: Vec<UpstreamFragmentInfo>,
) -> StreamExecutorResult<()> {
self.initial_inputs.reserve(initial_upstream_infos.len());
for info in initial_upstream_infos {
let input = self.new_sink_input(info).await?;
self.initial_inputs.push(input);
}

Ok(())
}

#[cfg(test)]
Expand All @@ -240,7 +261,7 @@ impl UpstreamSinkUnionExecutor {
local_barrier_manager,
executor_stats: metrics.into(),
chunk_size,
initial_upstream_infos: vec![],
initial_inputs: vec![],
eval_error_report: ActorEvalErrorReport {
actor_context: actor_ctx,
identity: format!("UpstreamSinkUnionExecutor-{}", actor_id).into(),
Expand Down Expand Up @@ -317,32 +338,7 @@ impl UpstreamSinkUnionExecutor {

#[try_stream(ok = Message, error = StreamExecutorError)]
async fn execute_inner(mut self: Box<Self>) {
let inputs: Vec<_> = {
let initial_upstream_infos = std::mem::take(&mut self.initial_upstream_infos);
let mut inputs = Vec::with_capacity(initial_upstream_infos.len());
for UpstreamFragmentInfo {
upstream_fragment_id,
upstream_actors,
merge_schema,
project_exprs,
} in initial_upstream_infos
{
let merge_executor = self
.new_merge_executor(upstream_fragment_id, upstream_actors, merge_schema)
.await?;

let input = SinkHandlerInput::new(
upstream_fragment_id,
Box::new(merge_executor),
project_exprs,
)
.boxed_input();

inputs.push(input);
}
inputs
};

let inputs = std::mem::take(&mut self.initial_inputs);
let execution_stream = self.execute_with_inputs(inputs);
pin_mut!(execution_stream);
while let Some(msg) = execution_stream.next().await {
Expand Down
47 changes: 24 additions & 23 deletions src/stream/src/from_proto/upstream_sink_union.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,30 +27,31 @@ impl ExecutorBuilder for UpstreamSinkUnionExecutorBuilder {
node: &Self::Node,
_store: impl StateStore,
) -> StreamResult<Executor> {
let mut upstreams = Vec::with_capacity(node.get_init_upstreams().len());
for init_upstream in node.get_init_upstreams() {
let upstream_fragment_id = init_upstream.get_upstream_fragment_id();
let upstream_fragment_info = UpstreamFragmentInfo::new(
upstream_fragment_id,
&params.actor_context.initial_upstream_actors,
init_upstream.get_sink_output_schema(),
init_upstream.get_project_exprs(),
params.eval_error_report.clone(),
)?;
upstreams.push(upstream_fragment_info);
}
let init_upstreams = node
.get_init_upstreams()
.iter()
.map(|init_upstream| {
let upstream_fragment_id = init_upstream.get_upstream_fragment_id();
UpstreamFragmentInfo::new(
upstream_fragment_id,
&params.actor_context.initial_upstream_actors,
init_upstream.get_sink_output_schema(),
init_upstream.get_project_exprs(),
params.eval_error_report.clone(),
)
})
.try_collect()?;

Ok((
params.info,
UpstreamSinkUnionExecutor::new(
params.actor_context,
params.local_barrier_manager,
params.executor_stats,
params.env.config().developer.chunk_size,
upstreams,
params.eval_error_report,
),
let executor = UpstreamSinkUnionExecutor::new(
params.actor_context,
params.local_barrier_manager,
params.executor_stats,
params.env.config().developer.chunk_size,
init_upstreams,
params.eval_error_report,
)
.into())
.await?;

Ok((params.info, executor).into())
}
}
Loading