diff --git a/src/eredis_cluster.erl b/src/eredis_cluster.erl
index 1da3a9f..93644eb 100644
--- a/src/eredis_cluster.erl
+++ b/src/eredis_cluster.erl
@@ -6,10 +6,12 @@
 -export([stop/1]).
 
 % API.
--export([start/0, stop/0, connect/1]). % Application Management.
+-export([start/0, stop/0, connect/1, close_connection/1]). % Application Management.
 
 % Generic redis call
--export([q/1, qp/1, qw/2, qk/2, qa/1, qmn/1, transaction/1, transaction/2]).
+-export([q/1, q_nor/1, qp/1, qw/2, qk/2, qa/1, qmn/1, transaction/1, transaction/2]).
+
+-export([get_key_slot/1]).
 
 % Specific redis command implementation
 -export([flushdb/0]).
@@ -48,6 +50,14 @@ stop() ->
 connect(InitServers) ->
     eredis_cluster_monitor:connect(InitServers).
 
+%% =============================================================================
+%% @doc Close the connections to the given pool nodes.
+%% @end
+%% =============================================================================
+-spec close_connection(PoolNodes::term()) -> Result::term().
+close_connection(PoolNodes) ->
+    eredis_cluster_monitor:close_connection_with_pools(PoolNodes).
+
 %% =============================================================================
 %% @doc Wrapper function to execute a pipeline command as a transaction Command
 %% (it will add MULTI and EXEC command)
@@ -160,6 +170,10 @@ qp(Commands) -> q(Commands).
 q(Command) ->
     query(Command).
 
+%% Async (no-reply) variant of q/1: the reply from Redis is discarded.
+q_nor(Command) ->
+    query_nor(Command).
+
 -spec qk(redis_command(), bitstring()) -> redis_result().
 qk(Command, PoolKey) ->
     query(Command, PoolKey).
@@ -168,6 +182,10 @@ query(Command) ->
     PoolKey = get_key_from_command(Command),
     query(Command, PoolKey).
 
+query_nor(Command) ->
+    PoolKey = get_key_from_command(Command),
+    query_nor(Command, PoolKey).
+
 query(_, undefined) ->
     {error, invalid_cluster_command};
 query(Command, PoolKey) ->
@@ -175,6 +193,13 @@ query(Command, PoolKey) ->
     Transaction = fun(Worker) -> qw(Worker, Command) end,
     query(Transaction, Slot, 0).
 
+query_nor(_, undefined) ->
+    {error, invalid_cluster_command};
+query_nor(Command, PoolKey) ->
+    Slot = get_key_slot(PoolKey),
+    Transaction = fun(Worker) -> qw_nor(Worker, Command) end,
+    query_nor_h(Transaction, Slot).
+
 query(_, _, ?REDIS_CLUSTER_REQUEST_TTL) ->
     {error, no_connection};
 query(Transaction, Slot, Counter) ->
@@ -190,9 +215,9 @@ query(Transaction, Slot, Counter) ->
     end.
 
 handle_transaction_result(Result, Version) ->
-    case Result of
+    case Result of
         % If we detect a node went down, we should probably refresh the slot
-        % mapping.
+        % mapping.
         {error, no_connection} ->
             eredis_cluster_monitor:refresh_mapping(Version),
             retry;
@@ -229,6 +254,11 @@ handle_transaction_result(Result, Version, check_pipeline_result) ->
         Payload -> Payload
     end.
 
+query_nor_h(Transaction, Slot) ->
+    {Pool, _Version} = eredis_cluster_monitor:get_pool_by_slot(Slot),
+    eredis_cluster_pool:transaction(Pool, Transaction),
+    ok.
+
 -spec throttle_retries(integer()) -> ok.
 throttle_retries(0) -> ok;
 throttle_retries(_) -> timer:sleep(?REDIS_RETRY_DELAY).
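%% --- Reviewer note (not part of the patch) ----------------------------------
%% A minimal usage sketch of the fire-and-forget path added above, assuming a
%% connected cluster. eredis_cluster:q_nor/1 resolves the slot like q/1 but
%% casts the command to the pool worker and returns ok without waiting for the
%% reply; it performs no MOVED/ASK handling and no retries, so it only suits
%% commands whose result (and failure) the caller can afford to ignore. The
%% module name and key below are illustrative assumptions.
-module(q_nor_usage_example).
-export([record_hit/1]).

record_hit(Key) ->
    %% Fire-and-forget write; only ok (or {error, invalid_cluster_command}
    %% when no key can be extracted from the command) is ever returned.
    ok = eredis_cluster:q_nor(["INCR", Key]),
    %% Read back synchronously when the value is actually needed.
    {ok, Count} = eredis_cluster:q(["GET", Key]),
    binary_to_integer(Count).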
@@ -298,7 +328,7 @@ optimistic_locking_transaction(WatchedKey, GetCommand, UpdateFunction) ->
             RedisResult = qw(Worker, [["MULTI"]] ++ UpdateCommand ++ [["EXEC"]]),
             {lists:last(RedisResult), Result}
     end,
-    case transaction(Transaction, Slot, {ok, undefined}, ?OL_TRANSACTION_TTL) of
+    case transaction(Transaction, Slot, {ok, undefined}, ?OL_TRANSACTION_TTL) of
         {{ok, undefined}, _} ->
             {error, resource_busy};
         {{ok, TransactionResult}, UpdateResult} ->
@@ -351,6 +381,10 @@ qa(Command) ->
 qw(Worker, Command) ->
     eredis_cluster_pool_worker:query(Worker, Command).
 
+-spec qw_nor(Worker::pid(), redis_command()) -> ok.
+qw_nor(Worker, Command) ->
+    eredis_cluster_pool_worker:query_nor(Worker, Command).
+
 %% =============================================================================
 %% @doc Perform flushdb command on each node of the redis cluster
 %% @end
diff --git a/src/eredis_cluster_monitor.erl b/src/eredis_cluster_monitor.erl
index 5a74f45..f7dbbb9 100644
--- a/src/eredis_cluster_monitor.erl
+++ b/src/eredis_cluster_monitor.erl
@@ -3,7 +3,7 @@
 
 %% API.
 -export([start_link/0]).
--export([connect/1]).
+-export([connect/1, close_connection_with_pools/1]).
 -export([refresh_mapping/1]).
 -export([get_state/0, get_state_version/1]).
 -export([get_pool_by_slot/1, get_pool_by_slot/2]).
@@ -34,6 +34,9 @@ start_link() ->
 connect(InitServers) ->
     gen_server:call(?MODULE,{connect,InitServers}).
 
+close_connection_with_pools(PoolNodes) ->
+    gen_server:call(?MODULE,{close_connection,PoolNodes}).
+
 refresh_mapping(Version) ->
     gen_server:call(?MODULE,{reload_slots_map,Version}).
 
@@ -66,12 +69,17 @@ get_all_pools() ->
 -spec get_pool_by_slot(Slot::integer(), State::#state{}) ->
     {PoolName::atom() | undefined, Version::integer()}.
 get_pool_by_slot(Slot, State) ->
-    Index = element(Slot+1,State#state.slots),
-    Cluster = element(Index,State#state.slots_maps),
-    if
-        Cluster#slots_map.node =/= undefined ->
-            {Cluster#slots_map.node#node.pool, State#state.version};
-        true ->
+    try
+        Index = element(Slot+1,State#state.slots),
+        Cluster = element(Index,State#state.slots_maps),
+        if
+            Cluster#slots_map.node =/= undefined ->
+                {Cluster#slots_map.node#node.pool, State#state.version};
+            true ->
+                {undefined, State#state.version}
+        end
+    catch
+        _:_ ->
             {undefined, State#state.version}
     end.
 
@@ -102,6 +110,24 @@ reload_slots_map(State) ->
 
     NewState.
 
+%%%------------------------------------------------------------
+-spec close_connection_with_nodes(SlotsMaps :: [#slots_map{}],
+                                  Pools :: [atom()]) -> [#slots_map{}].
+%%%
+%%% Closes the connections of the given pool nodes; returns the remaining slots maps.
+%%%------------------------------------------------------------
+close_connection_with_nodes(SlotsMaps, Pools) ->
+    lists:foldl(fun(Map, AccMap) ->
+                    case lists:member(Map#slots_map.node#node.pool,
+                                      Pools) of
+                        true ->
+                            close_connection(Map),
+                            AccMap;
+                        false ->
+                            [Map|AccMap]
+                    end
+                end, [], SlotsMaps).
+
 -spec get_cluster_slots([#node{}]) -> [[bitstring() | [bitstring()]]].
 get_cluster_slots([]) ->
     throw({error,cannot_connect_to_cluster});
@@ -149,8 +175,6 @@ parse_cluster_slots([[StartSlot, EndSlot | [[Address, Port | _] | _]] | T], Index, Acc) ->
 parse_cluster_slots([], _Index, Acc) ->
     lists:reverse(Acc).
 
-
-
 -spec close_connection(#slots_map{}) -> ok.
 close_connection(SlotsMap) ->
     Node = SlotsMap#slots_map.node,
@@ -212,6 +236,38 @@ connect_(InitNodes) ->
 
     reload_slots_map(State).
 
+-spec close_connection_(PoolNodes :: term()) -> #state{}.
+close_connection_([]) ->
+    case get_state() of
+        undefined ->
+            #state{};
+        State ->
+            State
+    end;
+close_connection_(PoolNodes) ->
+    State = case get_state() of
+                undefined ->
+                    #state{};
+                S ->
+                    S
+            end,
+    case State#state.slots_maps of
+        undefined ->
+            State;
+        M ->
+            Map = tuple_to_list(M),
+            NeMap = close_connection_with_nodes(Map, PoolNodes),
+            ConnectedSlotsMaps = connect_all_slots(NeMap),
+            Slots = create_slots_cache(ConnectedSlotsMaps),
+            NS = State#state{
+                slots = list_to_tuple(Slots),
+                slots_maps = list_to_tuple(ConnectedSlotsMaps),
+                version = State#state.version + 1
+            },
+            true = ets:insert(?MODULE, [{cluster_state, NS}]),
+            NS
+    end.
+
 %% gen_server.
 
 init(_Args) ->
@@ -225,6 +281,8 @@ handle_call({reload_slots_map,_}, _From, State) ->
     {reply, ok, State};
 handle_call({connect, InitServers}, _From, _State) ->
     {reply, ok, connect_(InitServers)};
+handle_call({close_connection, PoolNodes}, _From, _State) ->
+    {reply, ok, close_connection_(PoolNodes)};
 handle_call(_Request, _From, State) ->
     {reply, ignored, State}.
 
diff --git a/src/eredis_cluster_pool_worker.erl b/src/eredis_cluster_pool_worker.erl
index 6fd0ef0..dfb5412 100644
--- a/src/eredis_cluster_pool_worker.erl
+++ b/src/eredis_cluster_pool_worker.erl
@@ -4,7 +4,7 @@
 
 %% API.
 -export([start_link/1]).
--export([query/2]).
+-export([query/2, query_nor/2]).
 
 %% gen_server.
 -export([init/1]).
@@ -41,6 +41,9 @@ init(Args) ->
 query(Worker, Commands) ->
     gen_server:call(Worker, {'query', Commands}).
 
+query_nor(Worker, Commands) ->
+    gen_server:cast(Worker, {'query_nor', Commands}).
+
 handle_call({'query', _}, _From, #state{conn = undefined} = State) ->
     {reply, {error, no_connection}, State};
 handle_call({'query', [[X|_]|_] = Commands}, _From, #state{conn = Conn} = State)
@@ -51,6 +54,9 @@ handle_call({'query', Command}, _From, #state{conn = Conn} = State) ->
 handle_call(_Request, _From, State) ->
     {reply, ok, State}.
 
+handle_cast({'query_nor', Command}, #state{conn = Conn} = State) ->
+    eredis:q_noreply(Conn, Command),
+    {noreply, State};
 handle_cast(_Msg, State) ->
     {noreply, State}.
 
diff --git a/test/eredis_cluster_tests.erl b/test/eredis_cluster_tests.erl
index 7b8a7ce..bc62e92 100644
--- a/test/eredis_cluster_tests.erl
+++ b/test/eredis_cluster_tests.erl
@@ -17,6 +17,14 @@ basic_test_() ->
         end
         },
 
+        { "get and set async",
+        fun() ->
+            ?assertEqual(ok, eredis_cluster:q_nor(["SET", "key", "value"])),
+            ?assertEqual({ok, <<"value">>}, eredis_cluster:q(["GET","key"])),
+            ?assertEqual({ok, undefined}, eredis_cluster:q(["GET","nonexists"]))
+        end
+        },
+
         { "binary",
         fun() ->
             ?assertEqual({ok, <<"OK">>}, eredis_cluster:q([<<"SET">>, <<"key_binary">>, <<"value_binary">>])),
@@ -163,6 +171,25 @@ basic_test_() ->
             eredis_cluster:eval(Script, ScriptHash, ["qrs"], ["evaltest"]),
             ?assertEqual({ok, <<"evaltest">>}, eredis_cluster:q(["get", "qrs"]))
         end
+        },
+
+        { "close connection",
+        fun() ->
+            Key = "close:{1}:return",
+            eredis_cluster:q_nor(["set", Key, "test"]),
+
+            AllPools = lists:usort(eredis_cluster_monitor:get_all_pools()),
+
+            Pool = element(1, eredis_cluster_monitor:get_pool_by_slot(eredis_cluster:get_key_slot(Key))),
+            ok = eredis_cluster:close_connection([Pool]),
+
+            NAllPools = lists:usort(eredis_cluster_monitor:get_all_pools()),
+            ?assertEqual([Pool], AllPools -- NAllPools),
+
+            ?assertEqual({ok,<<"test">>}, eredis_cluster:q(["get", Key])),
+            %% The slots map has been refreshed while fetching the key:
+            ?assertEqual(AllPools, lists:usort(eredis_cluster_monitor:get_all_pools()))
+        end
         }
     ]
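%% --- Reviewer note (not part of the patch) ----------------------------------
%% With the try/catch added to eredis_cluster_monitor:get_pool_by_slot/2, a
%% lookup against an uninitialised or shrunken slots table no longer raises
%% badarg but degrades to {undefined, Version}, which callers can match on
%% explicitly. A small sketch under that assumption; the module and function
%% names are illustrative.
-module(slot_lookup_example).
-export([pool_for_key/1]).

pool_for_key(Key) ->
    Slot = eredis_cluster:get_key_slot(Key),   % newly exported by this patch
    case eredis_cluster_monitor:get_pool_by_slot(Slot) of
        {undefined, _Version} -> {error, no_pool_for_slot};
        {Pool, _Version}      -> {ok, Pool}
    end.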
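%% --- Reviewer note (not part of the patch) ----------------------------------
%% Sketch of the intended close_connection/1 workflow, mirroring the
%% "close connection" test above: drop the pool serving a key's slot, then let
%% the next synchronous query refresh the slot mapping and reconnect. The
%% module and function names are illustrative assumptions.
-module(close_connection_example).
-export([recycle_pool_for_key/1]).

recycle_pool_for_key(Key) ->
    Slot = eredis_cluster:get_key_slot(Key),
    {Pool, _Version} = eredis_cluster_monitor:get_pool_by_slot(Slot),
    ok = eredis_cluster:close_connection([Pool]),
    %% Per the test above, a later eredis_cluster:q/1 against the same key
    %% triggers a slot-map refresh, so the connection comes back lazily.
    {ok, _} = eredis_cluster:q(["GET", Key]),
    ok.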