Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
205 changes: 187 additions & 18 deletions deps/rabbit/test/amqpl_direct_reply_to_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,9 @@

-define(TIMEOUT, 30_000).

%% This is the pseudo queue that is specially interpreted by RabbitMQ.
-define(REPLY_QUEUE, <<"amq.rabbitmq.reply-to">>).

all() ->
[
{group, cluster_size_1},
Expand All @@ -28,7 +31,11 @@ groups() ->
[
{cluster_size_1, [shuffle],
[
trace
trace,
failure_ack_mode,
failure_multiple_consumers,
failure_reuse_consumer_tag,
failure_publish
]},
{cluster_size_3, [shuffle],
[
Expand Down Expand Up @@ -82,8 +89,6 @@ trace(Config) ->
Node = atom_to_binary(rabbit_ct_broker_helpers:get_node_config(Config, 0, nodename)),
TraceQueue = <<"tests.amqpl_direct_reply_to.trace.tracing">>,
RequestQueue = <<"tests.amqpl_direct_reply_to.trace.requests">>,
%% This is the pseudo queue that is specially interpreted by RabbitMQ.
ReplyQueue = <<"amq.rabbitmq.reply-to">>,
RequestPayload = <<"my request">>,
ReplyPayload = <<"my reply">>,
CorrelationId = <<"my correlation ID">>,
Expand All @@ -102,7 +107,7 @@ trace(Config) ->

%% There is no need to declare this pseudo queue first.
amqp_channel:subscribe(RequesterCh,
#'basic.consume'{queue = ReplyQueue,
#'basic.consume'{queue = ?REPLY_QUEUE,
no_ack = true},
self()),
CTag = receive #'basic.consume_ok'{consumer_tag = CTag0} -> CTag0
Expand All @@ -114,7 +119,7 @@ trace(Config) ->
amqp_channel:cast(
RequesterCh,
#'basic.publish'{routing_key = RequestQueue},
#amqp_msg{props = #'P_basic'{reply_to = ReplyQueue,
#amqp_msg{props = #'P_basic'{reply_to = ?REPLY_QUEUE,
correlation_id = CorrelationId},
payload = RequestPayload}),
receive #'basic.ack'{} -> ok
Expand Down Expand Up @@ -182,6 +187,85 @@ trace(Config) ->
[#'queue.delete_ok'{} = amqp_channel:call(Ch, #'queue.delete'{queue = Q0}) || Q0 <- Qs],
{ok, _} = rabbit_ct_broker_helpers:rabbitmqctl(Config, 0, ["trace_off"]).

%% A consumer must consume in no-ack mode.
failure_ack_mode(Config) ->
{Conn, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config),
Consume = #'basic.consume'{queue = ?REPLY_QUEUE,
no_ack = false},
try amqp_channel:subscribe(Ch, Consume, self()) of
_ ->
ct:fail("expected subscribe in ack mode to fail")
catch exit:Reason ->
?assertMatch(
{{_, {_, _, <<"PRECONDITION_FAILED - reply consumer cannot acknowledge">>}}, _},
Reason)
end,
ok = rabbit_ct_client_helpers:close_connection(Conn).

%% In AMQP 0.9.1 there can be at most one reply consumer per channel.
failure_multiple_consumers(Config) ->
{Conn, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config),
Consume = #'basic.consume'{queue = ?REPLY_QUEUE,
no_ack = true},
amqp_channel:subscribe(Ch, Consume, self()),
receive #'basic.consume_ok'{} -> ok
end,

try amqp_channel:subscribe(Ch, Consume, self()) of
_ ->
ct:fail("expected second subscribe to fail")
catch exit:Reason ->
?assertMatch(
{{_, {_, _, <<"PRECONDITION_FAILED - reply consumer already set">>}}, _},
Reason)
end,
ok = rabbit_ct_client_helpers:close_connection(Conn).

%% Reusing the same consumer tag should fail.
failure_reuse_consumer_tag(Config) ->
{_, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config),
Ctag = <<"my-tag">>,

