2 changes: 1 addition & 1 deletion rebar.config
@@ -115,7 +115,7 @@
]}.

{deps, [
{elmdb, { git, "https://github.com/twilson63/elmdb-rs.git", {branch, "main" }}},
{elmdb, { git, "https://github.com/twilson63/elmdb-rs.git", {ref, "5255868638e91b4dff24163467765d780f8a6f4a" }}},
{b64fast, {git, "https://github.com/ArweaveTeam/b64fast.git", {ref, "58f0502e49bf73b29d95c6d02460d1fb8d2a5273"}}},
{cowboy, {git, "https://github.com/ninenines/cowboy", {tag, "2.13.0"}}},
{gun, {git, "https://github.com/ninenines/gun", {tag, "2.2.0"}}},
2 changes: 1 addition & 1 deletion rebar.lock
@@ -14,7 +14,7 @@
1},
{<<"elmdb">>,
{git,"https://github.com/twilson63/elmdb-rs.git",
{branch, main}},
{ref,"5255868638e91b4dff24163467765d780f8a6f4a"}},
0},
{<<"graphql">>,{pkg,<<"graphql_erl">>,<<"0.17.1">>},0},
{<<"gun">>,{pkg,<<"gun">>,<<"2.2.0">>},0},
23 changes: 21 additions & 2 deletions src/hb_app.erl
@@ -7,7 +7,7 @@

-behaviour(application).

-export([start/2, stop/1]).
-export([start/2, prep_stop/1, stop/1]).

-include("include/hb.hrl").

@@ -18,5 +18,24 @@ start(_StartType, _StartArgs) ->
_TimestampServer = ar_timestamp:start(),
{ok, _} = hb_http_server:start().

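%% prep_stop/1 runs before the application's supervision tree is terminated,
%% so the LMDB store (if one is configured) can still flush its pending
%% writes; stop/1 runs after termination and closes the store.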
prep_stop(State) ->
maybe
{ok, Opts} ?= find_lmdb_store(),
hb_store_lmdb:flush(Opts)
end,
State.

stop(_State) ->
ok.
maybe
{ok, Opts} ?= find_lmdb_store(),
hb_store_lmdb:stop(Opts)
end,
ok.

find_lmdb_store() ->
Stores = maps:get(store, hb_opts:default_message()),
Pred = fun(S) -> maps:get(<<"store-module">>, S) == hb_store_lmdb end,
case lists:search(Pred, Stores) of
{value, Opts} -> {ok, Opts};
false -> not_found
end.
2 changes: 1 addition & 1 deletion src/hb_opts.erl
@@ -293,7 +293,7 @@ default_message() ->
?DEFAULT_PRIMARY_STORE,
#{
<<"store-module">> => hb_store_fs,
<<"name">> => <<"cache-mainnet">>
<<"name">> => <<"cache-mainnet/fs">>
},
#{
<<"store-module">> => hb_store_gateway,
100 changes: 91 additions & 9 deletions src/hb_store.erl
@@ -306,17 +306,36 @@ list(Modules, Path) -> call_function(Modules, list, [Path]).
match(Modules, Match) -> call_function(Modules, match, [Match]).

%% @doc Copies the contents of one store to another.
sync(FromStore, ToStore) ->
sync(#{<<"store-module">> := hb_store_lmdb} = FromStore, ToStore) ->
?event({sync_start, FromStore, ToStore}),
FromStoreOpts = maps:put(<<"resolve">>, false, FromStore),
{ok, Entries} = hb_store:list(FromStore, <<"/">>),
ValidEntries = lists:filter(fun(Key) -> Key =/= <<"lmdb">> end, Entries),
case sync_entries(ValidEntries, <<"/">>, FromStoreOpts, ToStore) of
[] -> ok;
FailedKeyValues -> {error, {sync_failed, FailedKeyValues}}
Res = hb_store_lmdb:fold_while(FromStoreOpts, fun({Key, Value}, {ok, Acc}) ->
case hb_store:write(ToStore, Key, Value) of
ok ->
{cont, {ok, Acc + 1}};
Error ->
?event({sync_error, Error}),
{halt, {error, sync_failed}}
end
end, {ok, 0}),
maybe
{ok, Count} ?= Res,
?event({sync_success, Count}),
ok
end;

sync(#{<<"store-module">> := hb_store_fs} = FromStore, ToStore) ->
?event({sync_start, FromStore, ToStore}),
FromStoreOpts = maps:put(<<"resolve">>, false, FromStore),
maybe
{ok, Entries} ?= hb_store:list(FromStore, <<"/">>),
case sync_fs_entries(Entries, <<"/">>, FromStoreOpts, ToStore) of
[] -> ok;
FailedKeyValues -> {error, {sync_failed, FailedKeyValues}}
end
end.
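
%% A minimal usage sketch (the store names below are hypothetical): copy
%% everything held in an LMDB-backed cache into a filesystem-backed cache.
%%
%%   From = #{ <<"store-module">> => hb_store_lmdb, <<"name">> => <<"cache-lmdb">> },
%%   To = #{ <<"store-module">> => hb_store_fs, <<"name">> => <<"cache-fs">> },
%%   hb_store:start(From), hb_store:start(To),
%%   ok = hb_store:sync(From, To).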

sync_entries(Entries, ParentDir, FromStore, ToStore) ->
sync_fs_entries(Entries, ParentDir, FromStore, ToStore) ->
?event({sync_entries, ParentDir, Entries}),
lists:foldl(fun(Key, Acc) ->
NewPath =
@@ -342,7 +361,7 @@
case hb_store:make_group(ToStore, NewPath) of
ok ->
{ok, Entries2} = hb_store:list(FromStore, NewPath),
Acc ++ sync_entries(Entries2, NewPath, FromStore, ToStore);
Acc ++ sync_fs_entries(Entries2, NewPath, FromStore, ToStore);
_Error ->
[{NewPath, undefined} | Acc]
end;
@@ -628,12 +647,75 @@ hb_store_sync_test(_Store) ->
hb_store:stop(FromStore),
hb_store:stop(ToStore).

%% @doc Test hb_store:sync/2 by syncing from one hb_store_lmdb store to another
hb_store_lmdb_sync_test(_Store) ->
% Generate unique names to avoid conflicts
TestId = integer_to_binary(erlang:system_time(microsecond)),
% Set up FromStore (hb_store_lmdb) with resolve=false as specified
FromStore = #{
<<"store-module">> => hb_store_lmdb,
<<"name">> => <<"cache-lmdb-sync-from-", TestId/binary>>,
<<"resolve">> => false
},
% Set up ToStore (hb_store_lmdb)
ToStore = #{
<<"store-module">> => hb_store_lmdb,
<<"name">> => <<"cache-lmdb-sync-to-", TestId/binary>>
},

% Clean up any existing data
hb_store:reset(FromStore),
hb_store:reset(ToStore),

% Start both stores
hb_store:start(FromStore),
hb_store:start(ToStore),

% Populate FromStore with data
ok = hb_store:write(FromStore, <<"key1">>, <<"value1">>),
ok = hb_store:write(FromStore, <<"key2">>, <<"value2">>),
ok = hb_store:write(FromStore, <<"nested/key3">>, <<"value3">>),
ok = hb_store:write(FromStore, <<"deep/nested/key4">>, <<"value4">>),

% Create some links
ok = hb_store:make_link(FromStore, <<"key1">>, <<"link-to-key1">>),
ok = hb_store:make_link(FromStore, <<"nested/key3">>, <<"link-to-nested">>),

% Perform the sync operation
Result = hb_store:sync(FromStore, ToStore),
?assertEqual(ok, Result),

% Verify that all data exists in ToStore
{ok, Value1} = hb_store:read(ToStore, <<"key1">>),
?assertEqual(<<"value1">>, Value1),

{ok, Value2} = hb_store:read(ToStore, <<"key2">>),
?assertEqual(<<"value2">>, Value2),

{ok, Value3} = hb_store:read(ToStore, <<"nested/key3">>),
?assertEqual(<<"value3">>, Value3),

{ok, Value4} = hb_store:read(ToStore, <<"deep/nested/key4">>),
?assertEqual(<<"value4">>, Value4),

% Verify that links work in ToStore
{ok, LinkValue1} = hb_store:read(ToStore, <<"link-to-key1">>),
?assertEqual(<<"value1">>, LinkValue1),

{ok, LinkValue3} = hb_store:read(ToStore, <<"link-to-nested">>),
?assertEqual(<<"value3">>, LinkValue3),

% Clean up
hb_store:stop(FromStore),
hb_store:stop(ToStore).

store_suite_test_() ->
generate_test_suite([
{"simple path resolution", fun simple_path_resolution_test/1},
{"resursive path resolution", fun resursive_path_resolution_test/1},
{"hierarchical path resolution", fun hierarchical_path_resolution_test/1},
{"hb_store sync", fun hb_store_sync_test/1}
{"hb_store sync", fun hb_store_sync_test/1},
{"hb_store lmdb sync", fun hb_store_lmdb_sync_test/1}
]).

benchmark_suite_test_() ->
81 changes: 75 additions & 6 deletions src/hb_store_lmdb.erl
@@ -21,7 +21,7 @@

%% Public API exports
-export([start/1, stop/1, scope/0, scope/1, reset/1]).
-export([read/2, write/3, list/2, match/2]).
-export([read/2, write/3, flush/1, list/2, match/2, fold_while/3]).
-export([make_group/2, make_link/3, type/2]).
-export([path/2, add_path/3, resolve/2]).

@@ -36,7 +36,7 @@
-define(DEFAULT_MAX_FLUSH_TIME, 50). % Maximum time between flushes
-define(MAX_REDIRECTS, 1000). % Only resolve 1000 links to data
-define(MAX_PENDING_WRITES, 400). % Force flush after x pending
-define(FOLD_YIELD_INTERVAL, 100). % Yield every x keys
-define(FOLD_CHUNK_SIZE, 100). % Iterator chunk size for folding

%% @doc Start the LMDB storage system for a given database configuration.
%%
@@ -55,13 +55,18 @@ start(Opts = #{ <<"name">> := DataDir }) ->
% Ensure the directory exists before opening LMDB environment
DataDirPath = hb_util:list(DataDir),
ok = filelib:ensure_dir(filename:join(DataDirPath, "dummy")),
NoSyncParam =
case maps:get(<<"no-sync">>, Opts, true) of
true -> [no_sync];
false -> []
end,
% Create the LMDB environment with specified size limit
{ok, Env} =
elmdb:env_open(
DataDirPath,
[
{map_size, maps:get(<<"capacity">>, Opts, ?DEFAULT_SIZE)},
no_mem_init, no_sync
no_mem_init | NoSyncParam
]
),
{ok, DBInstance} = elmdb:db_open(Env, [create]),
@@ -144,6 +149,22 @@ write(Opts, Path, Value) ->
retry
end.

-spec flush(map()) -> ok | {error, flush_failed}.
flush(Opts) ->
#{ <<"env">> := DBEnv } = find_env(Opts),
case elmdb:env_sync(DBEnv) of
ok -> ok;
{error, Type, Description} ->
?event(
error,
{lmdb_error,
{type, Type},
{description, Description}
}
),
{error, flush_failed}
end.
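
%% A minimal usage sketch (the store name is hypothetical): force any buffered
%% writes of a running store to disk, mirroring what hb_app:prep_stop/1 does
%% at shutdown.
%%
%%   ok = hb_store_lmdb:flush(#{
%%       <<"store-module">> => hb_store_lmdb,
%%       <<"name">> => <<"cache-lmdb">>
%%   }).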

%% @doc Read a value from the database by key, with automatic link resolution.
%%
%% This function attempts to read a value directly from the committed database.
@@ -373,7 +394,6 @@ list(Opts, Path) ->
#{ <<"db">> := DBInstance } = find_env(Opts),
case elmdb:list(DBInstance, SearchPath) of
{ok, Children} -> {ok, Children};
{error, not_found} -> {ok, []}; % Normalize new error format
not_found -> {ok, []} % Handle both old and new format
end.

@@ -397,10 +417,45 @@ match(Opts, MatchKVs) ->
{ok, Matches} ->
?event({elmdb_matched, Matches}),
{ok, Matches};
{error, not_found} -> not_found;
not_found -> not_found
end.

-spec fold_while(Opts :: map(), Fun :: fun(), Acc0 :: term()) ->
{ok, term()} | not_found | {error, term(), binary()}.
fold_while(Opts, Fun, Acc0) ->
#{ <<"db">> := DBInstance } = find_env(Opts),
maybe
ok ?= elmdb:flush(DBInstance),
{ok, IterRes} ?= elmdb:iterate_start(DBInstance, <<>>, ?FOLD_CHUNK_SIZE),
fold_chunks(DBInstance, Fun, Acc0, IterRes)
end.
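
%% A minimal usage sketch of the fold_while/3 contract (count_keys/2 is a
%% hypothetical helper): count every key in the store, halting early once a
%% limit is reached. The fun must return {cont, Acc} to keep iterating or
%% {halt, Result} to stop.
%%
%%   count_keys(StoreOpts, Limit) ->
%%       fold_while(StoreOpts,
%%           fun({_Key, _Value}, {ok, N}) when N >= Limit -> {halt, {ok, N}};
%%              ({_Key, _Value}, {ok, N}) -> {cont, {ok, N + 1}}
%%           end,
%%           {ok, 0}).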

fold_chunks(DBInstance, Fun, Acc0, {KVList, Continuation}) ->
case do_fold_while(Fun, Acc0, KVList) of
{cont, Acc1} ->
fold_continue(DBInstance, Fun, Acc1, Continuation);
{halt, Res} ->
Res
end.

do_fold_while(Fun, AccIn, [{Key, Value}]) ->
Fun({Key, Value}, AccIn);

do_fold_while(Fun, AccIn, [KV | KVList]) ->
maybe
{cont, AccOut} ?= Fun(KV, AccIn),
do_fold_while(Fun, AccOut, KVList)
end.

fold_continue(_DBInstance, _Fun, Acc1, not_found) ->
Acc1;
fold_continue(DBInstance, Fun, Acc1, Continuation) ->
maybe
{ok, IterRes} ?= elmdb:iterate_cont(DBInstance, Continuation, ?FOLD_CHUNK_SIZE),
fold_chunks(DBInstance, Fun, Acc1, IterRes)
end.

%% @doc Create a group entry that can contain other keys hierarchically.
%%
Expand Down Expand Up @@ -1132,4 +1187,18 @@ read_follow_test() ->
StoreOpts2 = maps:put(<<"resolve">>, false, StoreOpts),
{ok, Value3} = read(StoreOpts2, <<"HelloLink">>),
?assertEqual(Value3, <<"Hello">>),
ok = stop(StoreOpts).
ok = stop(StoreOpts).
%% @doc Test that flush/1 syncs pending writes to disk and the values remain readable
flush_test() ->
StoreOpts = #{
<<"store-module">> => ?MODULE,
<<"name">> => <<"/tmp/flush">>,
<<"capacity">> => ?DEFAULT_SIZE
},
reset(StoreOpts),
write(StoreOpts, <<"key1">>, <<"value1">>),
write(StoreOpts, <<"key2">>, <<"value2">>),
write(StoreOpts, <<"key3">>, <<"value3">>),
?assertEqual(ok, flush(StoreOpts)),
?assertEqual({ok, <<"value1">>}, read(StoreOpts, <<"key1">>)),
stop(StoreOpts).