diff --git a/Makefile b/Makefile
index cd65632d..f101deb8 100644
--- a/Makefile
+++ b/Makefile
@@ -10,7 +10,7 @@ ESCRIPT_EMU_ARGS = -noinput -setcookie ra_fifo_cli
dep_gen_batch_server = hex 0.8.9
dep_aten = hex 0.6.0
-dep_seshat = hex 0.6.0
+dep_seshat = git https://github.com/rabbitmq/seshat.git prometheus-support
DEPS = aten gen_batch_server seshat
TEST_DEPS = proper meck eunit_formatters inet_tcp_proxy
diff --git a/README.md b/README.md
index 5b747a26..942be0f8 100644
--- a/README.md
+++ b/README.md
@@ -53,7 +53,7 @@ which algorithms such as Raft significantly benefit from.
This library was primarily developed as the foundation of a replication layer for
[quorum queues](https://rabbitmq.com/quorum-queues.html) in RabbitMQ, and today
-also powers [RabbitMQ streams](https://rabbitmq.com/streams.html) and [Khepri](https://github.com/rabbitmq/khepri).
+also powers [RabbitMQ streams](https://rabbitmq.com/streams.html) and [Khepri](https://github.com/rabbitmq/khepri).
The design it aims to replace uses
a variant of [Chain Based Replication](https://www.cs.cornell.edu/home/rvr/papers/OSDI04.pdf)
@@ -405,9 +405,14 @@ is available in a separate repository.
low_priority_commands_flush_size |
diff --git a/rebar.config b/rebar.config
index ddde07c9..54d53433 100644
--- a/rebar.config
+++ b/rebar.config
@@ -1,7 +1,7 @@
{deps, [
{gen_batch_server, "0.8.9"},
{aten, "0.6.0"},
- {seshat, "0.6.0"}
+ {seshat, {git, "https://github.com/rabbitmq/seshat.git", {branch, "prometheus-support"}}}
]}.
{profiles,
diff --git a/rebar.lock b/rebar.lock
index 19e233b0..6733e5bb 100644
--- a/rebar.lock
+++ b/rebar.lock
@@ -1,14 +1,11 @@
{"1.2.0",
[{<<"aten">>,{pkg,<<"aten">>,<<"0.6.0">>},0},
- {<<"gen_batch_server">>,{pkg,<<"gen_batch_server">>,<<"0.8.9">>},0},
- {<<"seshat">>,{pkg,<<"seshat">>,<<"0.6.0">>},0}]}.
+ {<<"gen_batch_server">>,{pkg,<<"gen_batch_server">>,<<"0.8.9">>},0}]}.
[
{pkg_hash,[
{<<"aten">>, <<"7A57B275A6DAF515AC3683FB9853E280B4D0DCDD74292FD66AC4A01C8694F8C7">>},
- {<<"gen_batch_server">>, <<"1C6BC0F530BF8C17E8B4ACC20C2CC369FFA5BEE2B46DE01E21410745F24B1BC9">>},
- {<<"seshat">>, <<"3172EB1D7A2A4F66108CD6933A4E465AFF80F84AA90ED83F047B92F636123CCD">>}]},
+ {<<"gen_batch_server">>, <<"1C6BC0F530BF8C17E8B4ACC20C2CC369FFA5BEE2B46DE01E21410745F24B1BC9">>}]},
{pkg_hash_ext,[
{<<"aten">>, <<"5F39A164206AE3F211EF5880B1F7819415686436E3229D30B6A058564FBAA168">>},
- {<<"gen_batch_server">>, <<"C8581FE4A4B6BCCF91E53CE6A8C7E6C27C8C591BAB5408B160166463F5579C22">>},
- {<<"seshat">>, <<"7CEF700F92831DD7CAE6A6DD223CCC55AC88ECCE0631EE9AB0F2B5FB70E79B90">>}]}
+ {<<"gen_batch_server">>, <<"C8581FE4A4B6BCCF91E53CE6A8C7E6C27C8C591BAB5408B160166463F5579C22">>}]}
].
diff --git a/src/ra.hrl b/src/ra.hrl
index 986412ef..fb716916 100644
--- a/src/ra.hrl
+++ b/src/ra.hrl
@@ -389,7 +389,7 @@
"The last index of the log."},
{last_written_index, ?C_RA_SVR_METRIC_LAST_WRITTEN_INDEX, counter,
"The last fully written and fsynced index of the log."},
- {commit_latency, ?C_RA_SVR_METRIC_COMMIT_LATENCY, gauge,
+ {commit_latency, ?C_RA_SVR_METRIC_COMMIT_LATENCY, {gauge, time_ms},
"Approximate time taken from an entry being written to the log until it is committed."},
{term, ?C_RA_SVR_METRIC_TERM, counter, "The current term."},
{checkpoint_index, ?C_RA_SVR_METRIC_CHECKPOINT_INDEX, counter,
diff --git a/src/ra_bench.erl b/src/ra_bench.erl
index 56fff922..d4a14d8f 100644
--- a/src/ra_bench.erl
+++ b/src/ra_bench.erl
@@ -199,9 +199,8 @@ spawn_client(Parent, Leader, Num, DataSize, Pipe, Counter) ->
end
end).
-print_metrics(Name) ->
+print_metrics(_Name) ->
io:format("Node: ~w~n", [node()]),
- io:format("metrics ~p~n", [ets:lookup(ra_metrics, Name)]),
io:format("counters ~p~n", [ra_counters:overview()]).
@@ -220,4 +219,3 @@ print_metrics(Name) ->
% GzFile = Base ++ ".gz.*",
% lg_callgrind:profile_many(GzFile, Base ++ ".out",#{}),
% ok.
-
diff --git a/src/ra_counters.erl b/src/ra_counters.erl
index 080b4572..38fa94d7 100644
--- a/src/ra_counters.erl
+++ b/src/ra_counters.erl
@@ -10,6 +10,7 @@
-export([
init/0,
new/2,
+ new/3,
fetch/1,
overview/0,
overview/1,
@@ -32,6 +33,13 @@ init() ->
 new(Name, FieldsSpec) ->
     seshat:new(ra, Name, FieldsSpec).
 
+%% Same as new/2 but additionally hands `MetricLabels' to seshat:new/4.
+%% NOTE(review): the labels are presumably attached to exported metrics - confirm against seshat.
+-spec new(name(), seshat:fields_spec(), map()) ->
+    counters:counters_ref().
+new(Name, FieldsSpec, MetricLabels) ->
+    seshat:new(ra, Name, FieldsSpec, MetricLabels).
+
 -spec fetch(name()) -> undefined | counters:counters_ref().
 fetch(Name) ->
     seshat:fetch(ra, Name).
@@ -42,11 +46,11 @@ delete(Name) ->
-spec overview() -> #{name() => #{atom() => non_neg_integer()}}.
overview() ->
- seshat:overview(ra).
+ seshat:counters(ra).
--spec overview(name()) -> #{atom() => non_neg_integer()}.
+-spec overview(name()) -> #{atom() => non_neg_integer()} | undefined.
overview(Name) ->
- seshat:overview(ra, Name).
+ seshat:counters(ra, Name).
-spec counters(name(), [atom()]) ->
#{atom() => non_neg_integer()} | undefined.
diff --git a/src/ra_log_segment_writer.erl b/src/ra_log_segment_writer.erl
index 71df7b94..179c259a 100644
--- a/src/ra_log_segment_writer.erl
+++ b/src/ra_log_segment_writer.erl
@@ -115,7 +115,7 @@ init([#{data_dir := DataDir,
name := SegWriterName,
system := System} = Conf]) ->
process_flag(trap_exit, true),
- CRef = ra_counters:new(SegWriterName, ?COUNTER_FIELDS),
+ CRef = ra_counters:new(SegWriterName, ?COUNTER_FIELDS, #{ra_system => System, module => ?MODULE}),
SegmentConf = maps:get(segment_conf, Conf, #{}),
maybe_upgrade_segment_file_names(System, DataDir),
{ok, #state{system = System,
@@ -554,4 +554,3 @@ maybe_upgrade_segment_file_names(System, DataDir) ->
true ->
ok
end.
-
diff --git a/src/ra_log_sup.erl b/src/ra_log_sup.erl
index b2850a2f..644a9bb1 100644
--- a/src/ra_log_sup.erl
+++ b/src/ra_log_sup.erl
@@ -56,7 +56,7 @@ init([#{data_dir := DataDir,
make_wal_conf(#{data_dir := DataDir,
- name := _System,
+ name := System,
names := #{wal := WalName,
segment_writer := SegWriterName} = Names} = Cfg) ->
WalDir = case Cfg of
@@ -78,6 +78,7 @@ make_wal_conf(#{data_dir := DataDir,
?MIN_BIN_VHEAP_SIZE),
MinHeapSize = maps:get(wal_min_heap_size, Cfg, ?MIN_HEAP_SIZE),
#{name => WalName,
+ system => System,
names => Names,
dir => WalDir,
segment_writer => SegWriterName,
diff --git a/src/ra_log_wal.erl b/src/ra_log_wal.erl
index 87ca98d3..96b0ecd2 100644
--- a/src/ra_log_wal.erl
+++ b/src/ra_log_wal.erl
@@ -132,6 +132,7 @@
-type state() :: #state{}.
-type wal_conf() :: #{name := atom(), %% the name to register the wal as
+ system := atom(),
names := ra_system:names(),
dir := file:filename_all(),
max_size_bytes => non_neg_integer(),
@@ -254,7 +255,7 @@ start_link(#{name := Name} = Config)
-spec init(wal_conf()) ->
{ok, state()} |
{stop, wal_checksum_validation_failure} | {stop, term()}.
-init(#{dir := Dir} = Conf0) ->
+init(#{dir := Dir, system := System} = Conf0) ->
#{max_size_bytes := MaxWalSize,
max_entries := MaxEntries,
recovery_chunk_size := RecoveryChunkSize,
@@ -278,7 +279,7 @@ init(#{dir := Dir} = Conf0) ->
process_flag(message_queue_data, off_heap),
process_flag(min_bin_vheap_size, MinBinVheapSize),
process_flag(min_heap_size, MinHeapSize),
- CRef = ra_counters:new(WalName, ?COUNTER_FIELDS),
+ CRef = ra_counters:new(WalName, ?COUNTER_FIELDS, #{ra_system => System, module => ?MODULE}),
Conf = #conf{dir = Dir,
segment_writer = SegWriter,
compute_checksums = ComputeChecksums,
diff --git a/src/ra_server.erl b/src/ra_server.erl
index 5d820d5b..7bd80fe8 100644
--- a/src/ra_server.erl
+++ b/src/ra_server.erl
@@ -30,7 +30,6 @@
tick/1,
log_tick/1,
overview/1,
- metrics/1,
is_new/1,
is_fully_persisted/1,
is_fully_replicated/1,
@@ -219,6 +218,7 @@
initial_machine_version => ra_machine:version(),
friendly_name => unicode:chardata(),
metrics_key => term(),
+ metrics_labels => map(),
% TODO: review - only really used for
% setting election timeouts
broadcast_time => non_neg_integer(), % ms
@@ -258,6 +258,7 @@
-type mutable_config() :: #{cluster_name => ra_cluster_name(),
metrics_key => term(),
+ metrics_labels => map(),
broadcast_time => non_neg_integer(), % ms
tick_timeout => non_neg_integer(), % ms
install_snap_rpc_timeout => non_neg_integer(), % ms
@@ -380,6 +381,7 @@ init(#{id := Id,
uid = UId,
log_id = LogId,
metrics_key = MetricKey,
+ metrics_labels = #{},
machine = Machine,
machine_version = LatestMacVer,
machine_versions = [{SnapshotIdx, EffectiveMacVer}],
@@ -1793,28 +1795,6 @@ cfg_to_map(Cfg) ->
{N + 1, Acc#{F => element(N, Cfg)}}
end, {2, #{}}, record_info(fields, cfg))).
--spec metrics(ra_server_state()) ->
- {atom(), ra_term(),
- ra_index(), ra_index(),
- ra_index(), ra_index(), non_neg_integer()}.
-metrics(#{cfg := #cfg{metrics_key = Key},
- commit_index := CI,
- last_applied := LA,
- current_term := CT,
- log := Log} = State) ->
- SnapIdx = case ra_log:snapshot_index_term(Log) of
- undefined -> 0;
- {I, _} -> I
- end,
- CL = case State of
- #{commit_latency := L} ->
- L;
- _ ->
- 0
- end,
- {LW, _} = ra_log:last_index_term(Log),
- {Key, CT, SnapIdx, LA, CI, LW, CL}.
-
-spec is_new(ra_server_state()) -> boolean().
is_new(#{log := Log}) ->
ra_log:next_index(Log) =:= 1.
diff --git a/src/ra_server.hrl b/src/ra_server.hrl
index 060afa99..33381e4a 100644
--- a/src/ra_server.hrl
+++ b/src/ra_server.hrl
@@ -16,6 +16,7 @@
uid :: ra_uid(),
log_id :: unicode:chardata(),
metrics_key :: term(),
+ metrics_labels :: map(),
machine :: ra_machine:machine(),
machine_version :: ra_machine:version(),
machine_versions :: [{ra_index(), ra_machine:version()}, ...],
diff --git a/src/ra_server_proc.erl b/src/ra_server_proc.erl
index 12eefe74..7707599c 100644
--- a/src/ra_server_proc.erl
+++ b/src/ra_server_proc.erl
@@ -316,8 +316,9 @@ do_init(#{id := Id,
Key = ra_lib:ra_server_id_to_local_name(Id),
true = ets:insert(ra_state, {Key, init, unknown}),
process_flag(trap_exit, true),
+ MetricLabels = maps:get(metrics_labels, Config0, #{}),
Config = #{counter := Counter,
- system_config := #{names := Names} = SysConf} = maps:merge(config_defaults(Id),
+ system_config := #{names := Names} = SysConf} = maps:merge(config_defaults(Id, MetricLabels),
Config0),
MsgQData = maps:get(message_queue_data, SysConf, off_heap),
MinBinVheapSize = maps:get(server_min_bin_vheap_size, SysConf,
@@ -413,9 +414,8 @@ recovered(enter, OldState, State0) ->
{State, Actions} = handle_enter(?FUNCTION_NAME, OldState, State0),
ok = record_cluster_change(State),
{keep_state, State, Actions};
-recovered(internal, next, #state{server_state = ServerState} = State) ->
+recovered(internal, next, State) ->
true = erlang:garbage_collect(),
- _ = ets:insert(ra_metrics, ra_server:metrics(ServerState)),
next_state(follower, State, set_tick_timer(State, [])).
leader(enter, OldState, #state{low_priority_commands = Delayed0} = State0) ->
@@ -568,7 +568,7 @@ leader(_, tick_timeout, State0) ->
cast, State1#state{server_state = ServerState}),
%% try sending any pending applied notifications again
State = send_applied_notifications(State2, #{}),
- {keep_state, handle_tick_metrics(State),
+ {keep_state, State,
set_tick_timer(State, Actions)};
leader({timeout, Name}, machine_timeout, State0) ->
% the machine timer timed out, add a timeout message
@@ -645,7 +645,7 @@ candidate(info, {node_event, _Node, _Evt}, State) ->
{keep_state, State};
candidate(_, tick_timeout, State0) ->
State = maybe_persist_last_applied(State0),
- {keep_state, handle_tick_metrics(State), set_tick_timer(State, [])};
+ {keep_state, State, set_tick_timer(State, [])};
candidate({call, From}, trigger_election, State) ->
{keep_state, State, [{reply, From, ok}]};
candidate(EventType, Msg, State0) ->
@@ -706,7 +706,7 @@ pre_vote(info, {'DOWN', _MRef, process, Pid, Info}, State0) ->
handle_process_down(Pid, Info, ?FUNCTION_NAME, State0);
pre_vote(_, tick_timeout, State0) ->
State = maybe_persist_last_applied(State0),
- {keep_state, handle_tick_metrics(State), set_tick_timer(State, [])};
+ {keep_state, State, set_tick_timer(State, [])};
pre_vote({call, From}, trigger_election, State) ->
{keep_state, State, [{reply, From, ok}]};
pre_vote(EventType, Msg, State0) ->
@@ -849,8 +849,7 @@ follower(_, tick_timeout, #state{server_state = ServerState0} = State0) ->
ServerState = ra_server:log_tick(ServerState0),
{State, Actions} = ?HANDLE_EFFECTS([{aux, tick}], cast,
State0#state{server_state = ServerState}),
- {keep_state, handle_tick_metrics(State),
- set_tick_timer(State, Actions)};
+ {keep_state, State, set_tick_timer(State, Actions)};
follower({call, From}, {log_fold, Fun, Term}, State) ->
fold_log(From, Fun, Term, State);
follower(EventType, Msg, #state{conf = #conf{name = Name},
@@ -1074,7 +1073,7 @@ handle_event(_EventType, EventContent, StateName, State) ->
terminate(Reason, StateName,
#state{conf = #conf{name = Key, cluster_name = ClusterName},
- server_state = ServerState = #{cfg := #cfg{metrics_key = MetricsKey}}} = State) ->
+ server_state = ServerState} = State) ->
?DEBUG("~ts: terminating with ~w in state ~w",
[log_id(State), Reason, StateName]),
#{names := #{server_sup := SrvSup,
@@ -1115,7 +1114,6 @@ terminate(Reason, StateName,
ok
end,
catch ra_leaderboard:clear(ClusterName),
- _ = ets:delete(ra_metrics, MetricsKey),
_ = ets:delete(ra_state, Key),
ok;
%% This occurs if there is a crash in the init callback of the ra_machine,
@@ -1805,11 +1803,11 @@ gen_statem_safe_call(ServerId, Msg, Timeout) ->
do_state_query(QueryName, #state{server_state = State}) ->
ra_server:state_query(QueryName, State).
-config_defaults(ServerId) ->
+config_defaults(ServerId, MetricLabels) ->
Counter = case ra_counters:fetch(ServerId) of
undefined ->
ra_counters:new(ServerId,
- {persistent_term, ?FIELDSPEC_KEY});
+ {persistent_term, ?FIELDSPEC_KEY}, MetricLabels);
C ->
C
end,
@@ -2006,12 +2004,6 @@ get_node({_, Node}) ->
get_node(Proc) when is_atom(Proc) ->
node().
-handle_tick_metrics(State) ->
- ServerState = State#state.server_state,
- Metrics = ra_server:metrics(ServerState),
- _ = ets:insert(ra_metrics, Metrics),
- State.
-
can_execute_locally(RaftState, TargetNode,
#state{server_state = ServerState} = State) ->
case RaftState of
diff --git a/src/ra_server_sup_sup.erl b/src/ra_server_sup_sup.erl
index dcfc964e..0733f8fa 100644
--- a/src/ra_server_sup_sup.erl
+++ b/src/ra_server_sup_sup.erl
@@ -185,7 +185,6 @@ delete_server_rpc(System, RaName) ->
%% forcefully clean up ETS tables
catch ets:delete(ra_log_metrics, UId),
catch ets:delete(ra_log_snapshot_state, UId),
- catch ets:delete(ra_metrics, RaName),
catch ets:delete(ra_state, RaName),
catch ets:delete(ra_open_file_metrics, Pid),
catch ra_counters:delete({RaName, node()}),
diff --git a/src/ra_sup.erl b/src/ra_sup.erl
index 3cd63064..588c9e08 100644
--- a/src/ra_sup.erl
+++ b/src/ra_sup.erl
@@ -11,8 +11,7 @@
-export([start_link/0]).
-export([init/1]).
--define(TABLES, [ra_metrics,
- ra_state,
+-define(TABLES, [ra_state,
ra_open_file_metrics,
ra_io_metrics
]).
diff --git a/test/ra_2_SUITE.erl b/test/ra_2_SUITE.erl
index 5d6a5fef..9058520a 100644
--- a/test/ra_2_SUITE.erl
+++ b/test/ra_2_SUITE.erl
@@ -277,8 +277,7 @@ validate_ets_table_deletes(UIds, Pids, Peers) ->
%% validate by registered name is also cleaned up
[ [] = ets:lookup(T, Name) || {Name, _} <- Peers,
- T <- [ra_metrics,
- ra_state]],
+ T <- [ra_state]],
%% validate open file metrics is cleaned up
[ [] = ets:lookup(T, Pid) || Pid <- Pids,
diff --git a/test/ra_SUITE.erl b/test/ra_SUITE.erl
index fa3e0bf8..e298a400 100644
--- a/test/ra_SUITE.erl
+++ b/test/ra_SUITE.erl
@@ -69,7 +69,6 @@ all_tests() ->
[{repeat_until_fail, 50}]},
follower_catchup,
post_partition_liveness,
- all_metrics_are_integers,
transfer_leadership,
transfer_leadership_two_node,
new_nonvoter_knows_its_status,
@@ -692,17 +691,6 @@ consistent_query_stale(Config) ->
?assertMatch(Index, IndexAfter),
terminate_cluster(Cluster).
-all_metrics_are_integers(Config) ->
- % ok = logger:set_primary_config(level, all),
- Name = ?config(test_name, Config),
- N1 = nth_server_name(Config, 1),
- ok = ra:start_server(default, Name, N1, add_machine(), []),
- ok = ra:trigger_election(N1),
- {ok, 5, _} = ra:process_command(N1, 5, 2000),
- [{_, M1, M2, M3, M4, M5, M6}] = ets:lookup(ra_metrics, element(1, N1)),
- ?assert(lists:all(fun(I) -> is_integer(I) end, [M1, M2, M3, M4, M5, M6])),
- terminate_cluster([N1]).
-
wait_for_applied(Msg) ->
receive {ra_event, _, {applied, Applied}} ->
case lists:member(Msg, Applied) of
diff --git a/test/ra_log_wal_SUITE.erl b/test/ra_log_wal_SUITE.erl
index a538a52a..5d2d739d 100644
--- a/test/ra_log_wal_SUITE.erl
+++ b/test/ra_log_wal_SUITE.erl
@@ -113,6 +113,7 @@ init_per_testcase(TestCase, Config) ->
Names = maps:get(names, Sys),
WalConf = #{dir => Dir,
name => ra_log_wal,
+ system => ?SYS,
names => Names,
write_strategy => G,
max_size_bytes => ?MAX_SIZE_BYTES},
@@ -1432,4 +1433,3 @@ suspend_process(Pid) ->
erlang:raise(error, internal_error, Stack)
end
end.
-
diff --git a/test/ra_server_SUITE.erl b/test/ra_server_SUITE.erl
index f811fb38..b68a53dd 100644
--- a/test/ra_server_SUITE.erl
+++ b/test/ra_server_SUITE.erl
@@ -2442,6 +2442,7 @@ leader_received_append_entries_reply_with_stale_last_index(_Config) ->
uid = <<"n1">>,
log_id = <<"n1">>,
metrics_key = n1,
+ metrics_labels = #{},
machine = {machine, ra_machine_simple,
#{simple_fun => ?MACFUN,
initial_state => <<>>}}, % just keep last applied value
@@ -2500,6 +2501,7 @@ leader_receives_install_snapshot_result(_Config) ->
uid = <<"n1">>,
log_id = <<"n1">>,
metrics_key = n1,
+ metrics_labels = #{},
machine = {machine, ?FUNCTION_NAME, #{}},
machine_version = 0,
machine_versions = [{0, 0}],
@@ -3166,6 +3168,7 @@ base_state(NumServers, MacMod) ->
uid = <<"n1">>,
log_id = <<"n1">>,
metrics_key = n1,
+ metrics_labels = #{},
machine = {machine, MacMod, #{}}, % just keep last applied value
machine_version = 0,
machine_versions = [{0, 0}],
|