Skip to content

Commit 984dd59

Browse files
committed
refactor
1 parent 2861c60 commit 984dd59

File tree

4 files changed

+165
-95
lines changed

4 files changed

+165
-95
lines changed

src/stream/src/executor/exchange/input.rs

Lines changed: 0 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -15,12 +15,10 @@
1515
use std::pin::Pin;
1616
use std::task::{Context, Poll};
1717

18-
use anyhow::anyhow;
1918
use either::Either;
2019
use local_input::LocalInputStreamInner;
2120
use pin_project::pin_project;
2221
use risingwave_common::util::addr::{HostAddr, is_local_address};
23-
use tokio::sync::mpsc;
2422

2523
use super::permit::Receiver;
2624
use crate::executor::prelude::*;
@@ -89,25 +87,6 @@ pub(crate) fn apply_dispatcher_barrier(
8987
.extend(dispatcher_barrier.passed_actors);
9088
}
9189

92-
pub(crate) async fn process_dispatcher_msg(
93-
dispatcher_msg: DispatcherMessage,
94-
barrier_rx: &mut mpsc::UnboundedReceiver<Barrier>,
95-
) -> StreamExecutorResult<Message> {
96-
let msg = match dispatcher_msg {
97-
DispatcherMessage::Chunk(chunk) => Message::Chunk(chunk),
98-
DispatcherMessage::Barrier(barrier) => {
99-
let mut recv_barrier = barrier_rx
100-
.recv()
101-
.await
102-
.ok_or_else(|| anyhow!("end of barrier recv"))?;
103-
apply_dispatcher_barrier(&mut recv_barrier, barrier);
104-
Message::Barrier(recv_barrier)
105-
}
106-
DispatcherMessage::Watermark(watermark) => Message::Watermark(watermark),
107-
};
108-
Ok(msg)
109-
}
110-
11190
impl LocalInput {
11291
pub fn new(channel: Receiver, upstream_actor_id: ActorId) -> Self {
11392
Self {

src/stream/src/executor/merge.rs

Lines changed: 34 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -16,18 +16,14 @@ use std::collections::VecDeque;
1616
use std::pin::Pin;
1717
use std::task::{Context, Poll};
1818

19-
use anyhow::Context as _;
20-
use futures::future::try_join_all;
2119
use risingwave_common::array::StreamChunkBuilder;
2220
use risingwave_common::config::MetricLevel;
2321
use tokio::sync::mpsc;
2422
use tokio::time::Instant;
2523

2624
use super::exchange::input::BoxedActorInput;
2725
use super::*;
28-
use crate::executor::exchange::input::{
29-
assert_equal_dispatcher_barrier, new_input, process_dispatcher_msg,
30-
};
26+
use crate::executor::exchange::input::{apply_dispatcher_barrier, assert_equal_dispatcher_barrier};
3127
use crate::executor::prelude::*;
3228
use crate::task::LocalBarrierManager;
3329

@@ -242,31 +238,43 @@ impl MergeExecutor {
242238
// Channels that're blocked by the barrier to align.
243239
let mut start_time = Instant::now();
244240
pin_mut!(select_all);
245-
while let Some(msg) = select_all.next().await {
241+
242+
let mut barrier_buffer = DispatchBarrierBuffer {
243+
buffer: VecDeque::new(),
244+
curr_fragment_id: self.upstream_fragment_id,
245+
barrier_rx: self.barrier_rx,
246+
actor_id,
247+
local_barrier_manager: self.local_barrier_manager,
248+
metrics: self.metrics.clone(),
249+
fragment_id: self.fragment_id,
250+
};
251+
252+
loop {
253+
let msg = barrier_buffer.select_once(&mut select_all).await?;
246254
metrics
247255
.actor_input_buffer_blocking_duration_ns
248256
.inc_by(start_time.elapsed().as_nanos() as u64);
249-
let msg: DispatcherMessage = msg?;
250-
let mut msg: Message = process_dispatcher_msg(msg, &mut self.barrier_rx).await?;
251257

252-
match &mut msg {
253-
Message::Watermark(_) => {
254-
// Do nothing.
255-
}
256-
Message::Chunk(chunk) => {
258+
let msg = match msg {
259+
DispatcherMessage::Watermark(watermark) => Message::Watermark(watermark),
260+
DispatcherMessage::Chunk(chunk) => {
257261
metrics.actor_in_record_cnt.inc_by(chunk.cardinality() as _);
262+
Message::Chunk(chunk)
258263
}
259-
Message::Barrier(barrier) => {
264+
DispatcherMessage::Barrier(barrier) => {
260265
tracing::debug!(
261266
target: "events::stream::barrier::path",
262267
actor_id = actor_id,
263268
"receiver receives barrier from path: {:?}",
264269
barrier.passed_actors
265270
);
266-
barrier.passed_actors.push(actor_id);
271+
let (mut recv_barrier, new_inputs) =
272+
barrier_buffer.pop_barrier_with_inputs().await?;
273+
apply_dispatcher_barrier(&mut recv_barrier, barrier);
274+
recv_barrier.passed_actors.push(actor_id);
267275

268276
if let Some(Mutation::Update(UpdateMutation { dispatchers, .. })) =
269-
barrier.mutation.as_deref()
277+
recv_barrier.mutation.as_deref()
270278
&& select_all
271279
.upstream_input_ids()
272280
.any(|actor_id| dispatchers.contains_key(&actor_id))
@@ -275,8 +283,8 @@ impl MergeExecutor {
275283
select_all.flush_buffered_watermarks();
276284
}
277285

278-
if let Some(update) =
279-
barrier.as_update_merge(self.actor_context.id, self.upstream_fragment_id)
286+
if let Some(update) = recv_barrier
287+
.as_update_merge(self.actor_context.id, self.upstream_fragment_id)
280288
{
281289
let new_upstream_fragment_id = update
282290
.new_upstream_fragment_id
@@ -291,28 +299,12 @@ impl MergeExecutor {
291299
// `Watermark` of upstream may become stale after upstream scaling.
292300
select_all.flush_buffered_watermarks();
293301

294-
if !update.added_upstream_actors.is_empty() {
295-
// Create new upstreams receivers.
296-
let mut new_upstreams: Vec<_> = try_join_all(
297-
update.added_upstream_actors.iter().map(|upstream_actor| {
298-
new_input(
299-
&self.local_barrier_manager,
300-
self.metrics.clone(),
301-
self.actor_context.id,
302-
self.fragment_id,
303-
upstream_actor,
304-
new_upstream_fragment_id,
305-
)
306-
}),
307-
)
308-
.await
309-
.context("failed to create upstream receivers")?;
310-
302+
if let Some(mut new_upstreams) = new_inputs {
311303
// Poll the first barrier from the new upstreams. It must be the same as
312304
// the one we polled from original upstreams.
313305
for upstream in &mut new_upstreams {
314306
let new_barrier = expect_first_barrier(upstream).await?;
315-
assert_equal_dispatcher_barrier(barrier, &new_barrier);
307+
assert_equal_dispatcher_barrier(&recv_barrier, &new_barrier);
316308
}
317309

318310
// Add the new upstreams to select.
@@ -332,12 +324,16 @@ impl MergeExecutor {
332324
);
333325
}
334326

335-
if barrier.is_stop(actor_id) {
327+
let is_stop = recv_barrier.is_stop(actor_id);
328+
let msg = Message::Barrier(recv_barrier);
329+
if is_stop {
336330
yield msg;
337331
break;
338332
}
333+
334+
msg
339335
}
340-
}
336+
};
341337

342338
yield msg;
343339
start_time = Instant::now();

src/stream/src/executor/mod.rs

Lines changed: 91 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -14,8 +14,9 @@
1414

1515
mod prelude;
1616

17-
use std::collections::{BTreeMap, HashMap, HashSet};
17+
use std::collections::{BTreeMap, HashMap, HashSet, VecDeque};
1818
use std::fmt::Debug;
19+
use std::future::pending;
1920
use std::hash::Hash;
2021
use std::pin::Pin;
2122
use std::sync::Arc;
@@ -24,6 +25,7 @@ use std::vec;
2425

2526
use await_tree::InstrumentAwait;
2627
use enum_as_inner::EnumAsInner;
28+
use futures::future::try_join_all;
2729
use futures::stream::{BoxStream, FusedStream, FuturesUnordered, StreamFuture};
2830
use futures::{Stream, StreamExt};
2931
use itertools::Itertools;
@@ -56,12 +58,14 @@ use risingwave_pb::stream_plan::{
5658
SubscriptionUpstreamInfo, ThrottleMutation,
5759
};
5860
use smallvec::SmallVec;
61+
use tokio::sync::mpsc;
5962
use tokio::time::Instant;
6063

6164
use crate::error::StreamResult;
62-
use crate::executor::exchange::input::BoxedInput;
65+
use crate::executor::exchange::input::{BoxedActorInput, BoxedInput, new_input};
66+
use crate::executor::prelude::StreamingMetrics;
6367
use crate::executor::watermark::BufferedWatermarks;
64-
use crate::task::{ActorId, FragmentId};
68+
use crate::task::{ActorId, FragmentId, LocalBarrierManager};
6569

6670
mod actor;
6771
mod barrier_align;
@@ -1713,3 +1717,87 @@ impl<InputId: Clone + Ord + Hash + std::fmt::Debug, M> DynamicReceivers<InputId,
17131717
self.blocked.is_empty() && self.active.is_empty()
17141718
}
17151719
}
1720+
1721+
pub(crate) struct DispatchBarrierBuffer {
1722+
pub buffer: VecDeque<(Barrier, Option<Vec<BoxedActorInput>>)>,
1723+
pub curr_fragment_id: FragmentId,
1724+
pub barrier_rx: mpsc::UnboundedReceiver<Barrier>,
1725+
pub actor_id: ActorId,
1726+
// cloned/moved from MergeExecutor, just used to create new inputs
1727+
pub local_barrier_manager: LocalBarrierManager,
1728+
pub metrics: Arc<StreamingMetrics>,
1729+
pub fragment_id: FragmentId,
1730+
}
1731+
1732+
impl DispatchBarrierBuffer {
1733+
pub async fn select_once(
1734+
&mut self,
1735+
stream: &mut (impl Stream<Item = StreamExecutorResult<DispatcherMessage>> + Unpin),
1736+
) -> StreamExecutorResult<DispatcherMessage> {
1737+
tokio::select! {
1738+
biased;
1739+
1740+
msg = stream.next() => {
1741+
msg.ok_or(StreamExecutorError::channel_closed(
1742+
"upstream executor closed unexpectedly",
1743+
))?
1744+
}
1745+
1746+
Err(e) = self.concurrently_fetch_barrier_rx() => {
1747+
Err(e)
1748+
}
1749+
}
1750+
}
1751+
1752+
pub async fn pop_barrier_with_inputs(
1753+
&mut self,
1754+
) -> StreamExecutorResult<(Barrier, Option<Vec<BoxedActorInput>>)> {
1755+
if !self.buffer.is_empty() {
1756+
let barrier = self.buffer.pop_front().unwrap();
1757+
Ok(barrier)
1758+
} else {
1759+
let barrier = self
1760+
.barrier_rx
1761+
.recv()
1762+
.await
1763+
.context("barrier channel closed")?;
1764+
self.pre_apply_barrier(barrier).await
1765+
}
1766+
}
1767+
1768+
async fn concurrently_fetch_barrier_rx(&mut self) -> StreamExecutorResult<!> {
1769+
while let Some(barrier) = self.barrier_rx.recv().await {
1770+
let barrier_with_inputs = self.pre_apply_barrier(barrier).await?;
1771+
self.buffer.push_back(barrier_with_inputs);
1772+
}
1773+
pending::<!>().await
1774+
}
1775+
1776+
async fn pre_apply_barrier(
1777+
&mut self,
1778+
barrier: Barrier,
1779+
) -> StreamExecutorResult<(Barrier, Option<Vec<BoxedActorInput>>)> {
1780+
let new_actor_inputs = if let Some(update) =
1781+
barrier.as_update_merge(self.actor_id, self.curr_fragment_id)
1782+
&& !update.added_upstream_actors.is_empty()
1783+
{
1784+
let new_inputs =
1785+
try_join_all(update.added_upstream_actors.iter().map(|upstream_actor| {
1786+
new_input(
1787+
&self.local_barrier_manager,
1788+
self.metrics.clone(),
1789+
self.actor_id,
1790+
self.fragment_id,
1791+
upstream_actor,
1792+
self.curr_fragment_id,
1793+
)
1794+
}))
1795+
.await
1796+
.context("failed to create upstream receivers")?;
1797+
Some(new_inputs)
1798+
} else {
1799+
None
1800+
};
1801+
Ok((barrier, new_actor_inputs))
1802+
}
1803+
}

0 commit comments

Comments
 (0)