Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions proto/stream_service.proto
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,8 @@ message BarrierCompleteResponse {
// MV backfill snapshot read rows / Source backfilled rows
uint64 consumed_rows = 4;
uint64 pending_epoch_lag = 5;
// Buffered rows that are yet to be consumed (used by locality backfill to report precise progress)
uint64 buffered_rows = 6;
}
message CdcTableBackfillProgress {
uint32 actor_id = 1;
Expand Down
77 changes: 50 additions & 27 deletions src/meta/src/barrier/progress.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,12 +33,13 @@ use crate::model::{ActorId, BackfillUpstreamType, FragmentId, StreamJobFragments
use crate::stream::{SourceChange, SourceManagerRef};

type ConsumedRows = u64;
type BufferedRows = u64;

#[derive(Clone, Copy, Debug)]
enum BackfillState {
Init,
ConsumingUpstream(#[expect(dead_code)] Epoch, ConsumedRows),
Done(ConsumedRows),
ConsumingUpstream(#[expect(dead_code)] Epoch, ConsumedRows, BufferedRows),
Done(ConsumedRows, BufferedRows),
}

/// Represents the backfill nodes that need to be scheduled or cleaned up.
Expand Down Expand Up @@ -70,6 +71,9 @@ pub(super) struct Progress {
upstream_mvs_total_key_count: u64,
mv_backfill_consumed_rows: u64,
source_backfill_consumed_rows: u64,
/// Buffered rows (for locality backfill) that are yet to be consumed.
/// Used to calculate precise progress: consumed / (`upstream_total` + buffered).
mv_backfill_buffered_rows: u64,

/// DDL definition
definition: String,
Expand Down Expand Up @@ -103,6 +107,7 @@ impl Progress {
upstream_mvs_total_key_count: upstream_total_key_count,
mv_backfill_consumed_rows: 0,
source_backfill_consumed_rows: 0,
mv_backfill_buffered_rows: 0,
definition,
create_type,
backfill_order_state,
Expand All @@ -121,25 +126,29 @@ impl Progress {
self.upstream_mvs_total_key_count = upstream_total_key_count;
let total_actors = self.states.len();
let backfill_upstream_type = self.backfill_upstream_types.get(&actor).unwrap();
tracing::debug!(?actor, states = ?self.states, "update progress for actor");

let mut old = 0;
let mut new = 0;
let mut old_crow = 0;
let mut new_crow = 0;
let mut old_brow = 0;
let mut new_brow = 0;
match self.states.remove(&actor).unwrap() {
BackfillState::Init => {}
BackfillState::ConsumingUpstream(_, old_consumed_rows) => {
old = old_consumed_rows;
BackfillState::ConsumingUpstream(_, old_consumed_rows, old_buffered_rows) => {
old_crow = old_consumed_rows;
old_brow = old_buffered_rows;
}
BackfillState::Done(_) => panic!("should not report done multiple times"),
BackfillState::Done(_, _) => panic!("should not report done multiple times"),
};
match &new_state {
BackfillState::Init => {}
BackfillState::ConsumingUpstream(_, new_consumed_rows) => {
new = *new_consumed_rows;
BackfillState::ConsumingUpstream(_, new_consumed_rows, new_buffered_rows) => {
new_crow = *new_consumed_rows;
new_brow = *new_buffered_rows;
}
BackfillState::Done(new_consumed_rows) => {
BackfillState::Done(new_consumed_rows, new_buffered_rows) => {
tracing::debug!("actor {} done", actor);
new = *new_consumed_rows;
new_crow = *new_consumed_rows;
new_brow = *new_buffered_rows;
self.done_count += 1;
let before_backfill_nodes = self
.backfill_order_state
Expand Down Expand Up @@ -168,20 +177,25 @@ impl Progress {
);
}
};
debug_assert!(new >= old, "backfill progress should not go backward");
debug_assert!(
new_crow >= old_crow,
"backfill progress should not go backward"
);
match backfill_upstream_type {
BackfillUpstreamType::MView => {
self.mv_backfill_consumed_rows += new - old;
self.mv_backfill_consumed_rows += new_crow - old_crow;
}
BackfillUpstreamType::Source => {
self.source_backfill_consumed_rows += new - old;
self.source_backfill_consumed_rows += new_crow - old_crow;
}
BackfillUpstreamType::Values => {
// do not consider progress for values
}
BackfillUpstreamType::LocalityProvider => {
// Track LocalityProvider progress similar to MView
self.mv_backfill_consumed_rows += new - old;
// Update buffered rows for precise progress calculation
self.mv_backfill_consumed_rows += new_crow - old_crow;
self.mv_backfill_buffered_rows += new_brow - old_brow;
}
}
self.states.insert(actor, new_state);
Expand Down Expand Up @@ -222,19 +236,23 @@ impl Progress {
}

let mv_progress = (mv_count > 0).then_some({
if self.upstream_mvs_total_key_count == 0 {
// Include buffered rows in total for precise progress calculation
// Progress = consumed / (upstream_total + buffered)
let total_rows_to_consume =
self.upstream_mvs_total_key_count + self.mv_backfill_buffered_rows;
if total_rows_to_consume == 0 {
"99.99%".to_owned()
} else {
let mut progress = self.mv_backfill_consumed_rows as f64
/ (self.upstream_mvs_total_key_count as f64);
let mut progress =
self.mv_backfill_consumed_rows as f64 / (total_rows_to_consume as f64);
if progress > 1.0 {
progress = 0.9999;
}
format!(
"{:.2}% ({}/{})",
progress * 100.0,
self.mv_backfill_consumed_rows,
self.upstream_mvs_total_key_count
total_rows_to_consume
)
}
});
Expand Down Expand Up @@ -385,7 +403,7 @@ impl CreateMviewProgressTracker {
let actors = table_fragments.tracking_progress_actor_ids();
for (actor, backfill_upstream_type) in actors {
actor_map.insert(actor, creating_table_id);
states.insert(actor, BackfillState::ConsumingUpstream(Epoch(0), 0));
states.insert(actor, BackfillState::ConsumingUpstream(Epoch(0), 0, 0));
backfill_upstream_types.insert(actor, backfill_upstream_type);
}
let source_backfill_fragments = table_fragments.source_backfill_fragments();
Expand Down Expand Up @@ -441,6 +459,7 @@ impl CreateMviewProgressTracker {
upstream_mvs_total_key_count,
mv_backfill_consumed_rows: 0, // Fill only after first barrier pass
source_backfill_consumed_rows: 0, // Fill only after first barrier pass
mv_backfill_buffered_rows: 0, // Fill only after first barrier pass
definition,
create_type: CreateType::Background,
}
Expand Down Expand Up @@ -687,22 +706,26 @@ impl CreateMviewProgressTracker {
};

let new_state = if progress.done {
BackfillState::Done(progress.consumed_rows)
BackfillState::Done(progress.consumed_rows, progress.buffered_rows)
} else {
BackfillState::ConsumingUpstream(progress.consumed_epoch.into(), progress.consumed_rows)
BackfillState::ConsumingUpstream(
progress.consumed_epoch.into(),
progress.consumed_rows,
progress.buffered_rows,
)
};

match self.progress_map.entry(table_id) {
Entry::Occupied(mut o) => {
let progress = &mut o.get_mut().0;
let progress_state = &mut o.get_mut().0;

let upstream_total_key_count: u64 =
calculate_total_key_count(&progress.upstream_mv_count, version_stats);
calculate_total_key_count(&progress_state.upstream_mv_count, version_stats);

tracing::debug!(?table_id, "updating progress for table");
let pending = progress.update(actor, new_state, upstream_total_key_count);
let pending = progress_state.update(actor, new_state, upstream_total_key_count);

if progress.is_done() {
if progress_state.is_done() {
tracing::debug!(
"all actors done for creating mview with table_id {}!",
table_id
Expand Down
24 changes: 19 additions & 5 deletions src/stream/src/executor/locality_provider.rs
Original file line number Diff line number Diff line change
Expand Up @@ -466,7 +466,7 @@ impl<S: StateStore> LocalityProviderExecutor<S> {
.per_vnode
.values()
.all(|progress| matches!(progress, LocalityBackfillProgress::NotStarted));

let mut buffered_rows: u64 = 0;
// Initial buffering phase before backfill - wait for StartFragmentBackfill mutation (if needed)
if need_buffering {
// Enter buffering phase - buffer data until StartFragmentBackfill is received
Expand All @@ -481,6 +481,7 @@ impl<S: StateStore> LocalityProviderExecutor<S> {
// Ignore watermarks during initial buffering
}
Message::Chunk(chunk) => {
buffered_rows += chunk.cardinality() as u64;
state_table.write_chunk(chunk);
state_table.try_flush().await?;
}
Expand All @@ -501,6 +502,13 @@ impl<S: StateStore> LocalityProviderExecutor<S> {
}
}

self.progress.update_with_buffered_rows(
barrier.epoch,
barrier.epoch.curr,
0,
buffered_rows,
);

// Commit state tables
let post_commit1 = state_table.commit(epoch).await?;
let post_commit2 = progress_table.commit(epoch).await?;
Expand Down Expand Up @@ -700,6 +708,7 @@ impl<S: StateStore> LocalityProviderExecutor<S> {
.await?;

// Update progress with current epoch and snapshot read count
// Report both consumed rows and buffered rows separately for precise progress
let total_snapshot_processed_rows: u64 = backfill_state
.vnodes()
.map(|(_, progress)| match *progress {
Expand All @@ -711,10 +720,11 @@ impl<S: StateStore> LocalityProviderExecutor<S> {
})
.sum();

self.progress.update(
self.progress.update_with_buffered_rows(
barrier.epoch,
barrier.epoch.curr, // Use barrier epoch as snapshot read epoch
total_snapshot_processed_rows,
buffered_rows,
);

// Persist backfill progress
Expand Down Expand Up @@ -765,9 +775,13 @@ impl<S: StateStore> LocalityProviderExecutor<S> {
})
.sum();

// Finish progress reporting
self.progress
.finish(barrier.epoch, total_snapshot_processed_rows);
// Finish progress reporting, including any remaining buffered rows.
// By the time backfill completes, all buffered rows should have been consumed,
// so `buffered_rows` is expected to be 0 here.
self.progress.finish_with_buffered_rows(
barrier.epoch,
total_snapshot_processed_rows,
buffered_rows,
);

// Persist final state
Self::persist_backfill_state(&mut progress_table, &backfill_state).await?;
Expand Down
Loading
Loading