From 1022b9c45745b0c5daac75fe850e636d4d924b0c Mon Sep 17 00:00:00 2001 From: Luke Bakken Date: Fri, 8 Aug 2025 09:15:01 -0700 Subject: [PATCH] Code to clean up queue process state when crashed * Call `Mod:format_state/1` if exported to possibly truncate huge states * Add more information about truncated ram_pending_ack and disk_pending_ack * Add `log.error_logger_format_depth` cuttlefish schema value * Add `format_state/1` to `rabbit_channel` * Add `log.summarize_process_state`, default is `false`, to enable summarizing process state for crash logs. * Added `format_state` to `rabbit_classic_queue_index_v2` and `rabbit_classic_queue_store_v2` * Ensure `rabbit_channel:format_state/1` uses `summarize_process_state_when_logged` --- deps/rabbit/priv/schema/rabbit.schema | 19 +++++++++++++++ deps/rabbit/src/rabbit_amqqueue_process.erl | 17 +++++++++++-- deps/rabbit/src/rabbit_channel.erl | 13 +++++++++- .../src/rabbit_classic_queue_index_v2.erl | 17 +++++++++++++ .../src/rabbit_classic_queue_store_v2.erl | 12 +++++++++- deps/rabbit/src/rabbit_priority_queue.erl | 12 +++++++++- deps/rabbit/src/rabbit_variable_queue.erl | 22 ++++++++++++++++- deps/rabbit_common/src/gen_server2.erl | 24 +++++++++++++------ 8 files changed, 123 insertions(+), 13 deletions(-) diff --git a/deps/rabbit/priv/schema/rabbit.schema b/deps/rabbit/priv/schema/rabbit.schema index 5f181d3f83f0..816983bb77f3 100644 --- a/deps/rabbit/priv/schema/rabbit.schema +++ b/deps/rabbit/priv/schema/rabbit.schema @@ -1674,6 +1674,25 @@ end}. % Logging section % ========================== +{mapping, "log.summarize_process_state", "rabbit.summarize_process_state_when_logged", [ + {default, false}, + {datatype, {enum, [true, false]}} +]}. + +{mapping, "log.error_logger_format_depth", "kernel.error_logger_format_depth", [ + {datatype, [{atom, unlimited}, integer]} +]}. 
+{translation, "kernel.error_logger_format_depth", + fun(Conf) -> + case cuttlefish:conf_get("log.error_logger_format_depth", Conf, undefined) of + undefined -> unlimited; + unlimited -> unlimited; + Val when is_integer(Val) andalso Val > 0 -> Val; + _ -> cuttlefish:invalid("should be positive integer or 'unlimited'") + end + end +}. + {mapping, "log.dir", "rabbit.log_root", [ {datatype, string}, {validators, ["dir_writable"]}]}. diff --git a/deps/rabbit/src/rabbit_amqqueue_process.erl b/deps/rabbit/src/rabbit_amqqueue_process.erl index 5dda2cb80145..0528ab8389b0 100644 --- a/deps/rabbit/src/rabbit_amqqueue_process.erl +++ b/deps/rabbit/src/rabbit_amqqueue_process.erl @@ -24,7 +24,7 @@ -export([start_link/2]). -export([init/1, terminate/2, code_change/3, handle_call/3, handle_cast/2, handle_info/2, handle_pre_hibernate/1, prioritise_call/4, - prioritise_cast/3, prioritise_info/3, format_message_queue/2]). + prioritise_cast/3, prioritise_info/3, format_state/1, format_message_queue/2]). -export([format/1]). -export([is_policy_applicable/2]). @@ -298,7 +298,7 @@ init_with_backing_queue_state(Q, BQ, BQS, notify_decorators(startup, State3), State3. -terminate(shutdown = R, State = #q{backing_queue = BQ, q = Q0}) -> +terminate(shutdown = R, State = #q{backing_queue = BQ, q = Q0}) -> rabbit_core_metrics:queue_deleted(qname(State)), terminate_shutdown( fun (BQS) -> @@ -1746,6 +1746,9 @@ handle_pre_hibernate(State = #q{backing_queue = BQ, #q.stats_timer), {hibernate, stop_rate_timer(State1)}. +format_state(#q{}=S) -> + maybe_format_backing_queue_state(S). + format_message_queue(Opt, MQ) -> rabbit_misc:format_message_queue(Opt, MQ). %% TODO: this can be removed after 3.13 @@ -1787,3 +1790,13 @@ queue_created_infos(State) -> %% On the events API, we use long names for queue types Keys = ?CREATION_EVENT_KEYS -- [type], infos(Keys, State) ++ [{type, rabbit_classic_queue}]. 
+ +maybe_format_backing_queue_state(S = #q{backing_queue = BQ, + backing_queue_state = BQS0}) -> + case erlang:function_exported(BQ, format_state, 1) of + true -> + BQS1 = BQ:format_state(BQS0), + S#q{backing_queue_state = BQS1}; + _ -> + S#q{backing_queue_state = backing_queue_state_truncated} + end. diff --git a/deps/rabbit/src/rabbit_channel.erl b/deps/rabbit/src/rabbit_channel.erl index 04ce9c93ae3b..6e63cda0b74b 100644 --- a/deps/rabbit/src/rabbit_channel.erl +++ b/deps/rabbit/src/rabbit_channel.erl @@ -60,7 +60,7 @@ -export([init/1, terminate/2, code_change/3, handle_call/3, handle_cast/2, handle_info/2, handle_pre_hibernate/1, handle_post_hibernate/1, prioritise_call/4, prioritise_cast/3, prioritise_info/3, - format_message_queue/2]). + format_state/1, format_message_queue/2]). %% Internal -export([list_local/0, emit_info_local/3, deliver_reply_local/3]). @@ -806,6 +806,17 @@ terminate(_Reason, code_change(_OldVsn, State, _Extra) -> {ok, State}. +format_state(#ch{} = S) -> + format_state(application:get_env(rabbit, summarize_process_state_when_logged, false), S). + +format_state(false, #ch{} = S) -> + S; +format_state(true, #ch{unacked_message_q = UAMQ} = S) -> + UAMQLen = ?QUEUE:len(UAMQ), + Msg0 = io_lib:format("unacked_message_q (~b elements) (truncated)", [UAMQLen]), + Msg1 = rabbit_data_coercion:to_utf8_binary(Msg0), + S#ch{unacked_message_q = Msg1}. + format_message_queue(Opt, MQ) -> rabbit_misc:format_message_queue(Opt, MQ). get_consumer_timeout() -> diff --git a/deps/rabbit/src/rabbit_classic_queue_index_v2.erl b/deps/rabbit/src/rabbit_classic_queue_index_v2.erl index 7d6fa3de8aea..6d59a1f512e0 100644 --- a/deps/rabbit/src/rabbit_classic_queue_index_v2.erl +++ b/deps/rabbit/src/rabbit_classic_queue_index_v2.erl @@ -30,6 +30,10 @@ %% Shared with rabbit_classic_queue_store_v2. -export([queue_dir/2]). +%% Used to format the state and summarize large amounts of data in +%% the state. +-export([format_state/1]). + %% Internal. Used by tests. 
-export([segment_file/2]). @@ -1285,3 +1289,16 @@ write_file_and_ensure_dir(Name, IOData) -> end; Err -> Err end. + +format_state(#qi{write_buffer = WriteBuffer, + cache = Cache, + confirms = Confirms} = S) -> + WriteBufferSize = map_size(WriteBuffer), + ConfirmsSize = sets:size(Confirms), + S#qi{write_buffer = format_state_element(write_buffer, WriteBufferSize), + cache = maps:keys(Cache), + confirms = format_state_element(confirms, ConfirmsSize)}. + +format_state_element(Element, Size) when is_atom(Element), is_integer(Size) -> + rabbit_data_coercion:to_utf8_binary( + io_lib:format("~tp (~b elements) (truncated)", [Element, Size])). diff --git a/deps/rabbit/src/rabbit_classic_queue_store_v2.erl b/deps/rabbit/src/rabbit_classic_queue_store_v2.erl index 9e27c7c25193..426277e5293e 100644 --- a/deps/rabbit/src/rabbit_classic_queue_store_v2.erl +++ b/deps/rabbit/src/rabbit_classic_queue_store_v2.erl @@ -46,7 +46,7 @@ -export([init/1, terminate/1, info/1, write/4, sync/1, read/3, read_many/2, check_msg_on_disk/3, - remove/2, delete_segments/2]). + remove/2, delete_segments/2, format_state/1]). -define(SEGMENT_EXTENSION, ".qs"). @@ -572,3 +572,13 @@ check_crc32() -> segment_file(Segment, #qs{dir = Dir}) -> N = integer_to_binary(Segment), <<Dir/binary, N/binary, ?SEGMENT_EXTENSION>>. + +format_state(#qs{write_buffer = WriteBuffer, + cache = Cache} = S) -> + WriteBufferSize = map_size(WriteBuffer), + S#qs{write_buffer = format_state_element(write_buffer, WriteBufferSize), + cache = maps:keys(Cache)}. + +format_state_element(Element, Size) when is_atom(Element), is_integer(Size) -> + rabbit_data_coercion:to_utf8_binary( + io_lib:format("~tp (~b elements) (truncated)", [Element, Size])).
diff --git a/deps/rabbit/src/rabbit_priority_queue.erl b/deps/rabbit/src/rabbit_priority_queue.erl
index 33657daa631a..d11ffa910fc6 100644
--- a/deps/rabbit/src/rabbit_priority_queue.erl
+++ b/deps/rabbit/src/rabbit_priority_queue.erl
@@ -34,7 +34,8 @@
          handle_pre_hibernate/1, resume/1, msg_rates/1,
          info/2, invoke/3, is_duplicate/2, set_queue_mode/2,
          set_queue_version/2,
-         zip_msgs_and_acks/4]).
+         zip_msgs_and_acks/4,
+         format_state/1]).
 
 -record(state, {bq, bqss, max_priority}).
 -record(passthrough, {bq, bqs}).
