Skip to content

Commit 21c782a

Browse files
committed
add a way to wait for channel recovery
Signed-off-by: Marc-Antoine Perennou <[email protected]>
1 parent 20f1b59 commit 21c782a

File tree

8 files changed

+156
-89
lines changed

8 files changed

+156
-89
lines changed

src/channel.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -566,7 +566,7 @@ impl Channel {
566566

567567
fn next_expected_close_ok_reply(&self) -> Option<Reply> {
568568
self.frames
569-
.next_expected_close_ok_reply(self.id, Error::InvalidChannelState(ChannelState::Closed))
569+
.next_expected_close_ok_reply(self.id, Error::InvalidChannelState(ChannelState::Closed, None))
570570
}
571571

572572
fn before_channel_close(&self) {
@@ -582,7 +582,7 @@ impl Channel {
582582
self.set_closed(
583583
error
584584
.clone()
585-
.unwrap_or(Error::InvalidChannelState(ChannelState::Closing)),
585+
.unwrap_or(Error::InvalidChannelState(ChannelState::Closing, None)),
586586
);
587587
if let Some(error) = error {
588588
self.error_handler.on_error(error);
@@ -936,7 +936,7 @@ impl Channel {
936936
}
937937

938938
fn on_channel_close_ok_received(&self) -> Result<()> {
939-
self.set_closed(Error::InvalidChannelState(ChannelState::Closed));
939+
self.set_closed(Error::InvalidChannelState(ChannelState::Closed, None));
940940
Ok(())
941941
}
942942

src/channel_recovery_context.rs

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
use crate::{
22
frames::{ExpectedReply, Frames},
3+
notifier::Notifier,
34
Error,
45
};
56

@@ -8,16 +9,22 @@ use std::collections::VecDeque;
89
pub(crate) struct ChannelRecoveryContext {
910
cause: Error,
1011
expected_replies: Option<VecDeque<ExpectedReply>>,
12+
notifier: Notifier,
1113
}
1214

1315
impl ChannelRecoveryContext {
1416
pub(crate) fn new(cause: Error) -> Self {
1517
Self {
1618
cause,
1719
expected_replies: None,
20+
notifier: Notifier::default(),
1821
}
1922
}
2023

24+
pub(crate) fn notifier(&self) -> Notifier {
25+
self.notifier.clone()
26+
}
27+
2128
pub(crate) fn set_expected_replies(
2229
&mut self,
2330
expected_replies: Option<VecDeque<ExpectedReply>>,
@@ -26,6 +33,7 @@ impl ChannelRecoveryContext {
2633
}
2734

2835
pub(crate) fn finalize_recovery(self) {
36+
self.notifier.notify_all();
2937
if let Some(replies) = self.expected_replies {
3038
Frames::cancel_expected_replies(replies, self.cause);
3139
}

src/channel_status.rs

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
use crate::{
22
channel_receiver_state::{ChannelReceiverStates, DeliveryCause},
33
channel_recovery_context::ChannelRecoveryContext,
4+
notifier::Notifier,
45
types::{ChannelId, Identifier, PayloadSize},
56
Error, Result,
67
};
@@ -67,6 +68,11 @@ impl ChannelStatus {
6768
self.0.lock().state = state;
6869
}
6970

71+
pub fn state_error(&self) -> Error {
72+
let inner = self.0.lock();
73+
Error::InvalidChannelState(inner.state.clone(), inner.notifier())
74+
}
75+
7076
pub(crate) fn set_reconnecting(&self, error: Error) {
7177
let mut inner = self.0.lock();
7278
inner.state = ChannelState::Reconnecting;
@@ -193,4 +199,8 @@ impl Inner {
193199
ctx.finalize_recovery();
194200
}
195201
}
202+
203+
fn notifier(&self) -> Option<Notifier> {
204+
Some(self.recovery_context.as_ref()?.notifier())
205+
}
196206
}

src/error.rs

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
use crate::{
2-
channel_status::ChannelState, connection_status::ConnectionState, protocol::AMQPError,
3-
types::ChannelId,
2+
channel_status::ChannelState, connection_status::ConnectionState, notifier::Notifier,
3+
protocol::AMQPError, types::ChannelId,
44
};
55
use amq_protocol::{
66
frame::{GenError, ParserError, ProtocolVersion},
@@ -22,7 +22,7 @@ pub enum Error {
2222
InvalidProtocolVersion(ProtocolVersion),
2323

2424
InvalidChannel(ChannelId),
25-
InvalidChannelState(ChannelState),
25+
InvalidChannelState(ChannelState, Option<Notifier>),
2626
InvalidConnectionState(ConnectionState),
2727

2828
IOError(Arc<io::Error>),
@@ -84,7 +84,7 @@ impl fmt::Display for Error {
8484
}
8585

8686
Error::InvalidChannel(channel) => write!(f, "invalid channel: {}", channel),
87-
Error::InvalidChannelState(state) => write!(f, "invalid channel state: {:?}", state),
87+
Error::InvalidChannelState(state, _) => write!(f, "invalid channel state: {:?}", state),
8888
Error::InvalidConnectionState(state) => {
8989
write!(f, "invalid connection state: {:?}", state)
9090
}
@@ -144,7 +144,7 @@ impl PartialEq for Error {
144144
}
145145

146146
(InvalidChannel(left_inner), InvalidChannel(right_inner)) => left_inner == right_inner,
147-
(InvalidChannelState(left_inner), InvalidChannelState(right_inner)) => {
147+
(InvalidChannelState(left_inner, _), InvalidChannelState(right_inner, _)) => {
148148
left_inner == right_inner
149149
}
150150
(InvalidConnectionState(left_inner), InvalidConnectionState(right_inner)) => {

0 commit comments

Comments
 (0)