Skip to content

Commit 0263ae0

Browse files
committed
Code to clean up queue process state when crashed
* Call `Mod:format_state/1` if exported to possibly truncate huge states * Add more information about truncated ram_pending_ack and disk_pending_ack * Add `log.error_logger_format_depth` cuttlefish schema value * Add `format_state/1` to `rabbit_channel` * Add `log.summarize_process_state`, default is `false`, to enable summarizing process state for crash logs. * Added `format_state` to `rabbit_classic_queue_index_v2` and `rabbit_classic_queue_store_v2` * Ensure `rabbit_channel:format_state/1` uses `summarize_process_state_when_logged`
1 parent 8111713 commit 0263ae0

File tree

8 files changed

+125
-13
lines changed

8 files changed

+125
-13
lines changed

deps/rabbit/priv/schema/rabbit.schema

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1674,6 +1674,25 @@ end}.
16741674
% Logging section
16751675
% ==========================
16761676

1677+
{mapping, "log.summarize_process_state", "rabbit.summarize_process_state_when_logged", [
1678+
{default, false},
1679+
{datatype, {enum, [true, false]}}
1680+
]}.
1681+
1682+
{mapping, "log.error_logger_format_depth", "kernel.error_logger_format_depth", [
1683+
{datatype, [{atom, unlimited}, integer]}
1684+
]}.
1685+
{translation, "kernel.error_logger_format_depth",
1686+
fun(Conf) ->
1687+
case cuttlefish:conf_get("log.error_logger_format_depth", Conf, undefined) of
1688+
undefined -> unlimited;
1689+
unlimited -> unlimited;
1690+
Val when is_integer(Val) andalso Val > 0 -> Val;
1691+
_ -> cuttlefish:invalid("should be positive integer or 'unlimited'")
1692+
end
1693+
end
1694+
}.
1695+
16771696
{mapping, "log.dir", "rabbit.log_root", [
16781697
{datatype, string},
16791698
{validators, ["dir_writable"]}]}.

deps/rabbit/src/rabbit_amqqueue_process.erl

Lines changed: 15 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@
2424
-export([start_link/2]).
2525
-export([init/1, terminate/2, code_change/3, handle_call/3, handle_cast/2,
2626
handle_info/2, handle_pre_hibernate/1, prioritise_call/4,
27-
prioritise_cast/3, prioritise_info/3, format_message_queue/2]).
27+
prioritise_cast/3, prioritise_info/3, format_state/1, format_message_queue/2]).
2828
-export([format/1]).
2929
-export([is_policy_applicable/2]).
3030