@@ -663,3 +664,28 @@ zip_msgs_and_acks(Pubs, AckTags) ->
                       Id = mc:get_annotation(id, Msg),
                       {Id, AckTag}
               end, Pubs, AckTags).
+
+%% Formats the wrapped backing queue state(s) for crash logs by delegating
+%% to BQ:format_state/1 when the backing queue module exports it. Both
+%% wrapper records declared in this module are handled; any other term is
+%% returned unchanged so that this function never raises function_clause.
+format_state(S = #passthrough{bq = BQ, bqs = BQS0}) ->
+    case erlang:function_exported(BQ, format_state, 1) of
+        true ->
+            BQS1 = BQ:format_state(BQS0),
+            S#passthrough{bqs = BQS1};
+        _ ->
+            S#passthrough{bqs = passthrough_bqs_truncated}
+    end;
+format_state(S = #state{bq = BQ, bqss = BQSs0}) when is_list(BQSs0) ->
+    case erlang:function_exported(BQ, format_state, 1) of
+        true ->
+            %% NOTE(review): assumes bqss holds {Priority, BQS} pairs;
+            %% confirm against the code that builds #state{}.
+            BQSs1 = [{P, BQ:format_state(BQS)} || {P, BQS} <- BQSs0],
+            S#state{bqss = BQSs1};
+        _ ->
+            S#state{bqss = state_bqss_truncated}
+    end;
+format_state(S) ->
+    S.
diff --git a/deps/rabbit/src/rabbit_variable_queue.erl b/deps/rabbit/src/rabbit_variable_queue.erl
index ed951bca34e9..0c2ec75767a4 100644
--- a/deps/rabbit/src/rabbit_variable_queue.erl
+++ b/deps/rabbit/src/rabbit_variable_queue.erl
@@ -16,7 +16,8 @@
          update_rates/1, needs_timeout/1, timeout/1,
          handle_pre_hibernate/1, resume/1, msg_rates/1,
          info/2, invoke/3, is_duplicate/2, set_queue_mode/2,
-         set_queue_version/2, zip_msgs_and_acks/4]).
+         set_queue_version/2, zip_msgs_and_acks/4,
+         format_state/1]).
 
 -export([start/2, stop/1]).
 
