Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/stream/src/executor/top_n/group_top_n_appendonly.rs
Original file line number Diff line number Diff line change
Expand Up @@ -167,7 +167,7 @@ where
self.metrics.group_top_n_cache_miss_count.inc();
let mut topn_cache = TopNCache::new(self.offset, self.limit, data_types.clone());
self.managed_state
.init_topn_cache(Some(group_key), &mut topn_cache)
.init_append_only_topn_cache(Some(group_key), &mut topn_cache)
.await?;
self.caches.put(group_cache_key.clone(), topn_cache);
}
Expand Down
78 changes: 50 additions & 28 deletions src/stream/src/executor/top_n/top_n_state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -198,10 +198,11 @@ impl<S: StateStore> ManagedTopNState<S> {
Ok(())
}

pub async fn init_topn_cache<const WITH_TIES: bool>(
pub async fn init_topn_cache_inner<const WITH_TIES: bool>(
&self,
group_key: Option<impl GroupKey>,
topn_cache: &mut TopNCache<WITH_TIES>,
skip_high: bool,
) -> StreamExecutorResult<()> {
assert!(topn_cache.low.as_ref().map(Cache::is_empty).unwrap_or(true));
assert!(topn_cache.middle.is_empty());
Expand Down Expand Up @@ -263,43 +264,64 @@ impl<S: StateStore> ManagedTopNState<S> {
}
}

assert!(
topn_cache.high_cache_capacity > 0,
"topn cache high_capacity should always > 0"
);
while !topn_cache.high_is_full()
&& let Some(item) = state_table_iter.next().await
{
group_row_count += 1;
let topn_row = self.get_topn_row(item?.into_owned_row(), group_key.len());
topn_cache
.high
.insert(topn_row.cache_key, (&topn_row.row).into());
}
if WITH_TIES && topn_cache.high_is_full() {
let high_last_sort_key = topn_cache.high.last_key_value().unwrap().0.0.clone();
while let Some(item) = state_table_iter.next().await {
if !skip_high {
assert!(
topn_cache.high_cache_capacity > 0,
"topn cache high_capacity should always > 0"
);
while !topn_cache.high_is_full()
&& let Some(item) = state_table_iter.next().await
{
group_row_count += 1;
let topn_row = self.get_topn_row(item?.into_owned_row(), group_key.len());
if topn_row.cache_key.0 == high_last_sort_key {
topn_cache
.high
.insert(topn_row.cache_key, (&topn_row.row).into());
} else {
break;
topn_cache
.high
.insert(topn_row.cache_key, (&topn_row.row).into());
}
if WITH_TIES && topn_cache.high_is_full() {
let high_last_sort_key = topn_cache.high.last_key_value().unwrap().0.0.clone();
while let Some(item) = state_table_iter.next().await {
group_row_count += 1;
let topn_row = self.get_topn_row(item?.into_owned_row(), group_key.len());
if topn_row.cache_key.0 == high_last_sort_key {
topn_cache
.high
.insert(topn_row.cache_key, (&topn_row.row).into());
} else {
break;
}
}
}
}

if state_table_iter.next().await.is_none() {
// After trying to initially fill in the cache, all table entries are in the cache,
// we then get the precise table row count.
if state_table_iter.next().await.is_none() {
// After trying to initially fill in the cache, all table entries are in the cache,
// we then get the precise table row count.
topn_cache.update_table_row_count(group_row_count);
}
} else {
topn_cache.update_table_row_count(group_row_count);
}

Ok(())
}

/// Populate an initially-empty [`TopNCache`] for `group_key` by scanning the
/// state table, including the high cache (used by the general, non-append-only
/// top-n executors).
///
/// Delegates to `init_topn_cache_inner` with `skip_high = false`.
pub async fn init_topn_cache<const WITH_TIES: bool>(
    &self,
    group_key: Option<impl GroupKey>,
    topn_cache: &mut TopNCache<WITH_TIES>,
) -> StreamExecutorResult<()> {
    // The general executor handles deletions, so the high cache must be filled.
    self.init_topn_cache_inner(group_key, topn_cache, false)
        .await?;
    Ok(())
}

/// Populate an initially-empty [`TopNCache`] for `group_key` by scanning the
/// state table, skipping the high cache (used by the append-only top-n
/// executors, which never need it since rows are never deleted).
///
/// Delegates to `init_topn_cache_inner` with `skip_high = true`.
pub async fn init_append_only_topn_cache<const WITH_TIES: bool>(
    &self,
    group_key: Option<impl GroupKey>,
    topn_cache: &mut TopNCache<WITH_TIES>,
) -> StreamExecutorResult<()> {
    // Append-only input means no deletions, so filling the high cache is wasted work.
    self.init_topn_cache_inner(group_key, topn_cache, true)
        .await?;
    Ok(())
}

pub async fn flush(
&mut self,
epoch: EpochPair,
Expand Down