From e831bb1728bc201a97edd9444c0938442e8156f2 Mon Sep 17 00:00:00 2001
From: Dylan Chen
Date: Fri, 26 Sep 2025 23:20:30 +0800
Subject: [PATCH 1/3] support truncate locality backfill state

---
 proto/stream_plan.proto                           |  1 +
 .../src/barrier/backfill_order_control.rs         |  4 +
 src/meta/src/barrier/checkpoint/control.rs        | 15 +++-
 .../barrier/checkpoint/creating_job/status.rs     | 19 ++++-
 src/meta/src/barrier/command.rs                   | 14 ++-
 src/meta/src/barrier/progress.rs                  | 85 +++++++++++++------
 .../executor/backfill/arrangement_backfill.rs     |  2 +-
 .../executor/backfill/no_shuffle_backfill.rs      |  2 +-
 src/stream/src/executor/locality_provider.rs      | 49 +++++++++--
 src/stream/src/executor/mod.rs                    | 24 ++++--
 .../source/source_backfill_executor.rs            |  4 +-
 .../src/from_proto/locality_provider.rs           |  1 +
 src/stream/src/task/barrier_manager/mod.rs        | 13 +++
 .../src/task/barrier_worker/managed_state.rs      | 48 +++++++++++
 14 files changed, 229 insertions(+), 52 deletions(-)

diff --git a/proto/stream_plan.proto b/proto/stream_plan.proto
index 58bcf1f5d09eb..a25f5b281f933 100644
--- a/proto/stream_plan.proto
+++ b/proto/stream_plan.proto
@@ -144,6 +144,7 @@ message ConnectorPropsChangeMutation {
 
 message StartFragmentBackfillMutation {
   repeated uint32 fragment_ids = 1;
+  repeated uint32 truncate_locality_fragment_ids = 2;
 }
 
 message RefreshStartMutation {
diff --git a/src/meta/src/barrier/backfill_order_control.rs b/src/meta/src/barrier/backfill_order_control.rs
index 09852b7babbbd..59071d2c36d72 100644
--- a/src/meta/src/barrier/backfill_order_control.rs
+++ b/src/meta/src/barrier/backfill_order_control.rs
@@ -195,4 +195,8 @@ impl BackfillOrderState {
         }
         newly_scheduled
     }
+
+    pub fn current_backfill_node_fragment_ids(&self) -> Vec<FragmentId> {
+        self.current_backfill_nodes.keys().copied().collect()
+    }
 }
