
Expose ra counters, remove ra_metrics #545


Draft · wants to merge 7 commits into base: main
2 changes: 1 addition & 1 deletion Makefile
@@ -10,7 +10,7 @@ ESCRIPT_EMU_ARGS = -noinput -setcookie ra_fifo_cli

dep_gen_batch_server = hex 0.8.9
dep_aten = hex 0.6.0
-dep_seshat = hex 0.6.0
+dep_seshat = git https://github.com/rabbitmq/seshat.git prometheus-support
DEPS = aten gen_batch_server seshat

TEST_DEPS = proper meck eunit_formatters inet_tcp_proxy
9 changes: 7 additions & 2 deletions README.md
@@ -53,7 +53,7 @@ which algorithms such as Raft significantly benefit from.

This library was primarily developed as the foundation of a replication layer for
[quorum queues](https://rabbitmq.com/quorum-queues.html) in RabbitMQ, and today
-also powers [RabbitMQ streams](https://rabbitmq.com/streams.html) and [Khepri](https://github.com/rabbitmq/khepri).
+also powers [RabbitMQ streams](https://rabbitmq.com/streams.html) and [Khepri](https://github.com/rabbitmq/khepri).

The design it aims to replace uses
a variant of [Chain Based Replication](https://www.cs.cornell.edu/home/rvr/papers/OSDI04.pdf)
@@ -405,9 +405,14 @@ is available in a separate repository.
</tr>
<tr>
<td>metrics_key</td>
-<td>Metrics key. The key used to write metrics into the ra_metrics table.</td>
+<td>DEPRECATED: Metrics key. The key was used to write metrics into the ra_metrics table. Metrics are now in Seshat.</td>
<td>Atom</td>
</tr>
+<tr>
+<td>metrics_labels</td>
+<td>Metrics labels. Labels that are later returned with metrics for this server (eg. `#{vhost => ..., queue => ...}` for quorum queues).</td>
+<td>Map</td>
+</tr>
<tr>
<td>low_priority_commands_flush_size</td>
<td>
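The new metrics_labels option documented above is set per server. Below is a hypothetical sketch of passing it in a server configuration; only metrics_labels comes from this PR, the remaining fields and the ra:start_server/2 call follow the existing ra_server_config() documentation, and the machine module and label values are placeholders.

```erlang
%% Hypothetical sketch: metrics_labels is the field added by this PR.
%% The other fields follow the existing ra_server_config() map; my_machine
%% and the label values are placeholders.
ServerId = {qq_server, node()},
Config = #{id => ServerId,
           uid => <<"qq_server">>,
           cluster_name => qq_cluster,
           log_init_args => #{uid => <<"qq_server">>},
           initial_members => [ServerId],
           machine => {module, my_machine, #{}},
           %% labels later returned with this server's metrics,
           %% e.g. vhost/queue for quorum queues
           metrics_labels => #{vhost => <<"/">>, queue => <<"orders">>}},
ok = ra:start_server(default, Config).
```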
2 changes: 1 addition & 1 deletion rebar.config
@@ -1,7 +1,7 @@
{deps, [
{gen_batch_server, "0.8.9"},
{aten, "0.6.0"},
-{seshat, "0.6.0"}
+{seshat, {git, "https://github.com/rabbitmq/seshat.git", {branch, "prometheus-support"}}}
]}.

{profiles,
9 changes: 3 additions & 6 deletions rebar.lock
@@ -1,14 +1,11 @@
{"1.2.0",
[{<<"aten">>,{pkg,<<"aten">>,<<"0.6.0">>},0},
{<<"gen_batch_server">>,{pkg,<<"gen_batch_server">>,<<"0.8.9">>},0},
{<<"seshat">>,{pkg,<<"seshat">>,<<"0.6.0">>},0}]}.
{<<"gen_batch_server">>,{pkg,<<"gen_batch_server">>,<<"0.8.9">>},0}]}.
[
{pkg_hash,[
{<<"aten">>, <<"7A57B275A6DAF515AC3683FB9853E280B4D0DCDD74292FD66AC4A01C8694F8C7">>},
{<<"gen_batch_server">>, <<"1C6BC0F530BF8C17E8B4ACC20C2CC369FFA5BEE2B46DE01E21410745F24B1BC9">>},
{<<"seshat">>, <<"3172EB1D7A2A4F66108CD6933A4E465AFF80F84AA90ED83F047B92F636123CCD">>}]},
{<<"gen_batch_server">>, <<"1C6BC0F530BF8C17E8B4ACC20C2CC369FFA5BEE2B46DE01E21410745F24B1BC9">>}]},
{pkg_hash_ext,[
{<<"aten">>, <<"5F39A164206AE3F211EF5880B1F7819415686436E3229D30B6A058564FBAA168">>},
{<<"gen_batch_server">>, <<"C8581FE4A4B6BCCF91E53CE6A8C7E6C27C8C591BAB5408B160166463F5579C22">>},
{<<"seshat">>, <<"7CEF700F92831DD7CAE6A6DD223CCC55AC88ECCE0631EE9AB0F2B5FB70E79B90">>}]}
{<<"gen_batch_server">>, <<"C8581FE4A4B6BCCF91E53CE6A8C7E6C27C8C591BAB5408B160166463F5579C22">>}]}
].
2 changes: 1 addition & 1 deletion src/ra.hrl
@@ -389,7 +389,7 @@
"The last index of the log."},
{last_written_index, ?C_RA_SVR_METRIC_LAST_WRITTEN_INDEX, counter,
"The last fully written and fsynced index of the log."},
-{commit_latency, ?C_RA_SVR_METRIC_COMMIT_LATENCY, gauge,
+{commit_latency, ?C_RA_SVR_METRIC_COMMIT_LATENCY, {gauge, time_ms},
"Approximate time taken from an entry being written to the log until it is committed."},
{term, ?C_RA_SVR_METRIC_TERM, counter, "The current term."},
{checkpoint_index, ?C_RA_SVR_METRIC_CHECKPOINT_INDEX, counter,
4 changes: 1 addition & 3 deletions src/ra_bench.erl
@@ -199,9 +199,8 @@ spawn_client(Parent, Leader, Num, DataSize, Pipe, Counter) ->
end
end).

-print_metrics(Name) ->
+print_metrics(_Name) ->
io:format("Node: ~w~n", [node()]),
io:format("metrics ~p~n", [ets:lookup(ra_metrics, Name)]),
io:format("counters ~p~n", [ra_counters:overview()]).


@@ -220,4 +219,3 @@ print_metrics(Name) ->
% GzFile = Base ++ ".gz.*",
% lg_callgrind:profile_many(GzFile, Base ++ ".out",#{}),
% ok.

10 changes: 7 additions & 3 deletions src/ra_counters.erl
@@ -10,6 +10,7 @@
-export([
init/0,
new/2,
+new/3,
fetch/1,
overview/0,
overview/1,
@@ -32,6 +33,9 @@ init() ->
new(Name, FieldsSpec) ->
seshat:new(ra, Name, FieldsSpec).

+new(Name, FieldsSpec, MetricLabels) ->
+seshat:new(ra, Name, FieldsSpec, MetricLabels).

-spec fetch(name()) -> undefined | counters:counters_ref().
fetch(Name) ->
seshat:fetch(ra, Name).
@@ -42,11 +46,11 @@ delete(Name) ->

-spec overview() -> #{name() => #{atom() => non_neg_integer()}}.
overview() ->
-seshat:overview(ra).
+seshat:counters(ra).

--spec overview(name()) -> #{atom() => non_neg_integer()}.
+-spec overview(name()) -> #{atom() => non_neg_integer()} | undefined.
overview(Name) ->
-seshat:overview(ra, Name).
+seshat:counters(ra, Name).

-spec counters(name(), [atom()]) ->
#{atom() => non_neg_integer()} | undefined.
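Taken together, new/3 and the seshat-backed overview functions suggest a usage pattern like the sketch below. It is illustrative only and based on the calls visible in this diff: the component name, field spec, and labels are made up, the field-spec tuples mirror the {Name, Index, Type, Help} form used in ra.hrl, counters are bumped with the standard OTP counters module, and the ra application is assumed to be running so the seshat group already exists.

```erlang
%% Illustrative only; all names below are placeholders.
Fields = [{msgs_in, 1, counter, "Messages received"}],
CRef = ra_counters:new(my_component, Fields,
                       #{ra_system => default, module => my_module}),
ok = counters:add(CRef, 1, 1),
%% overview/0 still returns #{Name => #{Field => Value}} per the spec above;
%% overview/1 may now return 'undefined' for unknown names.
Overview = ra_counters:overview(),
#{msgs_in := 1} = maps:get(my_component, Overview),
undefined = ra_counters:overview(no_such_component).
```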
3 changes: 1 addition & 2 deletions src/ra_log_segment_writer.erl
@@ -115,7 +115,7 @@ init([#{data_dir := DataDir,
name := SegWriterName,
system := System} = Conf]) ->
process_flag(trap_exit, true),
-CRef = ra_counters:new(SegWriterName, ?COUNTER_FIELDS),
+CRef = ra_counters:new(SegWriterName, ?COUNTER_FIELDS, #{ra_system => System, module => ?MODULE}),
SegmentConf = maps:get(segment_conf, Conf, #{}),
maybe_upgrade_segment_file_names(System, DataDir),
{ok, #state{system = System,
@@ -554,4 +554,3 @@ maybe_upgrade_segment_file_names(System, DataDir) ->
true ->
ok
end.

3 changes: 2 additions & 1 deletion src/ra_log_sup.erl
@@ -56,7 +56,7 @@ init([#{data_dir := DataDir,


make_wal_conf(#{data_dir := DataDir,
-name := _System,
+name := System,
names := #{wal := WalName,
segment_writer := SegWriterName} = Names} = Cfg) ->
WalDir = case Cfg of
@@ -78,6 +78,7 @@ make_wal_conf(#{data_dir := DataDir,
?MIN_BIN_VHEAP_SIZE),
MinHeapSize = maps:get(wal_min_heap_size, Cfg, ?MIN_HEAP_SIZE),
#{name => WalName,
+system => System,
names => Names,
dir => WalDir,
segment_writer => SegWriterName,
5 changes: 3 additions & 2 deletions src/ra_log_wal.erl
@@ -132,6 +132,7 @@

-type state() :: #state{}.
-type wal_conf() :: #{name := atom(), %% the name to register the wal as
+system := atom(),
names := ra_system:names(),
dir := file:filename_all(),
max_size_bytes => non_neg_integer(),
@@ -254,7 +255,7 @@ start_link(#{name := Name} = Config)
-spec init(wal_conf()) ->
{ok, state()} |
{stop, wal_checksum_validation_failure} | {stop, term()}.
-init(#{dir := Dir} = Conf0) ->
+init(#{dir := Dir, system := System} = Conf0) ->
#{max_size_bytes := MaxWalSize,
max_entries := MaxEntries,
recovery_chunk_size := RecoveryChunkSize,
@@ -278,7 +279,7 @@ init(#{dir := Dir} = Conf0) ->
process_flag(message_queue_data, off_heap),
process_flag(min_bin_vheap_size, MinBinVheapSize),
process_flag(min_heap_size, MinHeapSize),
-CRef = ra_counters:new(WalName, ?COUNTER_FIELDS),
+CRef = ra_counters:new(WalName, ?COUNTER_FIELDS, #{ra_system => System, module => ?MODULE}),
Conf = #conf{dir = Dir,
segment_writer = SegWriter,
compute_checksums = ComputeChecksums,
26 changes: 3 additions & 23 deletions src/ra_server.erl
@@ -30,7 +30,6 @@
tick/1,
log_tick/1,
overview/1,
-metrics/1,
is_new/1,
is_fully_persisted/1,
is_fully_replicated/1,
@@ -219,6 +218,7 @@
initial_machine_version => ra_machine:version(),
friendly_name => unicode:chardata(),
metrics_key => term(),
+metrics_labels => map(),
% TODO: review - only really used for
% setting election timeouts
broadcast_time => non_neg_integer(), % ms
@@ -258,6 +258,7 @@

-type mutable_config() :: #{cluster_name => ra_cluster_name(),
metrics_key => term(),
+metrics_labels => map(),
broadcast_time => non_neg_integer(), % ms
tick_timeout => non_neg_integer(), % ms
install_snap_rpc_timeout => non_neg_integer(), % ms
@@ -380,6 +381,7 @@ init(#{id := Id,
uid = UId,
log_id = LogId,
metrics_key = MetricKey,
+metrics_labels = #{},
machine = Machine,
machine_version = LatestMacVer,
machine_versions = [{SnapshotIdx, EffectiveMacVer}],
@@ -1793,28 +1795,6 @@ cfg_to_map(Cfg) ->
{N + 1, Acc#{F => element(N, Cfg)}}
end, {2, #{}}, record_info(fields, cfg))).

--spec metrics(ra_server_state()) ->
-{atom(), ra_term(),
-ra_index(), ra_index(),
-ra_index(), ra_index(), non_neg_integer()}.
-metrics(#{cfg := #cfg{metrics_key = Key},
-commit_index := CI,
-last_applied := LA,
-current_term := CT,
-log := Log} = State) ->
-SnapIdx = case ra_log:snapshot_index_term(Log) of
-undefined -> 0;
-{I, _} -> I
-end,
-CL = case State of
-#{commit_latency := L} ->
-L;
-_ ->
-0
-end,
-{LW, _} = ra_log:last_index_term(Log),
-{Key, CT, SnapIdx, LA, CI, LW, CL}.

-spec is_new(ra_server_state()) -> boolean().
is_new(#{log := Log}) ->
ra_log:next_index(Log) =:= 1.
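With ra_server:metrics/1 and the ra_metrics table gone, callers read the equivalent per-server values from counters instead. A rough migration sketch, assuming the server's counters are registered under its server id (as config_defaults/2 does elsewhere in this PR) and using only field names that appear in ra.hrl:

```erlang
%% Before (removed in this PR), the tuple lived in the ra_metrics table:
%%   [{Key, Term, SnapIdx, LastApplied, CommitIdx, LastWritten, LatencyMs}] =
%%       ets:lookup(ra_metrics, Key)
%% After (sketch): read the equivalent fields from the per-server counters.
ServerId = {qq_server, node()},
case ra_counters:overview(ServerId) of
    undefined ->
        no_such_server;
    #{term := Term,
      last_written_index := LastWritten,
      commit_latency := LatencyMs} ->
        {Term, LastWritten, LatencyMs}
end.
```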
1 change: 1 addition & 0 deletions src/ra_server.hrl
@@ -16,6 +16,7 @@
uid :: ra_uid(),
log_id :: unicode:chardata(),
metrics_key :: term(),
+metrics_labels :: map(),
machine :: ra_machine:machine(),
machine_version :: ra_machine:version(),
machine_versions :: [{ra_index(), ra_machine:version()}, ...],
28 changes: 10 additions & 18 deletions src/ra_server_proc.erl
@@ -316,8 +316,9 @@ do_init(#{id := Id,
Key = ra_lib:ra_server_id_to_local_name(Id),
true = ets:insert(ra_state, {Key, init, unknown}),
process_flag(trap_exit, true),
+MetricLabels = maps:get(metrics_labels, Config0, #{}),
Config = #{counter := Counter,
-system_config := #{names := Names} = SysConf} = maps:merge(config_defaults(Id),
+system_config := #{names := Names} = SysConf} = maps:merge(config_defaults(Id, MetricLabels),
Config0),
MsgQData = maps:get(message_queue_data, SysConf, off_heap),
MinBinVheapSize = maps:get(server_min_bin_vheap_size, SysConf,
@@ -413,9 +414,8 @@ recovered(enter, OldState, State0) ->
{State, Actions} = handle_enter(?FUNCTION_NAME, OldState, State0),
ok = record_cluster_change(State),
{keep_state, State, Actions};
-recovered(internal, next, #state{server_state = ServerState} = State) ->
+recovered(internal, next, State) ->
true = erlang:garbage_collect(),
-_ = ets:insert(ra_metrics, ra_server:metrics(ServerState)),
next_state(follower, State, set_tick_timer(State, [])).

leader(enter, OldState, #state{low_priority_commands = Delayed0} = State0) ->
Expand Down Expand Up @@ -568,7 +568,7 @@ leader(_, tick_timeout, State0) ->
cast, State1#state{server_state = ServerState}),
%% try sending any pending applied notifications again
State = send_applied_notifications(State2, #{}),
-{keep_state, handle_tick_metrics(State),
+{keep_state, State,
set_tick_timer(State, Actions)};
leader({timeout, Name}, machine_timeout, State0) ->
% the machine timer timed out, add a timeout message
@@ -645,7 +645,7 @@ candidate(info, {node_event, _Node, _Evt}, State) ->
{keep_state, State};
candidate(_, tick_timeout, State0) ->
State = maybe_persist_last_applied(State0),
-{keep_state, handle_tick_metrics(State), set_tick_timer(State, [])};
+{keep_state, State, set_tick_timer(State, [])};
candidate({call, From}, trigger_election, State) ->
{keep_state, State, [{reply, From, ok}]};
candidate(EventType, Msg, State0) ->
@@ -706,7 +706,7 @@ pre_vote(info, {'DOWN', _MRef, process, Pid, Info}, State0) ->
handle_process_down(Pid, Info, ?FUNCTION_NAME, State0);
pre_vote(_, tick_timeout, State0) ->
State = maybe_persist_last_applied(State0),
-{keep_state, handle_tick_metrics(State), set_tick_timer(State, [])};
+{keep_state, State, set_tick_timer(State, [])};
pre_vote({call, From}, trigger_election, State) ->
{keep_state, State, [{reply, From, ok}]};
pre_vote(EventType, Msg, State0) ->
@@ -849,8 +849,7 @@ follower(_, tick_timeout, #state{server_state = ServerState0} = State0) ->
ServerState = ra_server:log_tick(ServerState0),
{State, Actions} = ?HANDLE_EFFECTS([{aux, tick}], cast,
State0#state{server_state = ServerState}),
-{keep_state, handle_tick_metrics(State),
-set_tick_timer(State, Actions)};
+{keep_state, State, set_tick_timer(State, Actions)};
follower({call, From}, {log_fold, Fun, Term}, State) ->
fold_log(From, Fun, Term, State);
follower(EventType, Msg, #state{conf = #conf{name = Name},
@@ -1074,7 +1073,7 @@ handle_event(_EventType, EventContent, StateName, State) ->

terminate(Reason, StateName,
#state{conf = #conf{name = Key, cluster_name = ClusterName},
-server_state = ServerState = #{cfg := #cfg{metrics_key = MetricsKey}}} = State) ->
+server_state = ServerState} = State) ->
?DEBUG("~ts: terminating with ~w in state ~w",
[log_id(State), Reason, StateName]),
#{names := #{server_sup := SrvSup,
@@ -1115,7 +1114,6 @@ terminate(Reason, StateName,
ok
end,
catch ra_leaderboard:clear(ClusterName),
-_ = ets:delete(ra_metrics, MetricsKey),
_ = ets:delete(ra_state, Key),
ok;
%% This occurs if there is a crash in the init callback of the ra_machine,
@@ -1805,11 +1803,11 @@ gen_statem_safe_call(ServerId, Msg, Timeout) ->
do_state_query(QueryName, #state{server_state = State}) ->
ra_server:state_query(QueryName, State).

-config_defaults(ServerId) ->
+config_defaults(ServerId, MetricLabels) ->
Counter = case ra_counters:fetch(ServerId) of
undefined ->
ra_counters:new(ServerId,
-{persistent_term, ?FIELDSPEC_KEY});
+{persistent_term, ?FIELDSPEC_KEY}, MetricLabels);
C ->
C
end,
@@ -2006,12 +2004,6 @@ get_node({_, Node}) ->
get_node(Proc) when is_atom(Proc) ->
node().

-handle_tick_metrics(State) ->
-ServerState = State#state.server_state,
-Metrics = ra_server:metrics(ServerState),
-_ = ets:insert(ra_metrics, Metrics),
-State.

can_execute_locally(RaftState, TargetNode,
#state{server_state = ServerState} = State) ->
case RaftState of
1 change: 0 additions & 1 deletion src/ra_server_sup_sup.erl
@@ -185,7 +185,6 @@ delete_server_rpc(System, RaName) ->
%% forcefully clean up ETS tables
catch ets:delete(ra_log_metrics, UId),
catch ets:delete(ra_log_snapshot_state, UId),
-catch ets:delete(ra_metrics, RaName),
catch ets:delete(ra_state, RaName),
catch ets:delete(ra_open_file_metrics, Pid),
catch ra_counters:delete({RaName, node()}),
3 changes: 1 addition & 2 deletions src/ra_sup.erl
@@ -11,8 +11,7 @@
-export([start_link/0]).
-export([init/1]).

--define(TABLES, [ra_metrics,
-ra_state,
+-define(TABLES, [ra_state,
ra_open_file_metrics,
ra_io_metrics
]).
3 changes: 1 addition & 2 deletions test/ra_2_SUITE.erl
@@ -277,8 +277,7 @@ validate_ets_table_deletes(UIds, Pids, Peers) ->

%% validate by registered name is also cleaned up
[ [] = ets:lookup(T, Name) || {Name, _} <- Peers,
-T <- [ra_metrics,
-ra_state]],
+T <- [ra_state]],

%% validate open file metrics is cleaned up
[ [] = ets:lookup(T, Pid) || Pid <- Pids,