@@ -2435,3 +2436,22 @@ maybe_client_terminate(MSCStateP) ->
         _:_ ->
             ok
     end.
+
+format_state(#vqstate{} = S) ->
+    format_state(application:get_env(rabbit, summarize_process_state_when_logged, false), S).
+
+format_state(false, #vqstate{} = S) ->
+    S;
+format_state(true, #vqstate{q3 = Q3,
+                            ram_pending_ack = RamPendingAck,
+                            disk_pending_ack = DiskPendingAck,
+                            index_state = IndexState,
+                            store_state = StoreState} = S) ->
+    S#vqstate{q3 = format_q3(Q3),
+              ram_pending_ack = maps:keys(RamPendingAck),
+              disk_pending_ack = maps:keys(DiskPendingAck),
+              index_state = rabbit_classic_queue_index_v2:format_state(IndexState),
+              store_state = rabbit_classic_queue_store_v2:format_state(StoreState)}.
+
+format_q3(Q3) ->
+    [SeqId || #msg_status{seq_id = SeqId} <- ?QUEUE:to_list(Q3)].
diff --git a/deps/rabbit_common/src/gen_server2.erl b/deps/rabbit_common/src/gen_server2.erl
index 80dd79bb4502..01975079bccc 100644
--- a/deps/rabbit_common/src/gen_server2.erl
+++ b/deps/rabbit_common/src/gen_server2.erl
@@ -1150,14 +1150,15 @@ print_event(Dev, Event, Name) ->
 
 terminate(Reason, Msg, #gs2_state { name = Name,
                                     mod = Mod,
-                                    state = State,
+                                    state = ModState0,
                                     debug = Debug,
                                     stop_stats_fun = StopStatsFun
                                   } = GS2State) ->
     StopStatsFun(stop_stats_timer(GS2State)),
