From b53782e11c75fc1704b9f68878c76d8b94fde9fd Mon Sep 17 00:00:00 2001
From: "Zhanxiang (Patrick) Huang"
Date: Fri, 19 Sep 2025 22:06:10 +0800
Subject: [PATCH] feat: avoid populating the high part of append-only group
 topn cache (#23264)

---
 .../executor/top_n/group_top_n_appendonly.rs |  2 +-
 src/stream/src/executor/top_n/top_n_state.rs | 78 ++++++++++++-------
 2 files changed, 51 insertions(+), 29 deletions(-)

diff --git a/src/stream/src/executor/top_n/group_top_n_appendonly.rs b/src/stream/src/executor/top_n/group_top_n_appendonly.rs
index 3bfe12fa68cea..1905c7fcabd53 100644
--- a/src/stream/src/executor/top_n/group_top_n_appendonly.rs
+++ b/src/stream/src/executor/top_n/group_top_n_appendonly.rs
@@ -167,7 +167,7 @@ where
                     self.metrics.group_top_n_cache_miss_count.inc();
                     let mut topn_cache = TopNCache::new(self.offset, self.limit, data_types.clone());
                     self.managed_state
-                        .init_topn_cache(Some(group_key), &mut topn_cache)
+                        .init_append_only_topn_cache(Some(group_key), &mut topn_cache)
                         .await?;
                     self.caches.put(group_cache_key.clone(), topn_cache);
                 }
diff --git a/src/stream/src/executor/top_n/top_n_state.rs b/src/stream/src/executor/top_n/top_n_state.rs
index c823ff3580368..f5baf9cf4aa4b 100644
--- a/src/stream/src/executor/top_n/top_n_state.rs
+++ b/src/stream/src/executor/top_n/top_n_state.rs
@@ -198,10 +198,11 @@ impl<S: StateStore> ManagedTopNState<S> {
         Ok(())
     }
 
-    pub async fn init_topn_cache<const WITH_TIES: bool>(
+    pub async fn init_topn_cache_inner<const WITH_TIES: bool>(
         &self,
         group_key: Option<impl GroupKey>,
         topn_cache: &mut TopNCache<WITH_TIES>,
+        skip_high: bool,
     ) -> StreamExecutorResult<()> {
         assert!(topn_cache.low.as_ref().map(Cache::is_empty).unwrap_or(true));
         assert!(topn_cache.middle.is_empty());
@@ -263,43 +264,64 @@
             }
         }
 
-        assert!(
-            topn_cache.high_cache_capacity > 0,
-            "topn cache high_capacity should always > 0"
-        );
-        while !topn_cache.high_is_full()
-            && let Some(item) = state_table_iter.next().await
-        {
-            group_row_count += 1;
-            let topn_row = self.get_topn_row(item?.into_owned_row(), group_key.len());
-            topn_cache
-                .high
-                .insert(topn_row.cache_key, (&topn_row.row).into());
-        }
-        if WITH_TIES && topn_cache.high_is_full() {
-            let high_last_sort_key = topn_cache.high.last_key_value().unwrap().0.0.clone();
-            while let Some(item) = state_table_iter.next().await {
+        if !skip_high {
+            assert!(
+                topn_cache.high_cache_capacity > 0,
+                "topn cache high_capacity should always > 0"
+            );
+            while !topn_cache.high_is_full()
+                && let Some(item) = state_table_iter.next().await
+            {
                 group_row_count += 1;
                 let topn_row = self.get_topn_row(item?.into_owned_row(), group_key.len());
-                if topn_row.cache_key.0 == high_last_sort_key {
-                    topn_cache
-                        .high
-                        .insert(topn_row.cache_key, (&topn_row.row).into());
-                } else {
-                    break;
+                topn_cache
+                    .high
+                    .insert(topn_row.cache_key, (&topn_row.row).into());
+            }
+            if WITH_TIES && topn_cache.high_is_full() {
+                let high_last_sort_key = topn_cache.high.last_key_value().unwrap().0.0.clone();
+                while let Some(item) = state_table_iter.next().await {
+                    group_row_count += 1;
+                    let topn_row = self.get_topn_row(item?.into_owned_row(), group_key.len());
+                    if topn_row.cache_key.0 == high_last_sort_key {
+                        topn_cache
+                            .high
+                            .insert(topn_row.cache_key, (&topn_row.row).into());
+                    } else {
+                        break;
+                    }
                 }
             }
-        }
-
-        if state_table_iter.next().await.is_none() {
-            // After trying to initially fill in the cache, all table entries are in the cache,
-            // we then get the precise table row count.
+            if state_table_iter.next().await.is_none() {
+                // After trying to initially fill in the cache, all table entries are in the cache,
+                // we then get the precise table row count.
+                topn_cache.update_table_row_count(group_row_count);
+            }
+        } else {
             topn_cache.update_table_row_count(group_row_count);
         }
 
         Ok(())
     }
 
+    pub async fn init_topn_cache<const WITH_TIES: bool>(
+        &self,
+        group_key: Option<impl GroupKey>,
+        topn_cache: &mut TopNCache<WITH_TIES>,
+    ) -> StreamExecutorResult<()> {
+        self.init_topn_cache_inner(group_key, topn_cache, false)
+            .await
+    }
+
+    pub async fn init_append_only_topn_cache<const WITH_TIES: bool>(
+        &self,
+        group_key: Option<impl GroupKey>,
+        topn_cache: &mut TopNCache<WITH_TIES>,
+    ) -> StreamExecutorResult<()> {
+        self.init_topn_cache_inner(group_key, topn_cache, true)
+            .await
+    }
+
     pub async fn flush(
         &mut self,
         epoch: EpochPair,