Skip to content

Commit 799e115

Browse files
committed
experiments
Signed-off-by: Marc-Antoine Perennou <[email protected]>
1 parent 74f18af commit 799e115

File tree

7 files changed

+236
-19
lines changed

7 files changed

+236
-19
lines changed

examples/t.rs

Lines changed: 122 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,122 @@
1+
use lapin::{
2+
message::DeliveryResult, options::*, publisher_confirm::Confirmation, types::FieldTable,
3+
BasicProperties, Connection, ConnectionProperties,
4+
};
5+
use tracing::info;
6+
7+
fn main() {
8+
if std::env::var("RUST_LOG").is_err() {
9+
std::env::set_var("RUST_LOG", "info");
10+
}
11+
12+
tracing_subscriber::fmt::init();
13+
14+
let addr = std::env::var("AMQP_ADDR").unwrap_or_else(|_| "amqp://127.0.0.1:5672/%2f".into());
15+
let recovery_config = lapin::experimental::RecoveryConfig {
16+
auto_recover_channels: true,
17+
};
18+
19+
async_global_executor::block_on(async {
20+
let conn = Connection::connect(
21+
&addr,
22+
ConnectionProperties::default().with_experimental_recovery_config(recovery_config),
23+
)
24+
.await
25+
.expect("connection error");
26+
27+
info!("CONNECTED");
28+
29+
{
30+
let channel1 = conn.create_channel().await.expect("create_channel");
31+
let channel2 = conn.create_channel().await.expect("create_channel");
32+
channel1
33+
.confirm_select(ConfirmSelectOptions::default())
34+
.await
35+
.expect("confirm_select");
36+
channel1
37+
.queue_declare(
38+
"recover-test",
39+
QueueDeclareOptions::default(),
40+
FieldTable::default(),
41+
)
42+
.await
43+
.expect("queue_declare");
44+
45+
info!("will consume");
46+
let channel = channel2.clone();
47+
channel2
48+
.basic_consume(
49+
"recover-test",
50+
"my_consumer",
51+
BasicConsumeOptions::default(),
52+
FieldTable::default(),
53+
)
54+
.await
55+
.expect("basic_consume")
56+
.set_delegate(move |delivery: DeliveryResult| {
57+
let channel = channel.clone();
58+
async move {
59+
info!(message=?delivery, "received message");
60+
if let Ok(Some(delivery)) = delivery {
61+
delivery
62+
.ack(BasicAckOptions::default())
63+
.await
64+
.expect("basic_ack");
65+
if &delivery.data[..] == b"after" {
66+
channel
67+
.basic_cancel("my_consumer", BasicCancelOptions::default())
68+
.await
69+
.expect("basic_cancel");
70+
}
71+
}
72+
}
73+
});
74+
75+
info!("will publish");
76+
let confirm = channel1
77+
.basic_publish(
78+
"",
79+
"recover-test",
80+
BasicPublishOptions::default(),
81+
b"before",
82+
BasicProperties::default(),
83+
)
84+
.await
85+
.expect("basic_publish")
86+
.await
87+
.expect("publisher-confirms");
88+
assert_eq!(confirm, Confirmation::Ack(None));
89+
90+
info!("before fail");
91+
assert!(channel1
92+
.queue_declare(
93+
"fake queue",
94+
QueueDeclareOptions {
95+
passive: true,
96+
..QueueDeclareOptions::default()
97+
},
98+
FieldTable::default(),
99+
)
100+
.await
101+
.is_err());
102+
info!("after fail");
103+
104+
info!("publish after");
105+
let confirm = channel1
106+
.basic_publish(
107+
"",
108+
"recover-test",
109+
BasicPublishOptions::default(),
110+
b"after",
111+
BasicProperties::default(),
112+
)
113+
.await
114+
.expect("basic_publish")
115+
.await
116+
.expect("publisher-confirms");
117+
assert_eq!(confirm, Confirmation::Ack(None));
118+
}
119+
120+
conn.run().expect("conn.run");
121+
});
122+
}

src/acknowledgement.rs

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,10 @@ impl Acknowledgements {
6262
pub(crate) fn on_channel_error(&self, error: Error) {
6363
self.0.lock().on_channel_error(error);
6464
}
65+
66+
pub(crate) fn reset(&self) {
67+
self.0.lock().reset();
68+
}
6569
}
6670

6771
impl fmt::Debug for Acknowledgements {
@@ -174,4 +178,9 @@ impl Inner {
174178
}
175179
}
176180
}
181+
182+
fn reset(&mut self) {
183+
// FIXME(recovery): handle pendings ??
184+
self.delivery_tag = IdSequence::new(false);
185+
}
177186
}