@@ -298,7 +298,7 @@ init_with_backing_queue_state(Q, BQ, BQS,
298298
notify_decorators(startup, State3),
299299
State3.
300300

301-
terminate(shutdown = R, State = #q{backing_queue = BQ, q = Q0}) ->
301+
terminate(shutdown = R, State = #q{backing_queue = BQ, q = Q0}) ->
302302
rabbit_core_metrics:queue_deleted(qname(State)),
303303
terminate_shutdown(
304304
fun (BQS) ->
@@ -1746,6 +1746,9 @@ handle_pre_hibernate(State = #q{backing_queue = BQ,
17461746
#q.stats_timer),
17471747
{hibernate, stop_rate_timer(State1)}.
17481748

1749+
format_state(#q{}=S) ->
1750+
maybe_format_backing_queue_state(S).
1751+
17491752
format_message_queue(Opt, MQ) -> rabbit_misc:format_message_queue(Opt, MQ).
17501753

17511754
%% TODO: this can be removed after 3.13
@@ -1787,3 +1790,13 @@ queue_created_infos(State) ->
17871790
%% On the events API, we use long names for queue types
17881791
Keys = ?CREATION_EVENT_KEYS -- [type],
17891792
infos(Keys, State) ++ [{type, rabbit_classic_queue}].
1793+
1794+
maybe_format_backing_queue_state(S = #q{backing_queue = BQ,
1795+
backing_queue_state = BQS0}) ->
1796+
case erlang:function_exported(BQ, format_state, 1) of
1797+
true ->
1798+
BQS1 = BQ:format_state(BQS0),
1799+
S#q{backing_queue_state = BQS1};
1800+
_ ->
1801+
S#q{backing_queue_state = backing_queue_state_truncated}
1802+
end.

deps/rabbit/src/rabbit_channel.erl

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -60,7 +60,7 @@
6060
-export([init/1, terminate/2, code_change/3, handle_call/3, handle_cast/2,
6161
handle_info/2, handle_pre_hibernate/1, handle_post_hibernate/1,
6262
prioritise_call/4, prioritise_cast/3, prioritise_info/3,
63-
format_message_queue/2]).
63+
format_state/1, format_message_queue/2]).
6464

6565
%% Internal
6666
-export([list_local/0, emit_info_local/3, deliver_reply_local/3]).
@@ -818,6 +818,17 @@ terminate(_Reason,
818818
code_change(_OldVsn, State, _Extra) ->
819819
{ok, State}.
820820

821+
format_state(#ch{} = S) ->
822+
format_state(application:get_env(rabbit, summarize_process_state_when_logged, false), S).
823+
824+
format_state(false, #ch{} = S) ->
825+
S;
826+
format_state(true, #ch{unacked_message_q = UAMQ} = S) ->
827+
UAMQLen = ?QUEUE:len(UAMQ),
828+
Msg0 = io_lib:format("unacked_message_q (~b elements) (truncated)", [UAMQLen]),
829+
Msg1 = rabbit_data_coercion:to_utf8_binary(Msg0),
830+
S#ch{unacked_message_q = Msg1}.
831+
821832
format_message_queue(Opt, MQ) -> rabbit_misc:format_message_queue(Opt, MQ).
822833

823834
get_consumer_timeout() ->

deps/rabbit/src/rabbit_classic_queue_index_v2.erl

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,10 @@
3030
%% Shared with rabbit_classic_queue_store_v2.
3131
-export([queue_dir/2]).
3232

33+
%% Used to format the state and summarize large amounts of data in
34+
%% the state.
35+
-export([format_state/1]).
36+
3337
%% Internal. Used by tests.
3438
-export([segment_file/2]).
3539

@@ -1285,3 +1289,17 @@ write_file_and_ensure_dir(Name, IOData) ->
12851289
end;
12861290
Err -> Err
12871291
end.
1292+
1293+
format_state(#qi{write_buffer = WriteBuffer,
1294+
cache = Cache,
1295+
confirms = Confirms} = S) ->
1296+
WriteBufferSize = map_size(WriteBuffer),
1297+
CacheSize = map_size(Cache),
1298+
ConfirmsSize = sets:size(Confirms),
1299+
S#qi{write_buffer = format_state_element(write_buffer, WriteBufferSize),
1300+
cache = format_state_element(cache, CacheSize),
1301+
confirms = format_state_element(confirms, ConfirmsSize)}.
1302+
1303+
format_state_element(Element, Size) when is_atom(Element), is_integer(Size) ->
1304+
rabbit_data_coercion:to_utf8_binary(
1305+
io_lib:format("~tp (~b elements) (truncated)", [Element, Size])).

deps/rabbit/src/rabbit_classic_queue_store_v2.erl

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,7 @@
4646

4747
-export([init/1, terminate/1, info/1,
4848
write/4, sync/1, read/3, read_many/2, check_msg_on_disk/3,
49-
remove/2, delete_segments/2]).
49+
remove/2, delete_segments/2, format_state/1]).
5050

5151
-define(SEGMENT_EXTENSION, ".qs").
5252

@@ -572,3 +572,14 @@ check_crc32() ->
572572
segment_file(Segment, #qs{dir = Dir}) ->
573573
N = integer_to_binary(Segment),
574574
<<Dir/binary, N/binary, ?SEGMENT_EXTENSION>>.
575+
576+
format_state(#qs{write_buffer = WriteBuffer,
577+
cache = Cache} = S) ->
578+
WriteBufferSize = map_size(WriteBuffer),
579+
CacheSize = map_size(Cache),
580+
S#qs{write_buffer = format_state_element(write_buffer, WriteBufferSize),
581+
cache = format_state_element(cache, CacheSize)}.
582+
583+
format_state_element(Element, Size) when is_atom(Element), is_integer(Size) ->
584+
rabbit_data_coercion:to_utf8_binary(
585+
io_lib:format("~tp (~b elements) (truncated)", [Element, Size])).

deps/rabbit/src/rabbit_priority_queue.erl

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,8 @@
3434
handle_pre_hibernate/1, resume/1, msg_rates/1,
3535
info/2, invoke/3, is_duplicate/2, set_queue_mode/2,
3636
set_queue_version/2,
37-
zip_msgs_and_acks/4]).
37+
zip_msgs_and_acks/4,
38+
format_state/1]).
3839

3940
-record(state, {bq, bqss, max_priority}).
4041
-record(passthrough, {bq, bqs}).
@@ -663,3 +664,12 @@ zip_msgs_and_acks(Pubs, AckTags) ->
663664
Id = mc:get_annotation(id, Msg),
664665
{Id, AckTag}
665666
end, Pubs, AckTags).
667+
668+
format_state(S = #passthrough{bq = BQ, bqs = BQS0}) ->
669+
case erlang:function_exported(BQ, format_state, 1) of
670+
true ->
671+
BQS1 = BQ:format_state(BQS0),
672+
S#passthrough{bqs = BQS1};
673+
_ ->
674+
S#passthrough{bqs = passthrough_bqs_truncated}
675+
end.

deps/rabbit/src/rabbit_variable_queue.erl

Lines changed: 21 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,8 @@
1616
update_rates/1, needs_timeout/1, timeout/1,
1717
handle_pre_hibernate/1, resume/1, msg_rates/1,
1818
info/2, invoke/3, is_duplicate/2, set_queue_mode/2,
19-
set_queue_version/2, zip_msgs_and_acks/4]).
19+
set_queue_version/2, zip_msgs_and_acks/4,
20+
format_state/1]).
2021

