diff --git a/rebar.config b/rebar.config index dd05411..8953fc2 100644 --- a/rebar.config +++ b/rebar.config @@ -8,11 +8,10 @@ {dir, "edoc"}]}. {cover_enabled, true}. {xref_checks, [undefined_function_calls]}. - {deps, [ - {lager, "(2.0|2.1|2.2).*", {git, "git://github.com/basho/lager.git", {tag, "2.2.0"}}}, - {eleveldb, "2.1.*", {git, "git://github.com/basho/eleveldb.git", {tag, "2.1.4"}}} - ]}. + {lager, ".*", {git, "git://github.com/basho/lager.git", {tag, "3.2.2"}}}, + {eleveldb, ".*", {git, "git://github.com/basho/eleveldb.git", {tag, "2.0.30"}}} +]}. {port_specs, [{".*", "priv/riak_ensemble.so", diff --git a/src/riak_ensemble_config.erl b/src/riak_ensemble_config.erl index 4bbf2b9..7d4aa3d 100644 --- a/src/riak_ensemble_config.erl +++ b/src/riak_ensemble_config.erl @@ -127,9 +127,4 @@ notfound_read_delay() -> get_env(notfound_read_delay, 1). get_env(Key, Default) -> - case application:get_env(riak_ensemble, Key) of - undefined -> - Default; - {_, Val} -> - Val - end. + application:get_env(riak_ensemble, Key, Default). diff --git a/src/riak_ensemble_peer.erl b/src/riak_ensemble_peer.erl index 45c536b..5166413 100644 --- a/src/riak_ensemble_peer.erl +++ b/src/riak_ensemble_peer.erl @@ -30,7 +30,6 @@ -export([join/2, join/3, update_members/3, get_leader/1, backend_pong/1]). -export([kget/4, kget/5, kupdate/6, kput_once/5, kover/5, kmodify/6, kdelete/4, ksafe_delete/5, obj_value/2, obj_value/3]). --export([debug_local_get/2]). -export([setup/2]). -export([probe/2, election/2, prepare/2, leading/2, following/2, probe/3, election/3, prepare/3, leading/3, following/3]). @@ -45,6 +44,12 @@ get_info/1, stable_views/2, tree_info/1, watch_leader_status/1, stop_watching/1]). +%% Backdoors for unit testing +-ifdef(TEST). +-export([debug_local_get/2]). +-export([get_watchers/1]). +-endif. + %% Exported internal callback functions -export([do_kupdate/4, do_kput_once/4, do_kmodify/4]). @@ -136,7 +141,7 @@ async :: pid(), tree :: pid(), lease :: riak_ensemble_lease:lease_ref(), - watchers = [] :: [pid()], + watchers = [] :: [{pid(), reference()}], self :: pid() }). @@ -212,6 +217,12 @@ watch_leader_status(Pid) when is_pid(Pid) -> stop_watching(Pid) when is_pid(Pid) -> gen_fsm:send_all_state_event(Pid, {stop_watching, self()}). +-ifdef(TEST). +-spec get_watchers(pid()) -> [{pid(), reference()}]. +get_watchers(Pid) when is_pid(Pid) -> + gen_fsm:sync_send_all_state_event(Pid, get_watchers). +-endif. + get_info(Pid) when is_pid(Pid) -> gen_fsm:sync_send_all_state_event(Pid, get_info, infinity). @@ -333,12 +344,14 @@ local_get(Pid, Key, Timeout) when is_pid(Pid) -> local_put(Pid, Key, Obj, Timeout) when is_pid(Pid) -> riak_ensemble_router:sync_send_event(Pid, {local_put, Key, Obj}, Timeout). +-ifdef(TEST). %% Acts like local_get, but can be used for any peer, not just the leader. %% Should only be used for testing purposes, since values obtained via %% this function provide no consistency guarantees whatsoever. -spec debug_local_get(pid(), term()) -> std_reply(). debug_local_get(Pid, Key) -> gen_fsm:sync_send_all_state_event(Pid, {debug_local_get, Key}). +-endif. %%%=================================================================== %%% Core Protocol @@ -346,7 +359,7 @@ debug_local_get(Pid, Key) -> -spec probe(_, state()) -> next_state(). probe(init, State) -> - ?OUT("~p: probe~n", [State#state.id]), + lager:debug("~p: probe init", [State#state.id]), State2 = set_leader(undefined, State), case is_pending(State2) of true -> @@ -380,22 +393,22 @@ probe(Msg, From, State) -> common(Msg, From, State, probe). pending(init, State) -> + lager:debug("~p: pending init", [State#state.id]), State2 = set_timer(?PENDING_TIMEOUT, pending_timeout, State), {next_state, pending, State2#state{tree_trust=false}}; pending(pending_timeout, State) -> + lager:debug("~p: pending_timeout", [State#state.id]), probe({timeout, []}, State); pending({prepare, Id, NextEpoch, From}, State=#state{fact=Fact}) -> Epoch = epoch(State), case NextEpoch > Epoch of true -> - ?OUT("~p: accepting ~p from ~p (~p)~n", - [State#state.id, NextEpoch, Id, Epoch]), + lager:debug("~p: accepting ~p from ~p (~p)", [Id, NextEpoch, Id, Epoch]), reply(From, Fact, State), State2 = cancel_timer(State), prefollow({init, Id, NextEpoch}, State2); false -> - ?OUT("~p: rejecting ~p from ~p (~p)~n", - [State#state.id, NextEpoch, Id, Epoch]), + lager:debug("~p: rejecting ~p from ~p (~p)", [Id, NextEpoch, Id, Epoch]), {next_state, pending, State} end; pending({commit, NewFact, From}, State) -> @@ -403,10 +416,14 @@ pending({commit, NewFact, From}, State) -> case NewFact#fact.epoch >= Epoch of true -> reply(From, ok, State), + lager:debug("~p: accepting commit from ~p for epoch ~p", + [State#state.id, From, NewFact#fact.epoch]), State2 = local_commit(NewFact, State), State3 = cancel_timer(State2), following(init, State3); false -> + lager:debug("~p: ignoring outdated commit from ~p (~p < ~p)", + [State#state.id, From, NewFact#fact.epoch, Epoch]), {next_state, pending, State} end; pending(Msg, State) -> @@ -431,6 +448,7 @@ maybe_follow(Leader, State) -> %%%=================================================================== repair(init, State=#state{tree=Tree}) -> + lager:debug("~p: repair", [State#state.id]), riak_ensemble_peer_tree:async_repair(Tree), {next_state, repair, State#state{tree_trust=false}}; repair(repair_complete, State) -> @@ -561,7 +579,7 @@ prefollow(Msg, From, State) -> -spec prepare(_, state()) -> next_state(). prepare(init, State=#state{id=Id}) -> %% TODO: Change this hack where we keep old state and reincrement - ?OUT("~p: prepare~n", [State#state.id]), + lager:debug("~p: prepare", [State#state.id]), {NextEpoch, _} = increment_epoch(State), %% io:format("Preparing ~p to ~p :: ~p~n", [NextEpoch, %% views(State), @@ -610,7 +628,6 @@ prelead(Msg, From, State) -> -spec leading(_, state()) -> next_state(). leading(init, State=#state{id=_Id, watchers=Watchers}) -> - ?OUT("~p: Leading~n", [_Id]), _ = lager:info("~p: Leading~n", [_Id]), start_exchange(State), _ = notify_leader_status(Watchers, leading, State), @@ -778,7 +795,7 @@ reset_follower_timer(State) -> following(not_ready, State) -> following(init, State#state{ready=false}); following(init, State) -> - ?OUT("~p: Following: ~p~n", [State#state.id, leader(State)]), + lager:debug("~p: Following: ~p", [State#state.id, leader(State)]), start_exchange(State), State2 = reset_follower_timer(State), {next_state, following, State2}; @@ -787,6 +804,7 @@ following(exchange_complete, State) -> State2 = State#state{tree_trust=true}, {next_state, following, State2}; following(exchange_failed, State) -> + lager:debug("~p: exchange failed", [State#state.id]), probe(init, State); following({commit, Fact, From}, State) -> State3 = case Fact#fact.epoch >= epoch(State) of @@ -814,8 +832,7 @@ following({commit, Fact, From}, State) -> %% {next_state, following, State} %% end; following(follower_timeout, State) -> - ?OUT("~p: follower_timeout from ~p~n", [State#state.id, leader(State)]), - %% io:format("~p: follower_timeout from ~p~n", [State#state.id, leader(State)]), + lager:debug("~p: follower_timeout from ~p", [State#state.id, leader(State)]), abandon(State#state{timer=undefined}); following({check_epoch, Leader, Epoch, From}, State) -> case check_epoch(Leader, Epoch, State) of @@ -895,7 +912,7 @@ step_down(State) -> step_down(probe, State). step_down(Next, State=#state{lease=Lease, watchers=Watchers}) -> - ?OUT("~p: stepping down~n", [State#state.id]), + lager:debug("~p: stepping down", [State#state.id]), _ = notify_leader_status(Watchers, Next, State), riak_ensemble_lease:unlease(Lease), State2 = cancel_timer(State), @@ -1004,8 +1021,6 @@ common({update_hash, _, _, MaybeFrom}, State, StateName) -> maybe_reply(MaybeFrom, nack, State), {next_state, StateName, State}; common(Msg, State, StateName) -> - ?OUT("~p: ~s/ignoring: ~p~n", [State#state.id, StateName, Msg]), - %% io:format("~p/~p: ~s/ignoring: ~p~n", [State#state.id, self(), StateName, Msg]), nack(Msg, State), {next_state, StateName, State}. @@ -1017,11 +1032,11 @@ common({force_state, {Epoch, Seq}}, From, State, StateName) -> common(tree_pid, From, State, StateName) -> gen_fsm:reply(From, State#state.tree), {next_state, StateName, State}; -common(tree_corrupted, From, State, _StateName) -> +common(tree_corrupted, From, State, StateName) -> gen_fsm:reply(From, ok), + lager:debug("~p: tree_corrupted in state ~p", [State#state.id, StateName]), repair(init, State); common(_Msg, From, State, StateName) -> - ?OUT("~p: ~s/ignoring: ~p~n", [State#state.id, StateName, _Msg]), send_reply(From, nack), {next_state, StateName, State}. @@ -1801,7 +1816,7 @@ get_value(Obj, Default, State) -> -spec init([any(),...]) -> {ok, setup, state()}. init([Mod, Ensemble, Id, Args]) -> - ?OUT("~p: starting~n", [Id]), + lager:debug("~p: starting peer", [Id]), {A,B,C} = os:timestamp(), _ = random:seed(A + erlang:phash2(Id), B + erlang:phash2(node()), @@ -1825,6 +1840,7 @@ init([Mod, Ensemble, Id, Args]) -> {ok, setup, State}. setup({init, Args}, State0=#state{id=Id, ensemble=Ensemble, ets=ETS, mod=Mod}) -> + lager:debug("~p: setup", [Id]), NumWorkers = ?WORKERS, {TreeId, Path} = mod_synctree(State0), Tree = open_hashtree(Ensemble, Id, TreeId, Path), @@ -1845,17 +1861,28 @@ setup({init, Args}, State0=#state{id=Id, ensemble=Ensemble, ets=ETS, mod=Mod}) - -spec handle_event(_, atom(), state()) -> {next_state, atom(), state()}. handle_event({watch_leader_status, Pid}, StateName, State) when node(Pid) =/= node() -> - lager:warning("Remote pid ~p not allowed to watch_leader_status on ensemble peer ~p", - [Pid, State#state.id]), + lager:debug("Remote pid ~p not allowed to watch_leader_status on ensemble peer ~p", + [Pid, State#state.id]), {next_state, StateName, State}; handle_event({watch_leader_status, Pid}, StateName, State = #state{watchers = Watchers}) -> - _ = notify_leader_status(Pid, StateName, State), - %% Might as well take this opportunity to prune any dead pids that are in the list - NewWatcherList = [P || P <- [Pid | Watchers], is_process_alive(P)], - {next_state, StateName, State#state{watchers = NewWatcherList}}; + case is_watcher(Pid, Watchers) of + true -> + lager:debug("Got watch_leader_status for ~p, but pid already in watchers list"), + {next_state, StateName, State}; + false -> + _ = notify_leader_status(Pid, StateName, State), + MRef = erlang:monitor(process, Pid), + {next_state, StateName, State#state{watchers = [{Pid, MRef} | Watchers]}} + end; handle_event({stop_watching, Pid}, StateName, State = #state{watchers = Watchers}) -> - NewWatcherList = lists:delete(Pid, Watchers), - {next_state, StateName, State#state{watchers = NewWatcherList}}; + case remove_watcher(Pid, Watchers) of + not_found -> + lager:debug("Tried to stop watching for pid ~p, but did not find it in watcher list"), + {next_state, StateName, State}; + {MRef, NewWatcherList} -> + erlang:demonitor(MRef, [flush]), + {next_state, StateName, State#state{watchers = NewWatcherList}} + end; handle_event({reply, ReqId, Peer, Reply}, StateName, State) -> State2 = handle_reply(ReqId, Peer, Reply, State), {next_state, StateName, State2}; @@ -1884,14 +1911,32 @@ handle_sync_event(tree_info, _From, StateName, State=#state{tree_trust=Trust, handle_sync_event({debug_local_get, Key}, From, StateName, State) -> State2 = do_local_get(From, Key, State), {next_state, StateName, State2}; +handle_sync_event(get_watchers, _From, StateName, State) -> + {reply, State#state.watchers, StateName, State}; handle_sync_event(_Event, _From, StateName, State) -> Reply = ok, {reply, Reply, StateName, State}. %% -spec handle_info(_, atom(), state()) -> next_state(). -handle_info({'DOWN', Ref, _, Pid, Reason}, StateName, - #state{mod=Mod, modstate=ModState}=State) -> - case Mod:handle_down(Ref, Pid, Reason, ModState) of +handle_info({'DOWN', MRef, _, Pid, Reason}, StateName, State) -> + Watchers = State#state.watchers, + case remove_watcher(Pid, Watchers) of + {MRef, NewWatcherList} -> + {next_state, StateName, State#state{watchers = NewWatcherList}}; + not_found -> + %% If the DOWN signal was not for a watcher, we must pass it through + %% to the callback module in case that's where it's supposed to go + module_handle_down(MRef, Pid, Reason, StateName, State) + end; +handle_info(quorum_timeout, StateName, State) -> + State2 = quorum_timeout(State), + {next_state, StateName, State2}; +handle_info(_Info, StateName, State) -> + {next_state, StateName, State}. + +module_handle_down(MRef, Pid, Reason, StateName, State) -> + #state{mod=Mod, modstate=ModState} = State, + case Mod:handle_down(MRef, Pid, Reason, ModState) of false -> State2 = maybe_restart_worker(Pid, State), {next_state, StateName, State2}; @@ -1900,12 +1945,7 @@ handle_info({'DOWN', Ref, _, Pid, Reason}, StateName, {reset, ModState2} -> State2 = State#state{modstate=ModState2}, step_down(State2) - end; -handle_info(quorum_timeout, StateName, State) -> - State2 = quorum_timeout(State), - {next_state, StateName, State2}; -handle_info(_Info, StateName, State) -> - {next_state, StateName, State}. + end. -spec terminate(_,_,_) -> ok. terminate(_Reason, _StateName, _State) -> @@ -2027,8 +2067,8 @@ existing_leader(_Replies, Abandoned, #fact{epoch=Epoch, seq=Seq, leader=Leader}) undefined end. -notify_leader_status(PidList, StateName, State) when is_list(PidList) -> - [notify_leader_status(P, StateName, State) || P <- PidList]; +notify_leader_status(WatcherList, StateName, State) when is_list(WatcherList) -> + [notify_leader_status(Pid, StateName, State) || {Pid, _MRef} <- WatcherList]; notify_leader_status(Pid, leading, State = #state{id = Id, ensemble = Ensemble}) -> Pid ! {is_leading, self(), Id, Ensemble, epoch(State)}; notify_leader_status(Pid, _, State = #state{id = Id, ensemble = Ensemble}) -> @@ -2052,6 +2092,22 @@ peer(Id, #state{id=Id}) -> peer(Id, #state{ensemble=Ensemble}) -> riak_ensemble_manager:get_peer_pid(Ensemble, Id). +is_watcher(Pid, WatcherList) -> + case lists:keyfind(Pid, 1, WatcherList) of + false -> + false; + _ -> + true + end. + +remove_watcher(Pid, WatcherList) -> + case lists:keytake(Pid, 1, WatcherList) of + false -> + not_found; + {value, {_Pid, MRef}, NewWatcherList} -> + {MRef, NewWatcherList} + end. + %%%=================================================================== %%% Behaviour Interface %%%=================================================================== diff --git a/test/TESTS b/test/TESTS index 0b8d1fe..2fd8ab8 100644 --- a/test/TESTS +++ b/test/TESTS @@ -13,3 +13,4 @@ lease_test ensemble_tests_pure replace_members_test read_tombstone_test +leadership_watchers diff --git a/test/ens_test.erl b/test/ens_test.erl index 312e625..76486dc 100644 --- a/test/ens_test.erl +++ b/test/ens_test.erl @@ -94,3 +94,21 @@ read_until(Key) -> timer:sleep(100), read_until(Key) end. + +%% @doc Utility function used to construct test predicates. Retries the +%% function `Fun' until it returns `true', or until the maximum +%% number of retries is reached. +wait_until(Fun) when is_function(Fun) -> + wait_until(Fun, 50, 100). + +wait_until(Fun, Retry, Delay) when Retry > 0 -> + Res = Fun(), + case Res of + true -> + ok; + _ when Retry == 1 -> + {fail, Res}; + _ -> + timer:sleep(Delay), + wait_until(Fun, Retry-1, Delay) + end. diff --git a/test/leadership_watchers.erl b/test/leadership_watchers.erl new file mode 100644 index 0000000..43733d4 --- /dev/null +++ b/test/leadership_watchers.erl @@ -0,0 +1,67 @@ +-module(leadership_watchers). +-compile(export_all). +-include_lib("eunit/include/eunit.hrl"). + +run_test_() -> + ens_test:run(fun scenario/0, 40). + +scenario() -> + ens_test:start(3), + ens_test:wait_stable(root), + + Pid = riak_ensemble_manager:get_leader_pid(root), + + ?assertEqual(0, length(riak_ensemble_peer:get_watchers(Pid))), + ?debugMsg("Watching leader"), + riak_ensemble_peer:watch_leader_status(Pid), + ?assertEqual(1, length(riak_ensemble_peer:get_watchers(Pid))), + ?debugMsg("Waiting for is_leading notification"), + wait_status(is_leading, Pid), + + ?debugMsg("Stopping watching leader"), + riak_ensemble_peer:stop_watching(Pid), + ?assertEqual(0, length(riak_ensemble_peer:get_watchers(Pid))), + + ?debugMsg("Starting watching leader again"), + riak_ensemble_peer:watch_leader_status(Pid), + ?assertEqual(1, length(riak_ensemble_peer:get_watchers(Pid))), + wait_status(is_leading, Pid), + + ?debugMsg("Suspending leader, and waiting for new leader to be elected"), + erlang:suspend_process(Pid), + ens_test:wait_stable(root), + + ?debugMsg("Resuming former leader, and waiting for is_not_leading notification"), + erlang:resume_process(Pid), + wait_status(is_not_leading, Pid), + + ?debugMsg("Watching leader in external process"), + Watcher = spawn_link(fun() -> watcher(Pid) end), + wait_until_n_watchers(2, Pid), + + ?debugMsg("Killing external watcher process and checking peer state"), + Watcher ! die, + wait_until_n_watchers(1, Pid). + +wait_status(Status, Pid) -> + receive + {Status, Pid, _, _, _} -> + ok + after + 5000 -> + throw(timeout_waiting_for_leader_status) + end. + +%% Just a fun to spawn a process that will exit when we tell it to, so we +%% can test that the watcher gets removed from the ensemble peer state when +%% a watcher process dies. +watcher(PeerPid) -> + riak_ensemble_peer:watch_leader_status(PeerPid), + receive + die -> + ok + end. + +wait_until_n_watchers(N, Pid) -> + WatcherCountCheck = fun() -> N =:= length(riak_ensemble_peer:get_watchers(Pid)) end, + ?assertEqual(ok, ens_test:wait_until(WatcherCountCheck)).