diff --git a/src/meta/src/barrier/checkpoint/control.rs b/src/meta/src/barrier/checkpoint/control.rs
index 8c3ad33351cbf..425063956eb8a 100644
--- a/src/meta/src/barrier/checkpoint/control.rs
+++ b/src/meta/src/barrier/checkpoint/control.rs
@@ -1137,11 +1137,18 @@ impl DatabaseCheckpointControl {
         if let Some(jobs_to_merge) = self.jobs_to_merge() {
             command = Some(Command::MergeSnapshotBackfillStreamingJobs(jobs_to_merge));
         } else {
-            let pending_backfill_nodes =
-                self.create_mview_tracker.take_pending_backfill_nodes();
-            if !pending_backfill_nodes.is_empty() {
+            let fragments = self.create_mview_tracker.take_pending_backfill_nodes();
+            if !fragments.next_backfill_nodes.is_empty()
+                || !fragments.last_backfill_nodes.is_empty()
+            {
+                tracing::info!(
+                    "start fragment backfill for mview, next_backfill_nodes: {:?}, last_backfill_nodes: {:?}",
+                    fragments.next_backfill_nodes,
+                    fragments.last_backfill_nodes
+                );
                 command = Some(Command::StartFragmentBackfill {
-                    fragment_ids: pending_backfill_nodes,
+                    fragment_ids: fragments.next_backfill_nodes,
+                    truncate_locality_fragment_ids: fragments.last_backfill_nodes,
                 });
             }
         }
diff --git a/src/meta/src/barrier/checkpoint/creating_job/status.rs b/src/meta/src/barrier/checkpoint/creating_job/status.rs
index 73abac2eb11cf..eee11fcebb4e4 100644
--- a/src/meta/src/barrier/checkpoint/creating_job/status.rs
+++ b/src/meta/src/barrier/checkpoint/creating_job/status.rs
@@ -215,13 +215,26 @@ impl CreatingStreamingJobStatus {
                 ..
            } => {
                let mutation = {
-                    let pending_backfill_nodes = create_mview_tracker.take_pending_backfill_nodes();
-                    if pending_backfill_nodes.is_empty() {
+                    let fragments = create_mview_tracker.take_pending_backfill_nodes();
+                    if fragments.next_backfill_nodes.is_empty()
+                        && fragments.last_backfill_nodes.is_empty()
+                    {
                         None
                     } else {
+                        tracing::info!(
+                            "Injecting backfill mutation for new upstream epoch. next_backfill_nodes: {:?}, last_backfill_nodes: {:?}",
+                            fragments.next_backfill_nodes,
+                            fragments.last_backfill_nodes
+                        );
                         Some(Mutation::StartFragmentBackfill(
                             StartFragmentBackfillMutation {
-                                fragment_ids: pending_backfill_nodes
+                                fragment_ids: fragments
+                                    .next_backfill_nodes
+                                    .into_iter()
+                                    .map(|fragment_id| fragment_id as _)
+                                    .collect(),
+                                truncate_locality_fragment_ids: fragments
+                                    .last_backfill_nodes
                                     .into_iter()
                                     .map(|fragment_id| fragment_id as _)
                                     .collect(),
diff --git a/src/meta/src/barrier/command.rs b/src/meta/src/barrier/command.rs
index e63f1fd532574..5d24d9267e3d7 100644
--- a/src/meta/src/barrier/command.rs
+++ b/src/meta/src/barrier/command.rs
@@ -395,8 +395,10 @@ pub enum Command {
     ConnectorPropsChange(ConnectorPropsChange),
 
     /// `StartFragmentBackfill` command will trigger backfilling for specified scans by `fragment_id`.
+    /// The `truncate_locality_fragment_ids` are the locality backfill fragments that need to truncate their state.
     StartFragmentBackfill {
         fragment_ids: Vec<FragmentId>,
+        truncate_locality_fragment_ids: Vec<FragmentId>,
     },
 
     /// `Refresh` command generates a barrier to refresh a table by truncating state
@@ -1316,11 +1318,15 @@ impl Command {
                     },
                 ))
             }
-            Command::StartFragmentBackfill { fragment_ids } => Some(
-                Mutation::StartFragmentBackfill(StartFragmentBackfillMutation {
+            Command::StartFragmentBackfill {
+                fragment_ids,
+                truncate_locality_fragment_ids,
+            } => Some(Mutation::StartFragmentBackfill(
+                StartFragmentBackfillMutation {
                     fragment_ids: fragment_ids.clone(),
-                }),
-            ),
+                    truncate_locality_fragment_ids: truncate_locality_fragment_ids.clone(),
+                },
+            )),
             Command::Refresh {
                 table_id,
                 associated_source_id,
diff --git a/src/meta/src/barrier/progress.rs b/src/meta/src/barrier/progress.rs
index 7a4ec26647b25..6af683a078022 100644
--- a/src/meta/src/barrier/progress.rs
+++ b/src/meta/src/barrier/progress.rs
@@ -40,6 +40,16 @@ enum BackfillState {
     Done(ConsumedRows),
 }
 
+/// Represents the backfill nodes that need to be scheduled or cleaned up.
+#[derive(Debug, Default)]
+pub(super) struct PendingBackfillFragments {
+    /// Fragment IDs that should start backfilling in the next checkpoint
+    pub next_backfill_nodes: Vec<FragmentId>,
+    /// Fragment IDs whose locality backfill state should be truncated
+    /// (they have just completed backfilling)
+    pub last_backfill_nodes: Vec<FragmentId>,
+}
+
 /// Progress of all actors containing backfill executors while creating mview.
 #[derive(Debug)]
 pub(super) struct Progress {
@@ -100,13 +110,14 @@ impl Progress {
     }
 
     /// Update the progress of `actor`.
+    /// Returns the backfill fragments that need to be scheduled or cleaned up.
    fn update(
         &mut self,
         actor: ActorId,
         new_state: BackfillState,
         upstream_total_key_count: u64,
-    ) -> Vec<FragmentId> {
-        let mut next_backfill_nodes = vec![];
+    ) -> PendingBackfillFragments {
+        let mut result = PendingBackfillFragments::default();
         self.upstream_mvs_total_key_count = upstream_total_key_count;
         let total_actors = self.states.len();
         let backfill_upstream_type = self.backfill_upstream_types.get(&actor).unwrap();
@@ -130,7+141,18 @@ impl Progress {
                     tracing::debug!("actor {} done", actor);
                     new = *new_consumed_rows;
                     self.done_count += 1;
-                    next_backfill_nodes = self.backfill_order_state.finish_actor(actor);
+                    let before_backfill_nodes = self
+                        .backfill_order_state
+                        .current_backfill_node_fragment_ids();
+                    result.next_backfill_nodes = self.backfill_order_state.finish_actor(actor);
+                    let after_backfill_nodes = self
+                        .backfill_order_state
+                        .current_backfill_node_fragment_ids();
+                    // last_backfill_nodes = before_backfill_nodes - after_backfill_nodes
+                    result.last_backfill_nodes = before_backfill_nodes
+                        .into_iter()
+                        .filter(|x| !after_backfill_nodes.contains(x))
+                        .collect();
                     tracing::debug!(
                         "{} actors out of {} complete",
                         self.done_count,
@@ -155,7 +177,7 @@ impl Progress {
             }
         }
         self.states.insert(actor, new_state);
-        next_backfill_nodes
+        result
     }
 
     /// Returns whether all backfill executors are done.
@@ -297,8 +319,10 @@ pub(super) struct TrackingCommand {
 
 pub(super) enum UpdateProgressResult {
     None,
-    Finished(TrackingJob),
-    BackfillNodeFinished(Vec<FragmentId>),
+    /// The finished job, along with its pending backfill fragments for cleanup.
+    Finished(TrackingJob, PendingBackfillFragments),
+    /// Backfill nodes have finished and new ones need to be scheduled.
+    BackfillNodeFinished(PendingBackfillFragments),
 }
 
 /// Tracking is done as follows:
@@ -316,8 +340,8 @@ pub(super) struct CreateMviewProgressTracker {
     /// Stash of finished jobs. They will be finally finished on checkpoint.
     pending_finished_jobs: Vec<TrackingJob>,
 
-    /// Stash of pending backfill nodes. They will start backfilling on checkpoint.
-    pending_backfill_nodes: Vec<FragmentId>,
+    /// Stash of pending backfill fragments. They will start backfilling or be cleaned up on checkpoint.
+    pending_backfill_fragments: Vec<PendingBackfillFragments>,
 }
 
 impl CreateMviewProgressTracker {
@@ -361,7 +385,7 @@ impl CreateMviewProgressTracker {
             progress_map,
             actor_map,
             pending_finished_jobs: Vec::new(),
-            pending_backfill_nodes: Vec::new(),
+            pending_backfill_fragments: Vec::new(),
         }
     }
@@ -435,17 +459,18 @@ impl CreateMviewProgressTracker {
                 UpdateProgressResult::None => {
                     tracing::trace!(?progress, "update progress");
                 }
-                UpdateProgressResult::Finished(command) => {
+                UpdateProgressResult::Finished(command, fragments) => {
+                    self.queue_backfill(fragments);
                     tracing::trace!(?progress, "finish progress");
                     commands.push(command);
                 }
-                UpdateProgressResult::BackfillNodeFinished(next_backfill_nodes) => {
+                UpdateProgressResult::BackfillNodeFinished(fragments) => {
                     tracing::trace!(
                         ?progress,
-                        ?next_backfill_nodes,
+                        next_backfill_nodes = ?fragments.next_backfill_nodes,
                         "start next backfill node"
                     );
-                    self.queue_backfill(next_backfill_nodes);
+                    self.queue_backfill(fragments);
                 }
             }
         }
@@ -506,8 +531,8 @@ impl CreateMviewProgressTracker {
         self.pending_finished_jobs.push(finished_job);
     }
 
-    fn queue_backfill(&mut self, backfill_nodes: impl IntoIterator<Item = FragmentId>) {
-        self.pending_backfill_nodes.extend(backfill_nodes);
+    fn queue_backfill(&mut self, fragments: PendingBackfillFragments) {
+        self.pending_backfill_fragments.push(fragments);
     }
 
     /// Finish stashed jobs on checkpoint.
@@ -516,8 +541,15 @@ impl CreateMviewProgressTracker {
         take(&mut self.pending_finished_jobs)
     }
 
-    pub(super) fn take_pending_backfill_nodes(&mut self) -> Vec<FragmentId> {
-        take(&mut self.pending_backfill_nodes)
+    /// Take the pending backfill fragments to start on checkpoint.
+    /// Merge all pending backfill fragments into one.
+    pub(super) fn take_pending_backfill_nodes(&mut self) -> PendingBackfillFragments {
+        let mut result = PendingBackfillFragments::default();
+        for fragments in take(&mut self.pending_backfill_fragments) {
+            result.next_backfill_nodes.extend(fragments.next_backfill_nodes);
+            result.last_backfill_nodes.extend(fragments.last_backfill_nodes);
+        }
+        result
     }
 
     pub(super) fn has_pending_finished_jobs(&self) -> bool {
@@ -640,8 +672,7 @@ impl CreateMviewProgressTracker {
             calculate_total_key_count(&progress.upstream_mv_count, version_stats);
 
         tracing::debug!(?table_id, "updating progress for table");
-        let next_backfill_nodes =
-            progress.update(actor, new_state, upstream_total_key_count);
+        let fragments = progress.update(actor, new_state, upstream_total_key_count);
 
         if progress.is_done() {
             tracing::debug!(
@@ -653,11 +684,17 @@ impl CreateMviewProgressTracker {
             for actor in o.get().0.actors() {
                 self.actor_map.remove(&actor);
             }
-            assert!(next_backfill_nodes.is_empty());
-            UpdateProgressResult::Finished(o.remove().1)
-        } else if !next_backfill_nodes.is_empty() {
-            tracing::debug!("scheduling next backfill nodes: {:?}", next_backfill_nodes);
-            UpdateProgressResult::BackfillNodeFinished(next_backfill_nodes)
+            assert!(fragments.next_backfill_nodes.is_empty());
+            UpdateProgressResult::Finished(o.remove().1, fragments)
+        } else if !fragments.next_backfill_nodes.is_empty()
+            || !fragments.last_backfill_nodes.is_empty()
+        {
+            tracing::debug!(
+                "scheduling next backfill nodes: {:?} and truncate last backfill nodes: {:?}",
+                fragments.next_backfill_nodes,
+                fragments.last_backfill_nodes
+            );
+            UpdateProgressResult::BackfillNodeFinished(fragments)
         } else {
             UpdateProgressResult::None
         }
diff --git a/src/stream/src/executor/backfill/arrangement_backfill.rs b/src/stream/src/executor/backfill/arrangement_backfill.rs
index 401f5d0d27ba8..717ba9b696442 100644
--- a/src/stream/src/executor/backfill/arrangement_backfill.rs
+++ b/src/stream/src/executor/backfill/arrangement_backfill.rs
@@ -552,7 +552,7 @@ where
                             Mutation::Resume => {
                                 global_pause = false;
                             }
-                            Mutation::StartFragmentBackfill { fragment_ids } if backfill_paused => {
+                            Mutation::StartFragmentBackfill { fragment_ids, .. } if backfill_paused => {
                                 if fragment_ids.contains(&self.fragment_id) {
                                     backfill_paused = false;
                                 }
diff --git a/src/stream/src/executor/backfill/no_shuffle_backfill.rs b/src/stream/src/executor/backfill/no_shuffle_backfill.rs
index 8e9327780d114..a1be5d6e6df12 100644
--- a/src/stream/src/executor/backfill/no_shuffle_backfill.rs
+++ b/src/stream/src/executor/backfill/no_shuffle_backfill.rs
@@ -460,7 +460,7 @@ where
                         Mutation::Resume => {
                             global_pause = false;
                         }
-                        Mutation::StartFragmentBackfill { fragment_ids } if backfill_paused => {
+                        Mutation::StartFragmentBackfill { fragment_ids, .. } if backfill_paused => {
                             if fragment_ids.contains(&self.fragment_id) {
                                 backfill_paused = false;
                             }
diff --git a/src/stream/src/executor/locality_provider.rs b/src/stream/src/executor/locality_provider.rs
index 519c488832eb8..1f2ba1d5bc038 100644
--- a/src/stream/src/executor/locality_provider.rs
+++ b/src/stream/src/executor/locality_provider.rs
@@ -34,7 +34,7 @@ use risingwave_storage::store::PrefetchOptions;
 use crate::common::table::state_table::StateTable;
 use crate::executor::backfill::utils::create_builder;
 use crate::executor::prelude::*;
-use crate::task::{CreateMviewProgressReporter, FragmentId};
+use crate::task::{CreateMviewProgressReporter, FragmentId, LocalBarrierManager};
 
 type Builders = HashMap;
 
@@ -189,6 +189,9 @@ pub struct LocalityProviderExecutor {
 
     /// Chunk size for output
     chunk_size: usize,
+
+    /// Local barrier manager for reporting events
+    barrier_manager: LocalBarrierManager,
 }
 
 impl LocalityProviderExecutor {
@@ -203,6 +206,7 @@ impl LocalityProviderExecutor {
         metrics: Arc<StreamingMetrics>,
         chunk_size: usize,
         fragment_id: FragmentId,
+        barrier_manager: LocalBarrierManager,
     ) -> Self {
         Self {
             upstream,
@@ -215,6 +219,7 @@ impl LocalityProviderExecutor {
             metrics,
             chunk_size,
             fragment_id,
+            barrier_manager,
         }
     }
@@ -490,12 +495,12 @@ impl LocalityProviderExecutor {
                 // Check for StartFragmentBackfill mutation
                 if let Some(mutation) = barrier.mutation.as_deref() {
                     use crate::executor::Mutation;
-                    if let Mutation::StartFragmentBackfill { fragment_ids } = mutation {
-                        tracing::info!(
-                            "Start backfill of locality provider with fragment id: {:?}",
-                            &self.fragment_id
-                        );
+                    if let Mutation::StartFragmentBackfill { fragment_ids, .. } = mutation {
                         if fragment_ids.contains(&self.fragment_id) {
+                            tracing::info!(
+                                "Start backfill of locality provider with fragment id: {:?}",
+                                &self.fragment_id
+                            );
                             start_backfill = true;
                         }
                     }
@@ -789,8 +794,6 @@ impl LocalityProviderExecutor {
             }
         }
 
-        // TODO: truncate the state table after backfill.
-
         // After backfill completion, forward messages directly
         #[for_await]
         for msg in upstream {
@@ -798,6 +801,36 @@ impl LocalityProviderExecutor {
 
             match msg {
                 Message::Barrier(barrier) => {
+                    // Truncate state table.
+                    if let Some(mutation) = barrier.mutation.as_deref() {
+                        use crate::executor::Mutation;
+                        if let Mutation::StartFragmentBackfill {
+                            fragment_ids,
+                            truncate_locality_fragment_ids,
+                            ..
+                        } = mutation
+                        {
+                            tracing::info!(
+                                "self fragment_id: {} receive fragment_ids: {:?} truncate_locality_fragment_ids: {:?}",
+                                self.fragment_id,
+                                fragment_ids,
+                                truncate_locality_fragment_ids
+                            );
+                            if truncate_locality_fragment_ids.contains(&self.fragment_id) {
+                                tracing::info!(
+                                    "Truncate state table of locality provider with fragment id: {:?}",
+                                    &self.fragment_id
+                                );
+                                // Report the state table for truncation when receiving the mutation.
+                                self.barrier_manager.report_truncate_state(
+                                    barrier.epoch,
+                                    self.actor_id,
+                                    state_table.table_id(),
+                                );
+                            }
+                        }
+                    }
+
                     // Commit state tables but don't modify them
                     state_table
                         .commit_assert_no_update_vnode_bitmap(barrier.epoch)
diff --git a/src/stream/src/executor/mod.rs b/src/stream/src/executor/mod.rs
index ad519820c7114..212d23e454e5a 100644
--- a/src/stream/src/executor/mod.rs
+++ b/src/stream/src/executor/mod.rs
@@ -376,6 +376,7 @@ pub enum Mutation {
     },
     StartFragmentBackfill {
         fragment_ids: HashSet<FragmentId>,
+        truncate_locality_fragment_ids: HashSet<FragmentId>,
     },
     RefreshStart {
         table_id: TableId,
@@ -546,7 +547,8 @@ impl Barrier {
     }
 
     pub fn should_start_fragment_backfill(&self, fragment_id: FragmentId) -> bool {
-        if let Some(Mutation::StartFragmentBackfill { fragment_ids }) = self.mutation.as_deref() {
+        if let Some(Mutation::StartFragmentBackfill { fragment_ids, .. }) = self.mutation.as_deref()
+        {
             fragment_ids.contains(&fragment_id)
         } else {
             false
@@ -940,11 +942,16 @@ impl Mutation {
                     .collect(),
                 })
             }
-            Mutation::StartFragmentBackfill { fragment_ids } => {
-                PbMutation::StartFragmentBackfill(StartFragmentBackfillMutation {
-                    fragment_ids: fragment_ids.iter().copied().collect(),
-                })
-            }
+            Mutation::StartFragmentBackfill {
+                fragment_ids,
+                truncate_locality_fragment_ids,
+            } => PbMutation::StartFragmentBackfill(StartFragmentBackfillMutation {
+                fragment_ids: fragment_ids.iter().copied().collect(),
+                truncate_locality_fragment_ids: truncate_locality_fragment_ids
+                    .iter()
+                    .copied()
+                    .collect(),
+            }),
             Mutation::RefreshStart {
                 table_id,
                 associated_source_id,
@@ -1130,6 +1137,11 @@ impl Mutation {
                     .iter()
                     .copied()
                     .collect(),
+                truncate_locality_fragment_ids: start_fragment_backfill
+                    .truncate_locality_fragment_ids
+                    .iter()
+                    .copied()
+                    .collect(),
             }
         }
         PbMutation::RefreshStart(refresh_start) => Mutation::RefreshStart {
diff --git a/src/stream/src/executor/source/source_backfill_executor.rs b/src/stream/src/executor/source/source_backfill_executor.rs
index bcf54744b4b41..4b070295b6bb8 100644
--- a/src/stream/src/executor/source/source_backfill_executor.rs
+++ b/src/stream/src/executor/source/source_backfill_executor.rs
@@ -545,7 +545,9 @@ impl SourceBackfillExecutorInner {
                                 resume_reader!();
                             }
                         }
-                        Mutation::StartFragmentBackfill { fragment_ids } => {
+                        Mutation::StartFragmentBackfill {
+                            fragment_ids, ..
+                        } => {
                             if fragment_ids.contains(&self.actor_ctx.fragment_id)
                                 && pause_control.backfill_resume()
                             {
diff --git a/src/stream/src/from_proto/locality_provider.rs b/src/stream/src/from_proto/locality_provider.rs
index 7472705f8d34e..3c9bba5c13762 100644
--- a/src/stream/src/from_proto/locality_provider.rs
+++ b/src/stream/src/from_proto/locality_provider.rs
@@ -77,6 +77,7 @@ impl ExecutorBuilder for LocalityProviderBuilder {
             params.executor_stats.clone(),
             params.env.config().developer.chunk_size,
             params.actor_context.fragment_id,
+            params.local_barrier_manager.clone(),
         );
 
         Ok((params.info, exec).into())
diff --git a/src/stream/src/task/barrier_manager/mod.rs b/src/stream/src/task/barrier_manager/mod.rs
index 2955dd65c93cd..dbe3c823f436d 100644
--- a/src/stream/src/task/barrier_manager/mod.rs
+++ b/src/stream/src/task/barrier_manager/mod.rs
@@ -67,6 +67,11 @@ pub(super) enum LocalBarrierEvent {
         epoch: EpochPair,
         state: CdcTableBackfillState,
     },
+    ReportTruncateState {
+        epoch: EpochPair,
+        actor_id: ActorId,
+        table_id: u32,
+    },
 }
 
 /// Can send [`LocalBarrierEvent`] to [`super::barrier_worker::managed_state::DatabaseManagedBarrierState::poll_next_event`]
@@ -192,6 +197,14 @@ impl LocalBarrierManager {
             staging_table_id,
         });
     }
+
+    pub fn report_truncate_state(&self, epoch: EpochPair, actor_id: ActorId, table_id: u32) {
+        self.send_event(LocalBarrierEvent::ReportTruncateState {
+            epoch,
+            actor_id,
+            table_id,
+        });
+    }
 }
 
 #[cfg(test)]
diff --git a/src/stream/src/task/barrier_worker/managed_state.rs b/src/stream/src/task/barrier_worker/managed_state.rs
index 21fa071ec730e..346be002ed0db 100644
--- a/src/stream/src/task/barrier_worker/managed_state.rs
+++ b/src/stream/src/task/barrier_worker/managed_state.rs
@@ -1014,6 +1014,13 @@ impl DatabaseManagedBarrierState {
             } => {
                 self.update_cdc_table_backfill_progress(epoch, actor_id, state);
             }
+            LocalBarrierEvent::ReportTruncateState {
+                epoch,
+                actor_id,
+                table_id,
+            } => {
+                self.report_truncate_state(epoch, actor_id, table_id);
+            }
         }
     }
@@ -1168,6 +1175,47 @@ impl DatabaseManagedBarrierState {
             .or_default()
             .insert(staging_table_id);
     }
+
+    /// Report that a state table should be truncated for a specific epoch
+    /// This is used by locality provider to truncate its state table after backfill completion
+    pub(super) fn report_truncate_state(
+        &mut self,
+        epoch: EpochPair,
+        actor_id: ActorId,
+        table_id: u32,
+    ) {
+        // Find the correct partial graph state by matching the actor's partial graph id
+        let Some(actor_state) = self.actor_states.get(&actor_id) else {
+            warn!(
+                ?epoch,
+                actor_id, table_id, "ignore truncate state table: actor_state not found"
+            );
+            return;
+        };
+        let Some(partial_graph_id) = actor_state.inflight_barriers.get(&epoch.prev) else {
+            let inflight_barriers = actor_state.inflight_barriers.keys().collect::<Vec<_>>();
+            warn!(
+                ?epoch,
+                actor_id,
+                table_id,
+                ?inflight_barriers,
+                "ignore truncate state table: partial_graph_id not found in inflight_barriers"
+            );
+            return;
+        };
+        let Some(graph_state) = self.graph_states.get_mut(partial_graph_id) else {
+            warn!(
+                ?epoch,
+                actor_id, table_id, "ignore truncate state table: graph_state not found"
+            );
+            return;
+        };
+        graph_state
+            .truncate_tables
+            .entry(epoch.curr)
+            .or_default()
+            .insert(table_id);
+    }
 }
 
 impl PartialGraphManagedBarrierState {

From 1cc284b989a667aab0787f58810a3c82ad622512 Mon Sep 17 00:00:00 2001
From: Dylan Chen
Date: Fri, 10 Oct 2025 15:00:26 +0800
Subject: [PATCH 2/3] fmt

---
 src/meta/src/barrier/progress.rs | 8 ++++++--
 1 file changed, 6 insertions(+), 2 deletions(-)

diff --git a/src/meta/src/barrier/progress.rs b/src/meta/src/barrier/progress.rs
index 6af683a078022..a7d8e9902210f 100644
--- a/src/meta/src/barrier/progress.rs
+++ b/src/meta/src/barrier/progress.rs
@@ -546,8 +546,12 @@ impl CreateMviewProgressTracker {
     pub(super) fn take_pending_backfill_nodes(&mut self) -> PendingBackfillFragments {
         let mut result = PendingBackfillFragments::default();
         for fragments in take(&mut self.pending_backfill_fragments) {
-            result.next_backfill_nodes.extend(fragments.next_backfill_nodes);
-            result.last_backfill_nodes.extend(fragments.last_backfill_nodes);
+            result
+                .next_backfill_nodes
+                .extend(fragments.next_backfill_nodes);
+            result
+                .last_backfill_nodes
+                .extend(fragments.last_backfill_nodes);
         }
         result
     }

From 40a0f8ff596ee90d220f60dc4e81e66cbf39032e Mon Sep 17 00:00:00 2001
From: Dylan Chen
Date: Fri, 10 Oct 2025 18:13:49 +0800
Subject: [PATCH 3/3] fmt

---
 src/stream/src/executor/locality_provider.rs | 16 ++++++++--------
 1 file changed, 8 insertions(+), 8 deletions(-)

diff --git a/src/stream/src/executor/locality_provider.rs b/src/stream/src/executor/locality_provider.rs
index 1f2ba1d5bc038..c667711f78a61 100644
--- a/src/stream/src/executor/locality_provider.rs
+++ b/src/stream/src/executor/locality_provider.rs
@@ -495,14 +495,14 @@ impl LocalityProviderExecutor {
                 // Check for StartFragmentBackfill mutation
                 if let Some(mutation) = barrier.mutation.as_deref() {
                     use crate::executor::Mutation;
-                    if let Mutation::StartFragmentBackfill { fragment_ids, .. } = mutation {
-                        if fragment_ids.contains(&self.fragment_id) {
-                            tracing::info!(
-                                "Start backfill of locality provider with fragment id: {:?}",
-                                &self.fragment_id
-                            );
-                            start_backfill = true;
-                        }
+                    if let Mutation::StartFragmentBackfill { fragment_ids, .. } = mutation
+                        && fragment_ids.contains(&self.fragment_id)
+                    {
+                        tracing::info!(
+                            "Start backfill of locality provider with fragment id: {:?}",
+                            &self.fragment_id
+                        );
+                        start_backfill = true;
                     }
                 }