2122
-export([start/2, stop/1]).
2223

@@ -2435,3 +2436,22 @@ maybe_client_terminate(MSCStateP) ->
24352436
_:_ ->
24362437
ok
24372438
end.
2439+
2440+
format_state(#vqstate{} = S) ->
2441+
format_state(application:get_env(rabbit, summarize_process_state_when_logged, false), S).
2442+
2443+
format_state(false, #vqstate{} = S) ->
2444+
S;
2445+
format_state(true, #vqstate{ram_pending_ack = RamPendingAck,
2446+
disk_pending_ack = DiskPendingAck,
2447+
index_state = IndexState,
2448+
store_state = StoreState} = S) ->
2449+
S#vqstate{ram_pending_ack = format_state_map_element(ram_pending_ack, RamPendingAck),
2450+
disk_pending_ack = format_state_map_element(disk_pending_ack, DiskPendingAck),
2451+
index_state = rabbit_classic_queue_index_v2:format_state(IndexState),
2452+
store_state = rabbit_classic_queue_store_v2:format_state(StoreState)}.
2453+
2454+
format_state_map_element(Element, M) when is_map(M) ->
2455+
Size = maps:size(M),
2456+
rabbit_data_coercion:to_utf8_binary(
2457+
io_lib:format("~tp (~b elements) (truncated)", [Element, Size])).

deps/rabbit_common/src/gen_server2.erl

Lines changed: 17 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1150,14 +1150,15 @@ print_event(Dev, Event, Name) ->
11501150

11511151
terminate(Reason, Msg, #gs2_state { name = Name,
11521152
mod = Mod,
1153-
state = State,
1153+
state = ModState0,
11541154
debug = Debug,
11551155
stop_stats_fun = StopStatsFun
11561156
} = GS2State) ->
11571157
StopStatsFun(stop_stats_timer(GS2State)),
1158-
case catch Mod:terminate(Reason, State) of
1158+
case catch Mod:terminate(Reason, ModState0) of
11591159
{'EXIT', R} ->
1160-
error_info(R, Reason, Name, Msg, State, Debug),
1160+
ModState1 = maybe_format_state(Mod, ModState0),
1161+
error_info(R, Reason, Name, Msg, ModState1, Debug),
11611162
exit(R);
11621163
_ ->
11631164
case Reason of
@@ -1168,28 +1169,37 @@ terminate(Reason, Msg, #gs2_state { name = Name,
11681169
{shutdown,_}=Shutdown ->
11691170
exit(Shutdown);
11701171
_ ->
1171-
error_info(Reason, undefined, Name, Msg, State, Debug),
1172+
ModState1 = maybe_format_state(Mod, ModState0),
1173+
error_info(Reason, undefined, Name, Msg, ModState1, Debug),
11721174
exit(Reason)
11731175
end
11741176
end.
11751177

1178+
maybe_format_state(M, ModState) ->
1179+
case erlang:function_exported(M, format_state, 1) of
1180+
true ->
1181+
M:format_state(ModState);
1182+
false ->
1183+
ModState
1184+
end.
1185+
11761186
error_info(_Reason, _RootCause, application_controller, _Msg, _State, _Debug) ->
11771187
%% OTP-5811 Don't send an error report if it's the system process
11781188
%% application_controller which is terminating - let init take care
11791189
%% of it instead
11801190
ok;
1181-
error_info(Reason, RootCause, Name, Msg, State, Debug) ->
1191+
error_info(Reason, RootCause, Name, Msg, ModState, Debug) ->
11821192
Reason1 = error_reason(Reason),
11831193
Fmt =
11841194
"** Generic server ~tp terminating~n"
11851195
"** Last message in was ~tp~n"
11861196
"** When Server state == ~tp~n"
11871197
"** Reason for termination == ~n** ~tp~n",
11881198
case RootCause of
1189-
undefined -> format(Fmt, [Name, Msg, State, Reason1]);
1199+
undefined -> format(Fmt, [Name, Msg, ModState, Reason1]);
11901200
_ -> format(Fmt ++ "** In 'terminate' callback "
11911201
"with reason ==~n** ~tp~n",
1192-
[Name, Msg, State, Reason1,
1202+
[Name, Msg, ModState, Reason1,
11931203
error_reason(RootCause)])
11941204
end,
11951205
sys:print_log(Debug),

0 commit comments

Comments
 (0)