Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
41 commits
Select commit Hold shift + click to select a range
d66a102
Set rebar.config deps back to 2.0 branches
jaredmorrow Sep 4, 2014
1eb20eb
Add Apache 2 License.
Apr 2, 2015
05713b1
Avoid locking when opening a synctree leveldb
ian-mi May 27, 2015
966862f
Remove unused safe_open and reopen functions from synctree_leveldb
ian-mi May 28, 2015
c901261
Use custom script for tests.
cmeiklejohn Feb 25, 2015
7fa175b
Merge pull request #76 from basho/features/csm/test-target
cmeiklejohn Jun 10, 2015
2b10b18
Merge pull request #74 from basho/nem-add-apache-license-2.0
cmeiklejohn Jun 10, 2015
00c2e06
Merge pull request #75 from basho/nem-synctree-lock-2.0-port
cmeiklejohn Jun 10, 2015
50417bb
Update rebar.config to float on 2.1 branch of eleveldb.
Jul 2, 2015
179c772
Forward Merge branch '2.0' into dtr/forward_merge_2.0
Jul 2, 2015
bca3481
Merge pull request #78 from basho/dtr/forward_merge_2.0
borshop Jul 7, 2015
c788aa6
Update the lager version to 2.2.0
Oct 13, 2015
1a79eed
Merge pull request #90 from basho/feature/bch/upgrade-lager-to-2.2.0
borshop Oct 13, 2015
abad051
Bump eleveldb to 2.2.14 for Riak 2.2.0 release
Feb 24, 2016
871a2f8
Merge pull request #94 from basho/dr/bump_eleveldb_for_2.2
borshop Feb 24, 2016
a14b2fb
Upgrade eleveldb to 2.2.15
Mar 24, 2016
cab16cd
Merge pull request #96 from basho/bjs/eleveldb-upgrade-2.2.15
borshop Mar 25, 2016
069ab0b
Merge branch 'develop' into develop-2.2
nickelization Jun 28, 2016
38e9676
Merge pull request #105 from basho/nem-merge-develop-into-develop-2.2
borshop Jun 28, 2016
bf30938
Use native get_env/3 instead of our own version
nickelization Jun 29, 2016
7ebcb11
Only compile and export debug_local_get for tests
nickelization Jun 29, 2016
555ce3f
Switch leadership watcher cleanup to use monitors
nickelization Jun 29, 2016
c2326f9
Add wait_until function to ens_test
nickelization Jun 29, 2016
a3f0f99
Add get_watchers function for testing
nickelization Jun 29, 2016
8430868
Add leadership_watchers test
nickelization Jun 29, 2016
5b11648
Guard against the same pid watching twice
nickelization Jun 29, 2016
8cb2b00
Switch leader status watch logging to debug
nickelization Jun 29, 2016
19d83a0
Merge pull request #106 from basho/nem-post-merge-fixups
borshop Jun 30, 2016
b7fbac6
Updated rebar.config to point to eleveldb 2.2.19
fadushin Jul 5, 2016
5772022
bumped lager and eleveldb dependencies (making eleveldb dependency fl…
fadushin Jul 6, 2016
499d013
Merge pull request #108 from basho/merge-2.0-into-develop-2.2
fadushin Jul 6, 2016
fe59277
Add debug log messages for peer state transitions
nickelization Aug 24, 2016
73a362c
Merge pull request #109 from basho/nem-log-peer-state-transitions
nickelization Aug 25, 2016
ab49c39
Bumped eleveldb dependency to 2.0.28
fadushin Sep 7, 2016
89c70bf
Merge pull request #110 from basho/fd-eleveldb-dep
fadushin Sep 8, 2016
3b703fd
Bumped eleveldb to 2.0.29
fadushin Sep 15, 2016
e30f061
Merge pull request #111 from basho/fd-elevedb-bump
Sep 15, 2016
198205a
Update lager to 3.2.1
Sep 21, 2016
dc67e03
Merge pull request #112 from basho/feature-bjs-lager3.2.1
Sep 21, 2016
0f7f3d8
updated lager dependency
fadushin Sep 22, 2016
6c785cd
Updated eleveldb dependency.
fadushin Oct 11, 2016
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 3 additions & 4 deletions rebar.config
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,10 @@
{dir, "edoc"}]}.
{cover_enabled, true}.
{xref_checks, [undefined_function_calls]}.

