Skip to content

Commit b3ebcc7

Browse files
Fix open path api (#123)
* fix open path api * simplify code
1 parent 5bed0bd commit b3ebcc7

File tree

2 files changed

+20
-17
lines changed

2 files changed

+20
-17
lines changed

quinn/src/connection.rs

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -373,7 +373,7 @@ impl Connection {
373373
}
374374
}
375375
Ok((path_id, _)) => {
376-
let (tx, rx) = watch::channel(None);
376+
let (tx, rx) = watch::channel(Ok(()));
377377
state.open_path.insert(path_id, tx);
378378
drop(state);
379379
OpenPath::new(path_id, rx, self.0.clone())
@@ -385,7 +385,7 @@ impl Connection {
385385
/// Opens a (Multi)Path
386386
pub fn open_path(&self, addr: SocketAddr, initial_status: PathStatus) -> OpenPath {
387387
let mut state = self.0.state.lock("open_path");
388-
let (on_open_path_send, on_open_path_recv) = watch::channel(None);
388+
let (on_open_path_send, on_open_path_recv) = watch::channel(Ok(()));
389389
let now = state.runtime.now();
390390
let open_res = state.inner.open_path(addr, initial_status, now);
391391
state.wake();
@@ -1096,7 +1096,7 @@ pub(crate) struct State {
10961096
/// Always set to Some before the connection becomes drained
10971097
pub(crate) error: Option<ConnectionError>,
10981098
/// Tracks paths being opened
1099-
open_path: FxHashMap<PathId, watch::Sender<Option<Result<(), PathError>>>>,
1099+
open_path: FxHashMap<PathId, watch::Sender<Result<(), PathError>>>,
11001100
/// Tracks paths being closed
11011101
pub(crate) close_path: FxHashMap<PathId, oneshot::Sender<VarInt>>,
11021102
pub(crate) path_events: tokio::sync::broadcast::Sender<PathEvent>,
@@ -1266,7 +1266,7 @@ impl State {
12661266
Path(ref evt @ PathEvent::Opened { id }) => {
12671267
self.path_events.send(evt.clone()).ok();
12681268
if let Some(sender) = self.open_path.remove(&id) {
1269-
let _ = sender.send(Some(Ok(())));
1269+
sender.send_modify(|value| *value = Ok(()));
12701270
}
12711271
}
12721272
Path(ref evt @ PathEvent::Closed { id, error_code }) => {
@@ -1281,7 +1281,7 @@ impl State {
12811281
Path(ref evt @ PathEvent::LocallyClosed { id, error }) => {
12821282
self.path_events.send(evt.clone()).ok();
12831283
if let Some(sender) = self.open_path.remove(&id) {
1284-
let _ = sender.send(Some(Err(error)));
1284+
sender.send_modify(|value| *value = Err(error));
12851285
}
12861286
// this will happen also for already opened paths
12871287
}

quinn/src/path.rs

Lines changed: 15 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@ enum OpenPathInner {
2222
///
2323
/// This migth fail later on.
2424
Ongoing {
25-
opened: watch::Receiver<Option<Result<(), PathError>>>,
25+
opened: WatchStream<Result<(), PathError>>,
2626
path_id: PathId,
2727
conn: ConnectionRef,
2828
},
@@ -41,11 +41,11 @@ enum OpenPathInner {
4141
impl OpenPath {
4242
pub(crate) fn new(
4343
path_id: PathId,
44-
opened: watch::Receiver<Option<Result<(), PathError>>>,
44+
opened: watch::Receiver<Result<(), PathError>>,
4545
conn: ConnectionRef,
4646
) -> Self {
4747
Self(OpenPathInner::Ongoing {
48-
opened,
48+
opened: WatchStream::from_changes(opened),
4949
path_id,
5050
conn,
5151
})
@@ -68,15 +68,18 @@ impl Future for OpenPath {
6868
ref mut opened,
6969
path_id,
7070
ref mut conn,
71-
} => {
72-
let mut fut = std::pin::pin!(opened.wait_for(|v| v.is_some()));
73-
fut.as_mut().poll(ctx).map(|_| {
74-
Ok(Path {
75-
id: path_id,
76-
conn: conn.clone(),
77-
})
78-
})
79-
}
71+
} => match ready!(Pin::new(opened).poll_next(ctx)) {
72+
Some(value) => Poll::Ready(value.map(|_| Path {
73+
id: path_id,
74+
conn: conn.clone(),
75+
})),
76+
None => {
77+
// This only happens if receiving a notification change failed, this means the
78+
// sender was dropped. This generally should not happen so we use a transient
79+
// error
80+
Poll::Ready(Err(PathError::ValidationFailed))
81+
}
82+
},
8083
OpenPathInner::Ready {
8184
path_id,
8285
ref mut conn,

0 commit comments

Comments
 (0)