Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions proto/stream_plan.proto
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,7 @@ message ConnectorPropsChangeMutation {

// Mutation that triggers backfilling for the listed fragments and, optionally,
// asks locality backfill fragments to truncate their state.
message StartFragmentBackfillMutation {
// Fragments whose backfill should start. A backfill executor that is paused
// resumes when its own fragment id appears in this list.
repeated uint32 fragment_ids = 1;
// Locality backfill fragments that need to truncate their state.
repeated uint32 truncate_locality_fragment_ids = 2;
}

message RefreshStartMutation {
Expand Down
4 changes: 4 additions & 0 deletions src/meta/src/barrier/backfill_order_control.rs
Original file line number Diff line number Diff line change
Expand Up @@ -195,4 +195,8 @@ impl BackfillOrderState {
}
newly_scheduled
}

/// Returns a snapshot of the fragment ids that are keys of
/// `current_backfill_nodes` at the time of the call.
///
/// The ids are copied out, so the returned vector stays valid while the
/// order state is mutated afterwards. The order of the ids follows the
/// iteration order of the underlying map.
pub fn current_backfill_node_fragment_ids(&self) -> Vec<FragmentId> {
    let tracked = &self.current_backfill_nodes;
    let mut fragment_ids = Vec::with_capacity(tracked.len());
    fragment_ids.extend(tracked.keys().copied());
    fragment_ids
}
}
15 changes: 11 additions & 4 deletions src/meta/src/barrier/checkpoint/control.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1137,11 +1137,18 @@ impl DatabaseCheckpointControl {
if let Some(jobs_to_merge) = self.jobs_to_merge() {
command = Some(Command::MergeSnapshotBackfillStreamingJobs(jobs_to_merge));
} else {
let pending_backfill_nodes =
self.create_mview_tracker.take_pending_backfill_nodes();
if !pending_backfill_nodes.is_empty() {
let fragments = self.create_mview_tracker.take_pending_backfill_nodes();
if !fragments.next_backfill_nodes.is_empty()
|| !fragments.last_backfill_nodes.is_empty()
{
tracing::info!(
"start fragment backfill for mview, next_backfill_nodes: {:?}, last_backfill_nodes: {:?}",
fragments.next_backfill_nodes,
fragments.last_backfill_nodes
);
command = Some(Command::StartFragmentBackfill {
fragment_ids: pending_backfill_nodes,
fragment_ids: fragments.next_backfill_nodes,
truncate_locality_fragment_ids: fragments.last_backfill_nodes,
});
}
}
Expand Down
19 changes: 16 additions & 3 deletions src/meta/src/barrier/checkpoint/creating_job/status.rs
Original file line number Diff line number Diff line change
Expand Up @@ -215,13 +215,26 @@ impl CreatingStreamingJobStatus {
..
} => {
let mutation = {
let pending_backfill_nodes = create_mview_tracker.take_pending_backfill_nodes();
if pending_backfill_nodes.is_empty() {
let fragments = create_mview_tracker.take_pending_backfill_nodes();
if fragments.next_backfill_nodes.is_empty()
&& fragments.last_backfill_nodes.is_empty()
{
None
} else {
tracing::info!(
"Injecting backfill mutation for new upstream epoch. next_backfill_nodes: {:?}, last_backfill_nodes: {:?}",
fragments.next_backfill_nodes,
fragments.last_backfill_nodes
);
Some(Mutation::StartFragmentBackfill(
StartFragmentBackfillMutation {
fragment_ids: pending_backfill_nodes
fragment_ids: fragments
.next_backfill_nodes
.into_iter()
.map(|fragment_id| fragment_id as _)
.collect(),
truncate_locality_fragment_ids: fragments
.last_backfill_nodes
.into_iter()
.map(|fragment_id| fragment_id as _)
.collect(),
Expand Down
14 changes: 10 additions & 4 deletions src/meta/src/barrier/command.rs
Original file line number Diff line number Diff line change
Expand Up @@ -395,8 +395,10 @@ pub enum Command {
ConnectorPropsChange(ConnectorPropsChange),

/// `StartFragmentBackfill` command will trigger backfilling for specified scans by `fragment_id`.
/// The `truncate_locality_fragment_ids` are the locality backfill fragments that need to truncate their state.
StartFragmentBackfill {
fragment_ids: Vec<FragmentId>,
truncate_locality_fragment_ids: Vec<FragmentId>,
},

/// `Refresh` command generates a barrier to refresh a table by truncating state
Expand Down Expand Up @@ -1316,11 +1318,15 @@ impl Command {
},
))
}
Command::StartFragmentBackfill { fragment_ids } => Some(
Mutation::StartFragmentBackfill(StartFragmentBackfillMutation {
Command::StartFragmentBackfill {
fragment_ids,
truncate_locality_fragment_ids,
} => Some(Mutation::StartFragmentBackfill(
StartFragmentBackfillMutation {
fragment_ids: fragment_ids.clone(),
}),
),
truncate_locality_fragment_ids: truncate_locality_fragment_ids.clone(),
},
)),
Command::Refresh {
table_id,
associated_source_id,
Expand Down
89 changes: 65 additions & 24 deletions src/meta/src/barrier/progress.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,16 @@ enum BackfillState {
Done(ConsumedRows),
}

/// Represents the backfill nodes that need to be scheduled or cleaned up.
///
/// A value is produced by each progress update, stashed by the progress
/// tracker, and later merged into a single value. When a
/// `StartFragmentBackfill` command or mutation is built from it,
/// `next_backfill_nodes` becomes `fragment_ids` and `last_backfill_nodes`
/// becomes `truncate_locality_fragment_ids`.
///
/// `Default` yields two empty lists, which means "nothing to do".
#[derive(Debug, Default)]
pub(super) struct PendingBackfillFragments {
/// Fragment IDs that should start backfilling in the next checkpoint.
/// Filled from the return value of `backfill_order_state.finish_actor`.
pub next_backfill_nodes: Vec<FragmentId>,
/// Fragment IDs whose locality backfill state should be truncated
/// (they have just completed backfilling).
/// Computed as the fragments that were among the current backfill nodes
/// before `finish_actor` ran but are no longer among them afterwards.
pub last_backfill_nodes: Vec<FragmentId>,
}

/// Progress of all actors containing backfill executors while creating mview.
#[derive(Debug)]
pub(super) struct Progress {
Expand Down Expand Up @@ -100,13 +110,14 @@ impl Progress {
}

/// Update the progress of `actor`.
/// Returns the backfill fragments that need to be scheduled or cleaned up.
fn update(
&mut self,
actor: ActorId,
new_state: BackfillState,
upstream_total_key_count: u64,
) -> Vec<FragmentId> {
let mut next_backfill_nodes = vec![];
) -> PendingBackfillFragments {
let mut result = PendingBackfillFragments::default();
self.upstream_mvs_total_key_count = upstream_total_key_count;
let total_actors = self.states.len();
let backfill_upstream_type = self.backfill_upstream_types.get(&actor).unwrap();
Expand All @@ -130,7 +141,18 @@ impl Progress {
tracing::debug!("actor {} done", actor);
new = *new_consumed_rows;
self.done_count += 1;
next_backfill_nodes = self.backfill_order_state.finish_actor(actor);
let before_backfill_nodes = self
.backfill_order_state
.current_backfill_node_fragment_ids();
result.next_backfill_nodes = self.backfill_order_state.finish_actor(actor);
let after_backfill_nodes = self
.backfill_order_state
.current_backfill_node_fragment_ids();
// last_backfill_nodes = before_backfill_nodes - after_backfill_nodes
result.last_backfill_nodes = before_backfill_nodes
.into_iter()
.filter(|x| !after_backfill_nodes.contains(x))
.collect();
tracing::debug!(
"{} actors out of {} complete",
self.done_count,
Expand All @@ -155,7 +177,7 @@ impl Progress {
}
}
self.states.insert(actor, new_state);
next_backfill_nodes
result
}

/// Returns whether all backfill executors are done.
Expand Down Expand Up @@ -297,8 +319,10 @@ pub(super) struct TrackingCommand {

pub(super) enum UpdateProgressResult {
None,
Finished(TrackingJob),
BackfillNodeFinished(Vec<FragmentId>),
/// The finished job, along with its pending backfill fragments for cleanup.
Finished(TrackingJob, PendingBackfillFragments),
/// Backfill nodes have finished and new ones need to be scheduled.
BackfillNodeFinished(PendingBackfillFragments),
}

/// Tracking is done as follows:
Expand All @@ -316,8 +340,8 @@ pub(super) struct CreateMviewProgressTracker {
/// Stash of finished jobs. They will be finally finished on checkpoint.
pending_finished_jobs: Vec<TrackingJob>,

/// Stash of pending backfill nodes. They will start backfilling on checkpoint.
pending_backfill_nodes: Vec<FragmentId>,
/// Stash of pending backfill fragments. They will start backfilling or be cleaned up on checkpoint.
pending_backfill_fragments: Vec<PendingBackfillFragments>,
}

impl CreateMviewProgressTracker {
Expand Down Expand Up @@ -361,7 +385,7 @@ impl CreateMviewProgressTracker {
progress_map,
actor_map,
pending_finished_jobs: Vec::new(),
pending_backfill_nodes: Vec::new(),
pending_backfill_fragments: Vec::new(),
}
}

Expand Down Expand Up @@ -435,17 +459,18 @@ impl CreateMviewProgressTracker {
UpdateProgressResult::None => {
tracing::trace!(?progress, "update progress");
}
UpdateProgressResult::Finished(command) => {
UpdateProgressResult::Finished(command, fragments) => {
self.queue_backfill(fragments);
tracing::trace!(?progress, "finish progress");
commands.push(command);
}
UpdateProgressResult::BackfillNodeFinished(next_backfill_nodes) => {
UpdateProgressResult::BackfillNodeFinished(fragments) => {
tracing::trace!(
?progress,
?next_backfill_nodes,
next_backfill_nodes = ?fragments.next_backfill_nodes,
"start next backfill node"
);
self.queue_backfill(next_backfill_nodes);
self.queue_backfill(fragments);
}
}
}
Expand Down Expand Up @@ -506,8 +531,8 @@ impl CreateMviewProgressTracker {
self.pending_finished_jobs.push(finished_job);
}

fn queue_backfill(&mut self, backfill_nodes: impl IntoIterator<Item = FragmentId>) {
self.pending_backfill_nodes.extend(backfill_nodes);
/// Stash `fragments` in `pending_backfill_fragments`.
///
/// The entry is appended as-is (no merging or emptiness check happens here);
/// the stash is drained and merged by `take_pending_backfill_nodes`.
fn queue_backfill(&mut self, fragments: PendingBackfillFragments) {
self.pending_backfill_fragments.push(fragments);
}

/// Finish stashed jobs on checkpoint.
Expand All @@ -516,8 +541,19 @@ impl CreateMviewProgressTracker {
take(&mut self.pending_finished_jobs)
}

pub(super) fn take_pending_backfill_nodes(&mut self) -> Vec<FragmentId> {
take(&mut self.pending_backfill_nodes)
/// Take the pending backfill fragments to start on checkpoint.
/// Merge all pending backfill fragments into one.
pub(super) fn take_pending_backfill_nodes(&mut self) -> PendingBackfillFragments {
let mut result = PendingBackfillFragments::default();
for fragments in take(&mut self.pending_backfill_fragments) {
result
.next_backfill_nodes
.extend(fragments.next_backfill_nodes);
result
.last_backfill_nodes
.extend(fragments.last_backfill_nodes);
}
result
}

pub(super) fn has_pending_finished_jobs(&self) -> bool {
Expand Down Expand Up @@ -640,8 +676,7 @@ impl CreateMviewProgressTracker {
calculate_total_key_count(&progress.upstream_mv_count, version_stats);

tracing::debug!(?table_id, "updating progress for table");
let next_backfill_nodes =
progress.update(actor, new_state, upstream_total_key_count);
let fragments = progress.update(actor, new_state, upstream_total_key_count);

if progress.is_done() {
tracing::debug!(
Expand All @@ -653,11 +688,17 @@ impl CreateMviewProgressTracker {
for actor in o.get().0.actors() {
self.actor_map.remove(&actor);
}
assert!(next_backfill_nodes.is_empty());
UpdateProgressResult::Finished(o.remove().1)
} else if !next_backfill_nodes.is_empty() {
tracing::debug!("scheduling next backfill nodes: {:?}", next_backfill_nodes);
UpdateProgressResult::BackfillNodeFinished(next_backfill_nodes)
assert!(fragments.next_backfill_nodes.is_empty());
UpdateProgressResult::Finished(o.remove().1, fragments)
} else if !fragments.next_backfill_nodes.is_empty()
|| !fragments.last_backfill_nodes.is_empty()
{
tracing::debug!(
"scheduling next backfill nodes: {:?} and truncate last backfill nodes: {:?}",
fragments.next_backfill_nodes,
fragments.last_backfill_nodes
);
UpdateProgressResult::BackfillNodeFinished(fragments)
} else {
UpdateProgressResult::None
}
Expand Down
2 changes: 1 addition & 1 deletion src/stream/src/executor/backfill/arrangement_backfill.rs
Original file line number Diff line number Diff line change
Expand Up @@ -552,7 +552,7 @@ where
Mutation::Resume => {
global_pause = false;
}
Mutation::StartFragmentBackfill { fragment_ids } if backfill_paused => {
Mutation::StartFragmentBackfill { fragment_ids, .. } if backfill_paused => {
if fragment_ids.contains(&self.fragment_id) {
backfill_paused = false;
}
Expand Down
2 changes: 1 addition & 1 deletion src/stream/src/executor/backfill/no_shuffle_backfill.rs
Original file line number Diff line number Diff line change
Expand Up @@ -460,7 +460,7 @@ where
Mutation::Resume => {
global_pause = false;
}
Mutation::StartFragmentBackfill { fragment_ids } if backfill_paused => {
Mutation::StartFragmentBackfill { fragment_ids, .. } if backfill_paused => {
if fragment_ids.contains(&self.fragment_id) {
backfill_paused = false;
}
Expand Down
Loading
Loading