{deps, [
{lager, "(2.0|2.1|2.2).*", {git, "git://github.com/basho/lager.git", {tag, "2.2.0"}}},
{eleveldb, "2.1.*", {git, "git://github.com/basho/eleveldb.git", {tag, "2.1.4"}}}
]}.
{lager, ".*", {git, "git://github.com/basho/lager.git", {tag, "3.2.2"}}},
{eleveldb, ".*", {git, "git://github.com/basho/eleveldb.git", {tag, "2.0.30"}}}
]}.

{port_specs,
[{".*", "priv/riak_ensemble.so",
Expand Down
7 changes: 1 addition & 6 deletions src/riak_ensemble_config.erl
Original file line number Diff line number Diff line change
Expand Up @@ -127,9 +127,4 @@ notfound_read_delay() ->
get_env(notfound_read_delay, 1).

get_env(Key, Default) ->
case application:get_env(riak_ensemble, Key) of
undefined ->
Default;
{_, Val} ->
Val
end.
application:get_env(riak_ensemble, Key, Default).
130 changes: 93 additions & 37 deletions src/riak_ensemble_peer.erl
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@
-export([join/2, join/3, update_members/3, get_leader/1, backend_pong/1]).
-export([kget/4, kget/5, kupdate/6, kput_once/5, kover/5, kmodify/6, kdelete/4,
ksafe_delete/5, obj_value/2, obj_value/3]).
-export([debug_local_get/2]).
-export([setup/2]).
-export([probe/2, election/2, prepare/2, leading/2, following/2,
probe/3, election/3, prepare/3, leading/3, following/3]).
Expand All @@ -45,6 +44,12 @@
get_info/1, stable_views/2, tree_info/1,
watch_leader_status/1, stop_watching/1]).

%% Backdoors for unit testing
-ifdef(TEST).
-export([debug_local_get/2]).
-export([get_watchers/1]).
-endif.

%% Exported internal callback functions
-export([do_kupdate/4, do_kput_once/4, do_kmodify/4]).

Expand Down Expand Up @@ -136,7 +141,7 @@
async :: pid(),
tree :: pid(),
lease :: riak_ensemble_lease:lease_ref(),
watchers = [] :: [pid()],
watchers = [] :: [{pid(), reference()}],
self :: pid()
}).

Expand Down Expand Up @@ -212,6 +217,12 @@ watch_leader_status(Pid) when is_pid(Pid) ->
%% @doc Asynchronously ask the peer `Pid' to drop the calling process from
%% its list of leader-status watchers. The request is sent as an
%% all-state event, so it is handled regardless of the peer's FSM state.
stop_watching(Pid) when is_pid(Pid) ->
    Watcher = self(),
    gen_fsm:send_all_state_event(Pid, {stop_watching, Watcher}).

-ifdef(TEST).
%% @doc Test-only backdoor: synchronously fetch the peer's current watcher
%% list, returned as `{WatcherPid, MonitorRef}' pairs.
-spec get_watchers(pid()) -> [{pid(), reference()}].
get_watchers(Pid) when is_pid(Pid) ->
    Request = get_watchers,
    gen_fsm:sync_send_all_state_event(Pid, Request).
-endif.