-    case catch Mod:terminate(Reason, State) of
+    case catch Mod:terminate(Reason, ModState0) of
         {'EXIT', R} ->
-            error_info(R, Reason, Name, Msg, State, Debug),
+            ModState1 = maybe_format_state(Mod, ModState0),
+            error_info(R, Reason, Name, Msg, ModState1, Debug),
             exit(R);
         _ ->
             case Reason of
@@ -1168,17 +1169,37 @@ terminate(Reason, Msg, #gs2_state { name = Name,
                 {shutdown,_}=Shutdown ->
                     exit(Shutdown);
                 _ ->
-                    error_info(Reason, undefined, Name, Msg, State, Debug),
+                    ModState1 = maybe_format_state(Mod, ModState0),
+                    error_info(Reason, undefined, Name, Msg, ModState1, Debug),
                     exit(Reason)
             end
     end.
 
+%% Returns the callback module's state as it should appear in the crash
+%% report: the result of M:format_state/1 when M exports it, otherwise the
+%% state unchanged. This is called from terminate/3 just before the error
+%% report is written, so an exception raised by format_state/1 must not
+%% replace the real exit reason or suppress the report; in that case the
+%% unformatted state is logged instead.
+maybe_format_state(M, ModState) ->
+    case erlang:function_exported(M, format_state, 1) of
+        true ->
+            try
+                M:format_state(ModState)
+            catch
+                _:_ ->
+                    ModState
+            end;
+        false ->
+            ModState
+    end.
+
 error_info(_Reason, _RootCause, application_controller, _Msg, _State, _Debug) ->
     %% OTP-5811 Don't send an error report if it's the system process
     %% application_controller which is terminating - let init take care
     %% of it instead
     ok;
-error_info(Reason, RootCause, Name, Msg, State, Debug) ->
+error_info(Reason, RootCause, Name, Msg, ModState, Debug) ->
     Reason1 = error_reason(Reason),
     Fmt =
         "** Generic server ~tp terminating~n"
@@ -1186,10 +1207,10 @@ error_info(Reason, RootCause, Name, Msg, State, Debug) ->
         "** When Server state == ~tp~n"
         "** Reason for termination == ~n** ~tp~n",
     case RootCause of
-        undefined -> format(Fmt, [Name, Msg, State, Reason1]);
+        undefined -> format(Fmt, [Name, Msg, ModState, Reason1]);
         _ -> format(Fmt ++ "** In 'terminate' callback "
                            "with reason ==~n** ~tp~n",
-                           [Name, Msg, State, Reason1,
+                           [Name, Msg, ModState, Reason1,
                             error_reason(RootCause)])
     end,
     sys:print_log(Debug),