#'queue.declare_ok'{queue = Q} = amqp_channel:call(Ch, #'queue.declare'{exclusive = true}),
amqp_channel:subscribe(Ch, #'basic.consume'{queue = Q,
consumer_tag = Ctag}, self()),
receive #'basic.consume_ok'{} -> ok
end,

try amqp_channel:subscribe(Ch, #'basic.consume'{queue = ?REPLY_QUEUE,
consumer_tag = Ctag,
no_ack = true}, self()) of
_ ->
ct:fail("expected reusing consumer tag to fail")
catch exit:Reason ->
?assertMatch(
{{_, {connection_closing,
{_, _, <<"NOT_ALLOWED - attempt to reuse consumer tag 'my-tag'">>}
}}, _},
Reason)
end.

%% Publishing with reply_to header set but without consuming from the pseudo queue should fail.
failure_publish(Config) ->
{Conn, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config),

Ref = monitor(process, Ch),
amqp_channel:cast(
Ch,
#'basic.publish'{routing_key = <<"some request queue">>},
#amqp_msg{props = #'P_basic'{reply_to = ?REPLY_QUEUE,
correlation_id = <<"some correlation ID">>},
payload = <<"some payload">>}),

receive {'DOWN', Ref, process, Ch, Reason} ->
?assertMatch(
{_, {_, _, <<"PRECONDITION_FAILED - fast reply consumer does not exist">>}},
Reason)
after ?TIMEOUT ->
ct:fail("expected channel error")
end,
ok = rabbit_ct_client_helpers:close_connection(Conn).

%% "new" and "old" refers to new and old RabbitMQ versions in mixed version tests.
rpc_new_to_old_node(Config) ->
rpc(0, 1, Config).
Expand All @@ -190,36 +274,40 @@ rpc_old_to_new_node(Config) ->
rpc(1, 0, Config).

rpc(RequesterNode, ResponderNode, Config) ->
RequestQueue = <<"tests.amqpl_direct_reply_to.rpc.requests">>,
%% This is the pseudo queue that is specially interpreted by RabbitMQ.
ReplyQueue = <<"amq.rabbitmq.reply-to">>,
RequestQueue = <<"request queue">>,
RequestPayload = <<"my request">>,
ReplyPayload = <<"my reply">>,
CorrelationId = <<"my correlation ID">>,
RequesterCh = rabbit_ct_client_helpers:open_channel(Config, RequesterNode),
ResponderCh = rabbit_ct_client_helpers:open_channel(Config, ResponderNode),

%% There is no need to declare this pseudo queue first.
amqp_channel:subscribe(RequesterCh,
#'basic.consume'{queue = ReplyQueue,
#'basic.consume'{queue = ?REPLY_QUEUE,
no_ack = true},
self()),
CTag = receive #'basic.consume_ok'{consumer_tag = CTag0} -> CTag0
end,

?assertEqual(#'queue.declare_ok'{queue = ?REPLY_QUEUE,
message_count = 0,
consumer_count = 1},
amqp_channel:call(RequesterCh,
#'queue.declare'{queue = ?REPLY_QUEUE})),

#'queue.declare_ok'{} = amqp_channel:call(
RequesterCh,
#'queue.declare'{queue = RequestQueue}),
#'confirm.select_ok'{} = amqp_channel:call(RequesterCh, #'confirm.select'{}),
amqp_channel:register_confirm_handler(RequesterCh, self()),

%% Send the request.
amqp_channel:cast(
RequesterCh,
#'basic.publish'{routing_key = RequestQueue},
#amqp_msg{props = #'P_basic'{reply_to = ReplyQueue,
#amqp_msg{props = #'P_basic'{reply_to = ?REPLY_QUEUE,
correlation_id = CorrelationId},
payload = RequestPayload}),
receive #'basic.ack'{} -> ok
after ?TIMEOUT -> ct:fail(confirm_timeout)
end,