%% @doc Synchronously send a `get_info' all-state request to the peer `Pid'
%% and return its reply. The call uses an `infinity' timeout, so it blocks
%% until the peer answers.
get_info(Pid) when is_pid(Pid) ->
    Request = get_info,
    gen_fsm:sync_send_all_state_event(Pid, Request, infinity).

Expand Down Expand Up @@ -333,20 +344,22 @@ local_get(Pid, Key, Timeout) when is_pid(Pid) ->
%% @doc Send a synchronous `{local_put, Key, Obj}' event to the peer `Pid'
%% through riak_ensemble_router, passing `Timeout' on to the router call.
local_put(Pid, Key, Obj, Timeout) when is_pid(Pid) ->
    Event = {local_put, Key, Obj},
    riak_ensemble_router:sync_send_event(Pid, Event, Timeout).

-ifdef(TEST).
%% @doc Test-only variant of local_get that can be sent to any peer rather
%% than only the leader. Values read through this function come with no
%% consistency guarantees whatsoever, so it must not be used outside tests.
-spec debug_local_get(pid(), term()) -> std_reply().
debug_local_get(Pid, Key) ->
    Request = {debug_local_get, Key},
    gen_fsm:sync_send_all_state_event(Pid, Request).
-endif.

%%%===================================================================
%%% Core Protocol
%%%===================================================================

-spec probe(_, state()) -> next_state().
probe(init, State) ->
?OUT("~p: probe~n", [State#state.id]),
lager:debug("~p: probe init", [State#state.id]),
State2 = set_leader(undefined, State),
case is_pending(State2) of
true ->
Expand Down Expand Up @@ -380,33 +393,37 @@ probe(Msg, From, State) ->
common(Msg, From, State, probe).

pending(init, State) ->
lager:debug("~p: pending init", [State#state.id]),
State2 = set_timer(?PENDING_TIMEOUT, pending_timeout, State),
{next_state, pending, State2#state{tree_trust=false}};
pending(pending_timeout, State) ->
lager:debug("~p: pending_timeout", [State#state.id]),
probe({timeout, []}, State);
pending({prepare, Id, NextEpoch, From}, State=#state{fact=Fact}) ->
Epoch = epoch(State),
case NextEpoch > Epoch of
true ->
?OUT("~p: accepting ~p from ~p (~p)~n",
[State#state.id, NextEpoch, Id, Epoch]),
%% Prefix the message with this peer's own id (not the proposer's), as the
%% other state-transition log lines do; Id is the peer that sent the prepare.
lager:debug("~p: accepting ~p from ~p (~p)", [State#state.id, NextEpoch, Id, Epoch]),
reply(From, Fact, State),
State2 = cancel_timer(State),
prefollow({init, Id, NextEpoch}, State2);
false ->
?OUT("~p: rejecting ~p from ~p (~p)~n",
[State#state.id, NextEpoch, Id, Epoch]),
%% Prefix the message with this peer's own id (not the proposer's), as the
%% other state-transition log lines do; Id is the peer that sent the prepare.
lager:debug("~p: rejecting ~p from ~p (~p)", [State#state.id, NextEpoch, Id, Epoch]),
{next_state, pending, State}
end;
pending({commit, NewFact, From}, State) ->
Epoch = epoch(State),
case NewFact#fact.epoch >= Epoch of
true ->
reply(From, ok, State),
lager:debug("~p: accepting commit from ~p for epoch ~p",
[State#state.id, From, NewFact#fact.epoch]),
State2 = local_commit(NewFact, State),
State3 = cancel_timer(State2),
following(init, State3);
false ->
lager:debug("~p: ignoring outdated commit from ~p (~p < ~p)",
[State#state.id, From, NewFact#fact.epoch, Epoch]),
{next_state, pending, State}
end;
pending(Msg, State) ->
Expand All @@ -431,6 +448,7 @@ maybe_follow(Leader, State) ->
%%%===================================================================

repair(init, State=#state{tree=Tree}) ->
lager:debug("~p: repair", [State#state.id]),
riak_ensemble_peer_tree:async_repair(Tree),
{next_state, repair, State#state{tree_trust=false}};
repair(repair_complete, State) ->
Expand Down Expand Up @@ -561,7 +579,7 @@ prefollow(Msg, From, State) ->
-spec prepare(_, state()) -> next_state().
prepare(init, State=#state{id=Id}) ->
%% TODO: Change this hack where we keep old state and reincrement
?OUT("~p: prepare~n", [State#state.id]),
lager:debug("~p: prepare", [State#state.id]),
{NextEpoch, _} = increment_epoch(State),
%% io:format("Preparing ~p to ~p :: ~p~n", [NextEpoch,
%% views(State),
Expand Down Expand Up @@ -610,7 +628,6 @@ prelead(Msg, From, State) ->

-spec leading(_, state()) -> next_state().
leading(init, State=#state{id=_Id, watchers=Watchers}) ->
?OUT("~p: Leading~n", [_Id]),
_ = lager:info("~p: Leading~n", [_Id]),
start_exchange(State),
_ = notify_leader_status(Watchers, leading, State),
Expand Down Expand Up @@ -778,7 +795,7 @@ reset_follower_timer(State) ->
following(not_ready, State) ->
following(init, State#state{ready=false});
following(init, State) ->
?OUT("~p: Following: ~p~n", [State#state.id, leader(State)]),
lager:debug("~p: Following: ~p", [State#state.id, leader(State)]),
start_exchange(State),
State2 = reset_follower_timer(State),
{next_state, following, State2};
Expand All @@ -787,6 +804,7 @@ following(exchange_complete, State) ->
State2 = State#state{tree_trust=true},
{next_state, following, State2};
following(exchange_failed, State) ->
lager:debug("~p: exchange failed", [State#state.id]),
probe(init, State);
following({commit, Fact, From}, State) ->
State3 = case Fact#fact.epoch >= epoch(State) of
Expand Down Expand Up @@ -814,8 +832,7 @@ following({commit, Fact, From}, State) ->
%% {next_state, following, State}
%% end;
following(follower_timeout, State) ->
?OUT("~p: follower_timeout from ~p~n", [State#state.id, leader(State)]),
%% io:format("~p: follower_timeout from ~p~n", [State#state.id, leader(State)]),
lager:debug("~p: follower_timeout from ~p", [State#state.id, leader(State)]),
abandon(State#state{timer=undefined});
following({check_epoch, Leader, Epoch, From}, State) ->
case check_epoch(Leader, Epoch, State) of
Expand Down Expand Up @@ -895,7 +912,7 @@ step_down(State) ->
step_down(probe, State).

step_down(Next, State=#state{lease=Lease, watchers=Watchers}) ->
?OUT("~p: stepping down~n", [State#state.id]),
lager:debug("~p: stepping down", [State#state.id]),
_ = notify_leader_status(Watchers, Next, State),
riak_ensemble_lease:unlease(Lease),
State2 = cancel_timer(State),
Expand Down Expand Up @@ -1004,8 +1021,6 @@ common({update_hash, _, _, MaybeFrom}, State, StateName) ->
maybe_reply(MaybeFrom, nack, State),
{next_state, StateName, State};
common(Msg, State, StateName) ->
?OUT("~p: ~s/ignoring: ~p~n", [State#state.id, StateName, Msg]),
%% io:format("~p/~p: ~s/ignoring: ~p~n", [State#state.id, self(), StateName, Msg]),
nack(Msg, State),
{next_state, StateName, State}.

Expand All @@ -1017,11 +1032,11 @@ common({force_state, {Epoch, Seq}}, From, State, StateName) ->
common(tree_pid, From, State, StateName) ->
gen_fsm:reply(From, State#state.tree),
{next_state, StateName, State};
common(tree_corrupted, From, State, _StateName) ->
common(tree_corrupted, From, State, StateName) ->
gen_fsm:reply(From, ok),
lager:debug("~p: tree_corrupted in state ~p", [State#state.id, StateName]),
repair(init, State);
common(_Msg, From, State, StateName) ->
?OUT("~p: ~s/ignoring: ~p~n", [State#state.id, StateName, _Msg]),
send_reply(From, nack),
{next_state, StateName, State}.

Expand Down Expand Up @@ -1801,7 +1816,7 @@ get_value(Obj, Default, State) ->

-spec init([any(),...]) -> {ok, setup, state()}.
init([Mod, Ensemble, Id, Args]) ->
?OUT("~p: starting~n", [Id]),
lager:debug("~p: starting peer", [Id]),
{A,B,C} = os:timestamp(),
_ = random:seed(A + erlang:phash2(Id),
B + erlang:phash2(node()),
Expand All @@ -1825,6 +1840,7 @@ init([Mod, Ensemble, Id, Args]) ->
{ok, setup, State}.

setup({init, Args}, State0=#state{id=Id, ensemble=Ensemble, ets=ETS, mod=Mod}) ->
lager:debug("~p: setup", [Id]),
NumWorkers = ?WORKERS,
{TreeId, Path} = mod_synctree(State0),
Tree = open_hashtree(Ensemble, Id, TreeId, Path),
Expand All @@ -1845,17 +1861,28 @@ setup({init, Args}, State0=#state{id=Id, ensemble=Ensemble, ets=ETS, mod=Mod}) -

-spec handle_event(_, atom(), state()) -> {next_state, atom(), state()}.
handle_event({watch_leader_status, Pid}, StateName, State) when node(Pid) =/= node() ->
lager:warning("Remote pid ~p not allowed to watch_leader_status on ensemble peer ~p",
[Pid, State#state.id]),
lager:debug("Remote pid ~p not allowed to watch_leader_status on ensemble peer ~p",
[Pid, State#state.id]),
{next_state, StateName, State};
handle_event({watch_leader_status, Pid}, StateName, State = #state{watchers = Watchers}) ->
_ = notify_leader_status(Pid, StateName, State),
%% Might as well take this opportunity to prune any dead pids that are in the list
NewWatcherList = [P || P <- [Pid | Watchers], is_process_alive(P)],
{next_state, StateName, State#state{watchers = NewWatcherList}};
case is_watcher(Pid, Watchers) of
    true ->
        %% Pid is already in the watcher list: leave the state untouched so
        %% the process is neither monitored nor listed a second time.
        lager:debug("Got watch_leader_status for ~p, but pid already in watchers list",
                    [Pid]),
        {next_state, StateName, State};
    false ->
        %% Tell the new watcher the current status right away, then monitor
        %% it and record the monitor reference alongside the pid.
        _ = notify_leader_status(Pid, StateName, State),
        MRef = erlang:monitor(process, Pid),
        {next_state, StateName, State#state{watchers = [{Pid, MRef} | Watchers]}}
end;
handle_event({stop_watching, Pid}, StateName, State = #state{watchers = Watchers}) ->
NewWatcherList = lists:delete(Pid, Watchers),
{next_state, StateName, State#state{watchers = NewWatcherList}};
case remove_watcher(Pid, Watchers) of
    not_found ->
        %% Nothing to undo: Pid was never registered (or was already removed).
        lager:debug("Tried to stop watching for pid ~p, but did not find it in watcher list",
                    [Pid]),
        {next_state, StateName, State};
    {MRef, NewWatcherList} ->
        %% Drop the monitor and flush any 'DOWN' message already queued for
        %% it, so no stale notification for this watcher is handled later.
        erlang:demonitor(MRef, [flush]),
        {next_state, StateName, State#state{watchers = NewWatcherList}}
end;
handle_event({reply, ReqId, Peer, Reply}, StateName, State) ->
State2 = handle_reply(ReqId, Peer, Reply, State),
{next_state, StateName, State2};
Expand Down Expand Up @@ -1884,14 +1911,32 @@ handle_sync_event(tree_info, _From, StateName, State=#state{tree_trust=Trust,
handle_sync_event({debug_local_get, Key}, From, StateName, State) ->
State2 = do_local_get(From, Key, State),
{next_state, StateName, State2};
handle_sync_event(get_watchers, _From, StateName, State) ->
{reply, State#state.watchers, StateName, State};
handle_sync_event(_Event, _From, StateName, State) ->
Reply = ok,
{reply, Reply, StateName, State}.

%% -spec handle_info(_, atom(), state()) -> next_state().
handle_info({'DOWN', Ref, _, Pid, Reason}, StateName,
#state{mod=Mod, modstate=ModState}=State) ->
case Mod:handle_down(Ref, Pid, Reason, ModState) of
%% @doc gen_fsm callback for out-of-band messages.
%%
%% A 'DOWN' message is first checked against the leadership watcher list.
%% When it carries the monitor reference recorded for the watcher `Pid',
%% that watcher is removed. Every other 'DOWN' message is forwarded to
%% module_handle_down/5. `quorum_timeout' is delegated to quorum_timeout/1,
%% and any other message is ignored.
handle_info({'DOWN', MRef, _, Pid, Reason}, StateName, State) ->
    Watchers = State#state.watchers,
    case remove_watcher(Pid, Watchers) of
        {MRef, NewWatcherList} ->
            %% MRef is already bound, so this only matches the monitor that
            %% was created for this watcher.
            {next_state, StateName, State#state{watchers = NewWatcherList}};
        _NotTheWatcherMonitor ->
            %% Either Pid is not a watcher at all (not_found), or Pid is a
            %% watcher but this 'DOWN' belongs to a different monitor on the
            %% same process. In both cases the signal is not ours to consume,
            %% so pass it through to the callback module instead of crashing
            %% with case_clause.
            module_handle_down(MRef, Pid, Reason, StateName, State)
    end;
handle_info(quorum_timeout, StateName, State) ->
    State2 = quorum_timeout(State),
    {next_state, StateName, State2};
handle_info(_Info, StateName, State) ->
    {next_state, StateName, State}.

module_handle_down(MRef, Pid, Reason, StateName, State) ->
#state{mod=Mod, modstate=ModState} = State,
case Mod:handle_down(MRef, Pid, Reason, ModState) of
false ->
State2 = maybe_restart_worker(Pid, State),
{next_state, StateName, State2};
Expand All @@ -1900,12 +1945,7 @@ handle_info({'DOWN', Ref, _, Pid, Reason}, StateName,
{reset, ModState2} ->
State2 = State#state{modstate=ModState2},
step_down(State2)
end;
handle_info(quorum_timeout, StateName, State) ->
State2 = quorum_timeout(State),
{next_state, StateName, State2};
handle_info(_Info, StateName, State) ->
{next_state, StateName, State}.
end.

-spec terminate(_,_,_) -> ok.
terminate(_Reason, _StateName, _State) ->
Expand Down Expand Up @@ -2027,8 +2067,8 @@ existing_leader(_Replies, Abandoned, #fact{epoch=Epoch, seq=Seq, leader=Leader})
undefined
end.

notify_leader_status(PidList, StateName, State) when is_list(PidList) ->
[notify_leader_status(P, StateName, State) || P <- PidList];
notify_leader_status(WatcherList, StateName, State) when is_list(WatcherList) ->
[notify_leader_status(Pid, StateName, State) || {Pid, _MRef} <- WatcherList];
notify_leader_status(Pid, leading, State = #state{id = Id, ensemble = Ensemble}) ->
Pid ! {is_leading, self(), Id, Ensemble, epoch(State)};
notify_leader_status(Pid, _, State = #state{id = Id, ensemble = Ensemble}) ->
Expand All @@ -2052,6 +2092,22 @@ peer(Id, #state{id=Id}) ->
peer(Id, #state{ensemble=Ensemble}) ->
riak_ensemble_manager:get_peer_pid(Ensemble, Id).

%% @doc Return `true' when `Pid' appears as the first element of some
%% `{Pid, MonitorRef}' tuple in `WatcherList', and `false' otherwise.
-spec is_watcher(pid(), [{pid(), reference()}]) -> boolean().
is_watcher(Pid, WatcherList) ->
    lists:keymember(Pid, 1, WatcherList).

