From 93a7aea3cf9c4d54cb03c3161b3a532ff34fc310 Mon Sep 17 00:00:00 2001 From: Chiro11 Date: Tue, 16 Sep 2025 18:56:06 +0800 Subject: [PATCH 1/2] init --- .../sink/sink_into_table/alter_column.slt | 15 +++++ .../src/executor/upstream_sink_union.rs | 61 +++++++++---------- .../src/from_proto/upstream_sink_union.rs | 47 +++++++------- 3 files changed, 67 insertions(+), 56 deletions(-) diff --git a/e2e_test/sink/sink_into_table/alter_column.slt b/e2e_test/sink/sink_into_table/alter_column.slt index c88ae8095280b..01067e7d637c9 100644 --- a/e2e_test/sink/sink_into_table/alter_column.slt +++ b/e2e_test/sink/sink_into_table/alter_column.slt @@ -395,3 +395,18 @@ drop table t1; statement ok drop table t2; + +statement ok +create table t1 (v1 int primary key); + +statement ok +create sink s1 into t1 as select 1; + +statement ok +create materialized view mv1 as select * from t1; + +statement ok +alter table t1 add column v2 int default 100; + +statement ok +drop table t1 cascade; diff --git a/src/stream/src/executor/upstream_sink_union.rs b/src/stream/src/executor/upstream_sink_union.rs index ce86b30563b2b..9e4edf4ded1af 100644 --- a/src/stream/src/executor/upstream_sink_union.rs +++ b/src/stream/src/executor/upstream_sink_union.rs @@ -19,6 +19,7 @@ use std::task::{Context, Poll}; use anyhow::Context as _; use futures::future::try_join_all; +use itertools::Itertools; use pin_project::pin_project; use risingwave_common::catalog::Field; use risingwave_expr::expr::{EvalErrorReport, NonStrictExpression, build_non_strict_from_prost}; @@ -178,7 +179,7 @@ pub struct UpstreamSinkUnionExecutor { chunk_size: usize, /// The initial inputs to the executor. - initial_upstream_infos: Vec, + initial_inputs: Vec, /// The error report for evaluation errors. eval_error_report: ActorEvalErrorReport, @@ -193,7 +194,10 @@ pub struct UpstreamSinkUnionExecutor { impl Debug for UpstreamSinkUnionExecutor { fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { f.debug_struct("UpstreamSinkUnionExecutor") - .field("initial_upstream_infos", &self.initial_upstream_infos) + .field( + "initial_upstream_fragments", + &self.initial_inputs.iter().map(|i| i.id()).collect_vec(), + ) .finish() } } @@ -205,25 +209,41 @@ impl Execute for UpstreamSinkUnionExecutor { } impl UpstreamSinkUnionExecutor { - pub fn new( + pub async fn new( ctx: ActorContextRef, local_barrier_manager: LocalBarrierManager, executor_stats: Arc, chunk_size: usize, initial_upstream_infos: Vec, eval_error_report: ActorEvalErrorReport, - ) -> Self { + ) -> StreamExecutorResult { let barrier_rx = local_barrier_manager.subscribe_barrier(ctx.id); - Self { + let mut executor = Self { actor_context: ctx, local_barrier_manager, executor_stats, chunk_size, - initial_upstream_infos, + initial_inputs: Default::default(), eval_error_report, barrier_rx, barrier_tx_map: Default::default(), + }; + executor.init_with_upstreams(initial_upstream_infos).await?; + + Ok(executor) + } + + async fn init_with_upstreams( + &mut self, + initial_upstream_infos: Vec, + ) -> StreamExecutorResult<()> { + self.initial_inputs.reserve(initial_upstream_infos.len()); + for info in initial_upstream_infos { + let input = self.new_sink_input(info).await?; + self.initial_inputs.push(input); } + + Ok(()) } #[cfg(test)] @@ -240,7 +260,7 @@ impl UpstreamSinkUnionExecutor { local_barrier_manager, executor_stats: metrics.into(), chunk_size, - initial_upstream_infos: vec![], + initial_inputs: vec![], eval_error_report: ActorEvalErrorReport { actor_context: actor_ctx, identity: format!("UpstreamSinkUnionExecutor-{}", actor_id).into(), @@ -317,32 +337,7 @@ impl UpstreamSinkUnionExecutor { #[try_stream(ok = Message, error = StreamExecutorError)] async fn execute_inner(mut self: Box) { - let inputs: Vec<_> = { - let initial_upstream_infos = std::mem::take(&mut self.initial_upstream_infos); - let mut inputs = Vec::with_capacity(initial_upstream_infos.len()); - for UpstreamFragmentInfo { - upstream_fragment_id, - upstream_actors, - merge_schema, - project_exprs, - } in initial_upstream_infos - { - let merge_executor = self - .new_merge_executor(upstream_fragment_id, upstream_actors, merge_schema) - .await?; - - let input = SinkHandlerInput::new( - upstream_fragment_id, - Box::new(merge_executor), - project_exprs, - ) - .boxed_input(); - - inputs.push(input); - } - inputs - }; - + let inputs = std::mem::take(&mut self.initial_inputs); let execution_stream = self.execute_with_inputs(inputs); pin_mut!(execution_stream); while let Some(msg) = execution_stream.next().await { diff --git a/src/stream/src/from_proto/upstream_sink_union.rs b/src/stream/src/from_proto/upstream_sink_union.rs index 5a4ad2c83024b..f8bebab4d59e9 100644 --- a/src/stream/src/from_proto/upstream_sink_union.rs +++ b/src/stream/src/from_proto/upstream_sink_union.rs @@ -27,30 +27,31 @@ impl ExecutorBuilder for UpstreamSinkUnionExecutorBuilder { node: &Self::Node, _store: impl StateStore, ) -> StreamResult { - let mut upstreams = Vec::with_capacity(node.get_init_upstreams().len()); - for init_upstream in node.get_init_upstreams() { - let upstream_fragment_id = init_upstream.get_upstream_fragment_id(); - let upstream_fragment_info = UpstreamFragmentInfo::new( - upstream_fragment_id, - ¶ms.actor_context.initial_upstream_actors, - init_upstream.get_sink_output_schema(), - init_upstream.get_project_exprs(), - params.eval_error_report.clone(), - )?; - upstreams.push(upstream_fragment_info); - } + let init_upstreams = node + .get_init_upstreams() + .iter() + .map(|init_upstream| { + let upstream_fragment_id = init_upstream.get_upstream_fragment_id(); + UpstreamFragmentInfo::new( + upstream_fragment_id, + ¶ms.actor_context.initial_upstream_actors, + init_upstream.get_sink_output_schema(), + init_upstream.get_project_exprs(), + params.eval_error_report.clone(), + ) + }) + .try_collect()?; - Ok(( - params.info, - UpstreamSinkUnionExecutor::new( - params.actor_context, - params.local_barrier_manager, - params.executor_stats, - params.env.config().developer.chunk_size, - upstreams, - params.eval_error_report, - ), + let executor = UpstreamSinkUnionExecutor::new( + params.actor_context, + params.local_barrier_manager, + params.executor_stats, + params.env.config().developer.chunk_size, + init_upstreams, + params.eval_error_report, ) - .into()) + .await?; + + Ok((params.info, executor).into()) } } From f3bb975f90f1af14fd6be14d3ef66820a3400f02 Mon Sep 17 00:00:00 2001 From: Chiro11 Date: Wed, 17 Sep 2025 13:45:05 +0800 Subject: [PATCH 2/2] add_comments --- src/stream/src/executor/upstream_sink_union.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/src/stream/src/executor/upstream_sink_union.rs b/src/stream/src/executor/upstream_sink_union.rs index 9e4edf4ded1af..5f2e22a5c0a00 100644 --- a/src/stream/src/executor/upstream_sink_union.rs +++ b/src/stream/src/executor/upstream_sink_union.rs @@ -209,6 +209,7 @@ impl Execute for UpstreamSinkUnionExecutor { } impl UpstreamSinkUnionExecutor { + // Need to wait for establishing stream-connections to upstream actors, so async. pub async fn new( ctx: ActorContextRef, local_barrier_manager: LocalBarrierManager,