src/channel.rs

Lines changed: 35 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -86,6 +86,7 @@ impl fmt::Debug for Channel {
8686
}
8787

8888
impl Channel {
89+
#[allow(clippy::too_many_arguments)]
8990
pub(crate) fn new(
9091
channel_id: ChannelId,
9192
configuration: Configuration,
@@ -579,13 +580,20 @@ impl Channel {
579580
}
580581

581582
fn on_channel_close_ok_sent(&self, error: Option<Error>) {
582-
self.set_closed(
583-
error
584-
.clone()
585-
.unwrap_or(Error::InvalidChannelState(ChannelState::Closing)),
586-
);
587-
if let Some(error) = error {
588-
self.error_handler.on_error(error);
583+
match (self.recovery_config.auto_recover_channels, error) {
584+
(true, Some(error)) if error.is_amqp_soft_error() => {
585+
self.status.set_reconnecting(error)
586+
}
587+
(_, error) => {
588+
self.set_closed(
589+
error
590+
.clone()
591+
.unwrap_or(Error::InvalidChannelState(ChannelState::Closing)),
592+
);
593+
if let Some(error) = error {
594+
self.error_handler.on_error(error);
595+
}
596+
}
589597
}
590598
}
591599

@@ -868,6 +876,15 @@ impl Channel {
868876
resolver: PromiseResolver<Channel>,
869877
channel: Channel,
870878
) -> Result<()> {
879+
if self.recovery_config.auto_recover_channels {
880+
self.status.update_recovery_context(|ctx| {
881+
ctx.set_expected_replies(self.frames.take_expected_replies(self.id));
882+
});
883+
self.acknowledgements.reset();
884+
if !self.status.confirm() {
885+
self.status.finalize_recovery();
886+
}
887+
}
871888
self.set_state(ChannelState::Connected);
872889
resolver.resolve(channel);
873890
Ok(())
@@ -907,8 +924,17 @@ impl Channel {
907924
self.set_closing(error.clone().ok());
908925
let error = error.map_err(|error| info!(channel=%self.id, ?method, code_to_error=%error, "Channel closed with a non-error code")).ok();
909926
let channel = self.clone();
910-
self.internal_rpc
911-
.register_internal_future(async move { channel.channel_close_ok(error).await });
927+
self.internal_rpc.register_internal_future(async move {
928+
channel.channel_close_ok(error).await?;
929+
if channel.recovery_config.auto_recover_channels {
930+
let ch = channel.clone();
931+
channel.channel_open(ch).await?;
932+
if channel.status.confirm() {
933+
channel.confirm_select(ConfirmSelectOptions::default()).await?;
934+
}
935+
}
936+
Ok(())
937+
});
912938
Ok(())
913939
}
914940

src/channel_recovery_context.rs

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,24 @@
1+
use crate::{frames::{ExpectedReply, Frames}, Error};
2+
3+
use std::collections::VecDeque;
4+
5+
pub(crate) struct ChannelRecoveryContext {
6+
pub(crate) cause: Error,
7+
expected_replies: Option<VecDeque<ExpectedReply>>,
8+
}
9+
10+
impl ChannelRecoveryContext {
11+
pub(crate) fn new(cause: Error) -> Self {
12+
Self { cause, expected_replies: None, }
13+
}
14+
15+
pub(crate) fn set_expected_replies(&mut self, expected_replies: Option<VecDeque<ExpectedReply>>) {
16+
self.expected_replies = expected_replies;
17+
}
18+
19+
pub(crate) fn finalize_recovery(self) {
20+
if let Some(replies) = self.expected_replies {
21+
Frames::cancel_expected_replies(replies, self.cause);
22+
}
23+
}
24+
}

src/channel_status.rs

Lines changed: 34 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,8 @@
11
use crate::{
22
channel_receiver_state::{ChannelReceiverStates, DeliveryCause},
3+
channel_recovery_context::ChannelRecoveryContext,
34
types::{ChannelId, Identifier, PayloadSize},
4-
Result,
5+
Error, Result,
56
};
67
use parking_lot::Mutex;
78
use std::{fmt, sync::Arc};
@@ -12,7 +13,7 @@ pub struct ChannelStatus(Arc<Mutex<Inner>>);
1213

1314
impl ChannelStatus {
1415
pub fn initializing(&self) -> bool {
15-
self.0.lock().state == ChannelState::Initial
16+
[ChannelState::Initial, ChannelState::Reconnecting].contains(&self.0.lock().state)
1617
}
1718

1819
pub fn closing(&self) -> bool {
@@ -23,6 +24,17 @@ impl ChannelStatus {
2324
self.0.lock().state == ChannelState::Connected
2425
}
2526

27+
pub(crate) fn update_recovery_context<F: Fn(&mut ChannelRecoveryContext)>(&self, apply: F) {
28+
let mut inner = self.0.lock();
29+
if let Some(context) = inner.recovery_context.as_mut() {
30+
apply(context);
31+
}
32+
}
33+
34+
pub(crate) fn finalize_recovery(&self) {
35+
self.0.lock().finalize_recovery();
36+
}
37+
2638
pub(crate) fn can_receive_messages(&self) -> bool {
2739
[ChannelState::Closing, ChannelState::Connected].contains(&self.0.lock().state)
2840
}
@@ -32,8 +44,10 @@ impl ChannelStatus {
3244
}
3345

3446
pub(crate) fn set_confirm(&self) {
35-
self.0.lock().confirm = true;
47+
let mut inner = self.0.lock();
48+
inner.confirm = true;
3649
trace!("Publisher confirms activated");
50+
inner.finalize_recovery();
3751
}
3852

3953
pub fn state(&self) -> ChannelState {
@@ -44,6 +58,12 @@ impl ChannelStatus {
4458
self.0.lock().state = state;
4559
}
4660

61+
pub(crate) fn set_reconnecting(&self, error: Error) {
62+
let mut inner = self.0.lock();
63+
inner.state = ChannelState::Reconnecting;
64+
inner.recovery_context = Some(ChannelRecoveryContext::new(error));
65+
}
66+
4767
pub(crate) fn auto_close(&self, id: ChannelId) -> bool {
4868
id != 0 && self.0.lock().state == ChannelState::Connected
4969
}
@@ -116,6 +136,7 @@ impl ChannelStatus {
116136
pub enum ChannelState {
117137
#[default]
118138
Initial,
139+
Reconnecting,
119140
Connected,
120141
Closing,
121142
Closed,
@@ -141,6 +162,7 @@ struct Inner {
141162
send_flow: bool,
142163
state: ChannelState,
143164
receiver_state: ChannelReceiverStates,
165+
recovery_context: Option<ChannelRecoveryContext>,
144166
}
145167

146168
impl Default for Inner {
@@ -150,6 +172,15 @@ impl Default for Inner {
150172
send_flow: true,
151173
state: ChannelState::default(),
152174
receiver_state: ChannelReceiverStates::default(),
175+
recovery_context: None,
176+
}
177+
}
178+
}
179+
180+
impl Inner {
181+
pub(crate) fn finalize_recovery(&mut self) {
182+
if let Some(ctx) = self.recovery_context.take() {
183+
ctx.finalize_recovery();
153184
}
154185
}
155186
}

src/frames.rs

Lines changed: 11 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -87,8 +87,18 @@ impl Frames {
8787
self.inner.lock().drop_pending(error);
8888
}
8989

90+
pub(crate) fn take_expected_replies(&self, channel_id: ChannelId) -> Option<VecDeque<ExpectedReply>> {
91+
self.inner.lock().expected_replies.remove(&channel_id)
92+
}
93+
9094
pub(crate) fn clear_expected_replies(&self, channel_id: ChannelId, error: Error) {
91-
self.inner.lock().clear_expected_replies(channel_id, error);
95+
if let Some(replies) = self.take_expected_replies(channel_id) {
96+
Self::cancel_expected_replies(replies, error)
97+
}
98+
}
99+
100+
pub(crate) fn cancel_expected_replies(replies: VecDeque<ExpectedReply>, error: Error) {
101+
Inner::cancel_expected_replies(replies, error)
92102
}
93103

94104
pub(crate) fn poison(&self) -> Option<Error> {
@@ -265,12 +275,6 @@ impl Inner {
265275
None
266276
}
267277

268-
fn clear_expected_replies(&mut self, channel_id: ChannelId, error: Error) {
269-
if let Some(replies) = self.expected_replies.remove(&channel_id) {
270-
Self::cancel_expected_replies(replies, error);
271-
}
272-
}
273-
274278
fn cancel_expected_replies(replies: VecDeque<ExpectedReply>, error: Error) {
275279
for ExpectedReply(reply, cancel) in replies {
276280
match reply {

src/lib.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -136,6 +136,7 @@ mod buffer;
136136
mod channel;
137137
mod channel_closer;
138138
mod channel_receiver_state;
139+
mod channel_recovery_context;
139140
mod channel_status;
140141
mod channels;
141142
mod configuration;

0 commit comments

Comments
 (0)