ok = wait_for_queue_declared(RequestQueue, ResponderNode, Config),
Expand All @@ -229,20 +317,101 @@ rpc(RequesterNode, ResponderNode, Config) ->
correlation_id = CorrelationId},
payload = RequestPayload}
} = amqp_channel:call(ResponderCh, #'basic.get'{queue = RequestQueue}),

%% Test what the docs state:
%% "If the RPC server is going to perform some expensive computation it might wish
%% to check if the client has gone away. To do this the server can declare the
%% generated reply name first on a disposable channel in order to determine whether
%% it still exists."
?assertEqual(#'queue.declare_ok'{queue = ReplyTo,
message_count = 0,
consumer_count = 1},
amqp_channel:call(ResponderCh,
#'queue.declare'{queue = ReplyTo})),

%% Send the reply.
amqp_channel:cast(
ResponderCh,
#'basic.publish'{routing_key = ReplyTo},
#amqp_msg{props = #'P_basic'{correlation_id = CorrelationId},
payload = ReplyPayload}),
payload = <<"reply 1">>}),

%% Receive the reply.
%% Let's assume the RPC server sends multiple replies for a single request.
%% (This is a bit unusual but should work.)
amqp_channel:cast(
ResponderCh,
#'basic.publish'{routing_key = ReplyTo},
#amqp_msg{props = #'P_basic'{correlation_id = CorrelationId},
payload = <<"reply 2">>}),

%% Receive the frst reply.
receive {#'basic.deliver'{consumer_tag = CTag,
redelivered = false,
exchange = <<>>,
routing_key = ReplyTo},
#amqp_msg{payload = P1,
props = #'P_basic'{correlation_id = CorrelationId}}} ->
?assertEqual(<<"reply 1">>, P1)
after ?TIMEOUT -> ct:fail({missing_reply, ?LINE})
end,

%% Receive the second reply.
receive {#'basic.deliver'{consumer_tag = CTag},
#amqp_msg{payload = ReplyPayload,
#amqp_msg{payload = P2,
props = #'P_basic'{correlation_id = CorrelationId}}} ->
ok
after ?TIMEOUT -> ct:fail(missing_reply)
end.
?assertEqual(<<"reply 2">>, P2)
after ?TIMEOUT -> ct:fail({missing_reply, ?LINE})
end,

%% The requester sends a reply to itself.
%% (Really odd, but should work.)
amqp_channel:cast(
RequesterCh,
#'basic.publish'{routing_key = ReplyTo},
#amqp_msg{props = #'P_basic'{correlation_id = CorrelationId},
payload = <<"reply 3">>}),

receive {#'basic.deliver'{consumer_tag = CTag},
#amqp_msg{payload = P3,
props = #'P_basic'{correlation_id = CorrelationId}}} ->
?assertEqual(<<"reply 3">>, P3)
after ?TIMEOUT -> ct:fail({missing_reply, ?LINE})
end,

%% Requester cancels consumption.
?assertMatch(#'basic.cancel_ok'{consumer_tag = CTag},
amqp_channel:call(RequesterCh, #'basic.cancel'{consumer_tag = CTag})),

%% Send a final reply.
amqp_channel:cast(
ResponderCh,
#'basic.publish'{routing_key = ReplyTo},
#amqp_msg{props = #'P_basic'{correlation_id = CorrelationId},
payload = <<"reply 4">>}),

%% The final reply shouldn't be delivered since the requester cancelled consumption.
receive {#'basic.deliver'{}, #amqp_msg{}} ->
ct:fail("did not expect delivery after cancellation")
after 100 -> ok
end,

%% Responder checks again if the requester is still there.
%% This time, the requester and its queue should be gone.
try amqp_channel:call(ResponderCh, #'queue.declare'{queue = ReplyTo}) of
_ ->
ct:fail("expected queue.declare to fail")
catch exit:Reason ->
?assertMatch(
{{_, {_, _, <<"NOT_FOUND - no queue '",
ReplyTo:(byte_size(ReplyTo))/binary,
"' in vhost '/'">>}}, _},
Reason)
end,

%% Clean up.
#'queue.delete_ok'{} = amqp_channel:call(RequesterCh,
#'queue.delete'{queue = RequestQueue}),
ok = rabbit_ct_client_helpers:close_channel(RequesterCh).

wait_for_queue_declared(Queue, Node, Config) ->
eventually(
Expand Down
Loading