%% @doc Take the entry for `Pid' out of `WatcherList'.
%% Returns `{MonitorRef, RemainingWatchers}' when an entry keyed on `Pid'
%% exists, or `not_found' when it does not.
-spec remove_watcher(pid(), [{pid(), reference()}]) ->
          {reference(), [{pid(), reference()}]} | not_found.
remove_watcher(Pid, WatcherList) ->
    Taken = lists:keytake(Pid, 1, WatcherList),
    case Taken of
        {value, {_, Ref}, Remaining} ->
            {Ref, Remaining};
        false ->
            not_found
    end.

%%%===================================================================
%%% Behaviour Interface
%%%===================================================================
Expand Down
1 change: 1 addition & 0 deletions test/TESTS
Original file line number Diff line number Diff line change
Expand Up @@ -13,3 +13,4 @@ lease_test
ensemble_tests_pure
replace_members_test
read_tombstone_test
leadership_watchers
18 changes: 18 additions & 0 deletions test/ens_test.erl
Original file line number Diff line number Diff line change
Expand Up @@ -94,3 +94,21 @@ read_until(Key) ->
timer:sleep(100),
read_until(Key)
end.

%% @doc Polling helper for building test predicates. Calls `Fun' repeatedly
%% until it returns `true' (result: `ok') or the retry budget runs out
%% (result: `{fail, LastResult}'). wait_until/1 uses 50 attempts with a
%% 100 ms pause between them.
wait_until(Fun) when is_function(Fun) ->
    wait_until(Fun, 50, 100).

wait_until(Fun, Retry, Delay) when Retry > 0 ->
    case Fun() of
        true ->
            ok;
        LastResult when Retry == 1 ->
            %% Final attempt: report what the predicate returned.
            {fail, LastResult};
        _NotYet ->
            timer:sleep(Delay),
            wait_until(Fun, Retry - 1, Delay)
    end.
Loading