diff --git a/c_src/riak_ensemble_clock.c b/c_src/riak_ensemble_clock.c new file mode 100644 index 0000000..de5a7d2 --- /dev/null +++ b/c_src/riak_ensemble_clock.c @@ -0,0 +1,184 @@ +/******************************************************************** + * + * Copyright (c) 2014 Basho Technologies, Inc. All Rights Reserved. + * + * This file is provided to you under the Apache License, + * Version 2.0 (the "License"); you may not use this file + * except in compliance with the License. You may obtain + * a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + ********************************************************************/ +#include "erl_nif.h" + +#include +#include +#include +#include + +#if defined(__MACH__) && defined(__APPLE__) +#include +#include +#endif + +static ERL_NIF_TERM ATOM_OK; +static ERL_NIF_TERM ATOM_ERROR; + +#if defined(__MACH__) && defined(__APPLE__) +static mach_timebase_info_data_t timebase_info; +#endif + +/*********************************************************************/ + +#if defined(_POSIX_TIMERS) && (_POSIX_TIMERS > 0) +uint64_t posix_get_clock(clockid_t clock) +{ + struct timespec ts; + if(clock_gettime(clock, &ts) == -1) + return 0; + return ((uint64_t)ts.tv_sec * 1000000000) + ts.tv_nsec; +} + +/* Note: Prefer CLOCK_BOOTTIME on Linux where supported, as this + includes time spent in suspend. CLOCK_MONOTONIC may or may + not include time spent in suspend -- it's CPU dependent. In + practice, this shouldn't matter -- people don't typically + suspend/resume production servers while under client load. 
+ Likewise, client TCP connections are unlikely to survive + across reasonable suspend durations. +*/ + +uint64_t posix_monotonic_time(void) +{ + uint64_t time; +#if defined(CLOCK_BOOTTIME) + if((time = posix_get_clock(CLOCK_BOOTTIME))) + return time; +#elif defined(CLOCK_MONOTONIC) + if((time = posix_get_clock(CLOCK_MONOTONIC))) + return time; +#endif + return 0; +} +#endif + +/********************************************************************* + * See Apple technical note: * + * https://developer.apple.com/library/mac/qa/qa1398/_index.html * + *********************************************************************/ + +/* Note: mach_absolute_time() is based on the CPU timestamp counter, + which is synchronized across all CPUs since Intel Nehalem. + Earlier CPUs do not provide this guarantee. It's unclear if + Apple provides any correction for this behavior on older CPUs. + We assume this doesn't matter in practice -- people don't use + ancient OS X machines as production servers. +*/ + +#if defined(__MACH__) && defined(__APPLE__) +uint64_t osx_monotonic_time(void) +{ + uint64_t time; + uint64_t timeNano; + + time = mach_absolute_time(); + + // Do the maths. We hope that the multiplication doesn't + // overflow; the price you pay for working in fixed point. 
+ + timeNano = time * timebase_info.numer / timebase_info.denom; + + return timeNano; +} +#endif + +/*********************************************************************/ + +static uint64_t get_monotonic_time() +{ + uint64_t time = 0; + +#if defined(__MACH__) && defined(__APPLE__) + time = osx_monotonic_time(); +#endif + +#if defined(_POSIX_TIMERS) && (_POSIX_TIMERS > 0) + time = posix_monotonic_time(); +#endif + + return time; +} + +/*********************************************************************/ + +static ERL_NIF_TERM monotonic_time(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[]) +{ + uint64_t time = get_monotonic_time(); + + if(time) { + return enif_make_tuple2(env, ATOM_OK, enif_make_uint64(env, time)); + } + else { + return ATOM_ERROR; + } +} + +/*********************************************************************/ + +static ERL_NIF_TERM monotonic_time_ms(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[]) +{ + uint64_t time = get_monotonic_time() / 1000000; + + if(time) { + return enif_make_tuple2(env, ATOM_OK, enif_make_uint64(env, time)); + } + else { + return ATOM_ERROR; + } +} + +/*********************************************************************/ + +static void init(ErlNifEnv *env) +{ + ATOM_OK = enif_make_atom(env, "ok"); + ATOM_ERROR = enif_make_atom(env, "error"); + +#if defined(__MACH__) && defined(__APPLE__) + (void) mach_timebase_info(&timebase_info); +#endif +} + +static int on_load(ErlNifEnv* env, void** priv_data, ERL_NIF_TERM load_info) +{ + init(env); + return 0; +} + +static int on_upgrade(ErlNifEnv* env, void** priv_data, void** old_priv_data, + ERL_NIF_TERM load_info) +{ + init(env); + return 0; +} + +static void on_unload(ErlNifEnv *env, void *priv_data) +{ +} + +/*********************************************************************/ + +static ErlNifFunc nif_funcs[] = { + {"monotonic_time", 0, monotonic_time}, + {"monotonic_time_ms", 0, monotonic_time_ms} +}; + +ERL_NIF_INIT(riak_ensemble_clock, nif_funcs, &on_load, NULL, 
&on_upgrade, &on_unload) diff --git a/rebar.config b/rebar.config index e3af69f..e691c5d 100644 --- a/rebar.config +++ b/rebar.config @@ -7,3 +7,9 @@ {xref_checks, [undefined_function_calls]}. {deps, [{lager, "2.0.3", {git, "git://github.com/basho/lager.git", {tag, "2.0.3"}}}, {eleveldb, ".*", {git, "git://github.com/basho/eleveldb.git", {branch, "develop"}}}]}. + +{port_specs, + [{".*", "priv/riak_ensemble.so", + ["c_src/*.c*"], + [{env, [{"CFLAGS", "$CFLAGS"}]}] + }]}. diff --git a/src/riak_ensemble_clock.erl b/src/riak_ensemble_clock.erl new file mode 100644 index 0000000..6bd6b4e --- /dev/null +++ b/src/riak_ensemble_clock.erl @@ -0,0 +1,42 @@ +%% ------------------------------------------------------------------- +%% +%% Copyright (c) 2014 Basho Technologies, Inc. All Rights Reserved. +%% +%% This file is provided to you under the Apache License, +%% Version 2.0 (the "License"); you may not use this file +%% except in compliance with the License. You may obtain +%% a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, +%% software distributed under the License is distributed on an +%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +%% KIND, either express or implied. See the License for the +%% specific language governing permissions and limitations +%% under the License. +%% +%% ------------------------------------------------------------------- +-module(riak_ensemble_clock). +-on_load(init/0). +-export([monotonic_time/0, monotonic_time_ms/0]). + +monotonic_time() -> + erlang:nif_error({error, not_loaded}). + +monotonic_time_ms() -> + erlang:nif_error({error, not_loaded}). 
+ +init() -> + case code:priv_dir(riak_ensemble) of + {error, bad_name} -> + case code:which(?MODULE) of + Filename when is_list(Filename) -> + SoName = filename:join([filename:dirname(Filename),"../priv", "riak_ensemble"]); + _ -> + SoName = filename:join("../priv", "riak_ensemble") + end; + Dir -> + SoName = filename:join(Dir, "riak_ensemble") + end, + erlang:load_nif(SoName, 0). diff --git a/src/riak_ensemble_config.erl b/src/riak_ensemble_config.erl index 6d2c836..76be1ff 100644 --- a/src/riak_ensemble_config.erl +++ b/src/riak_ensemble_config.erl @@ -27,11 +27,25 @@ tick() -> get_env(ensemble_tick, 500). +%% @doc +%% The leader lease duration. Should be greater than the leader tick to give +%% the leader time to refresh before expiration, but lower than the follower +%% timeout. +lease() -> + get_env(lease_duration, tick() * 2 div 3). + +%% @doc +%% This setting determines if leader leases are trusted or not. Trusting the +%% lease allows a leader to reply to reads without contacting remote peers +%% as long as its lease has not yet expired. +trust_lease() -> + get_env(trust_lease, true). + %% @doc %% The follower timeout determines how long a follower waits to hear from %% the leader before abandoning it. follower_timeout() -> - get_env(follower_timeout, tick() * 2). + get_env(follower_timeout, lease() * 4). %% @doc %% The election timeout used for randomized election. @@ -93,4 +107,3 @@ get_env(Key, Default) -> {_, Val} -> Val end. - diff --git a/src/riak_ensemble_lease.erl b/src/riak_ensemble_lease.erl new file mode 100644 index 0000000..732b092 --- /dev/null +++ b/src/riak_ensemble_lease.erl @@ -0,0 +1,145 @@ +%% ------------------------------------------------------------------- +%% +%% Copyright (c) 2014 Basho Technologies, Inc. All Rights Reserved. +%% +%% This file is provided to you under the Apache License, +%% Version 2.0 (the "License"); you may not use this file +%% except in compliance with the License. 
You may obtain +%% a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, +%% software distributed under the License is distributed on an +%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +%% KIND, either express or implied. See the License for the +%% specific language governing permissions and limitations +%% under the License. +%% +%% ------------------------------------------------------------------- + +%% @doc +%% This module is used by {@link riak_ensemble_peer} to keep track of +%% an established leader lease. The leader is responsible for periodically +%% refreshing its lease, otherwise the lease will time out. +%% +%% Using a time-based lease in a distributed system is not without issue. +%% This module does its best to address these concerns as follows: +%% +%% 1. This module uses Erlang based timeouts to trigger lease expiration. +%% Erlang uses time correction to attempt to account for clock issues, +%% as discussed here: +%% http://www.erlang.org/doc/apps/erts/time_correction.html +%% +%% 2. In addition to Erlang time, this module also double checks the +%% lease against the OS monotonic clock. The monotonic clock is +%% not affected by the user/NTP changing the system clock, and +%% is designed to always move forward (although, virtualization +%% sometimes affects this guarantee). +%% +%% Likewise, riak_ensemble is designed such that the lease and leader refresh +%% are much smaller than the follower timeout. All of these factors, along +%% with riak_ensemble being designed to maintain strong leadership (unlike +%% other systems such as Raft) make the use of leader leases safe in practice. +%% As a reminder, Google is also known to use leader leases in its Paxos +%% implementation as discussed in their "Paxos Made Live" paper. 
+%% +%% Of course, users that do not trust leader leases can always set the +%% trust_lease application variable to false, causing riak_ensemble to ignore +%% leader leases and always perform full quorum operations. +%% + +-module(riak_ensemble_lease). + +-export([start_link/0, + check_lease/1, + lease/2, + unlease/1]). + +%% internal exports +-export([init/2, loop/2]). + +-type lease_ref() :: {pid(), ets:tid()}. +-export_type([lease_ref/0]). + +%%%=================================================================== + +-spec start_link() -> {ok, lease_ref()}. +start_link() -> + Ref = make_ref(), + spawn_link(?MODULE, init, [self(), Ref]), + receive + {Ref, Reply} -> + Reply + end. + +-spec check_lease(lease_ref()) -> boolean(). +check_lease({_, T}) -> + case ets:lookup_element(T, lease, 2) of + undefined -> + false; + Until -> + case riak_ensemble_clock:monotonic_time_ms() of + {ok, Time} when Time < Until -> + true; + _ -> + false + end + end. + +-spec lease(lease_ref(), timeout()) -> ok. +lease({Pid,_}, Duration) -> + ok = call(Pid, {lease, Duration}). + +-spec unlease(lease_ref()) -> ok. +unlease({Pid,_}) -> + ok = call(Pid, unlease). + +%%%=================================================================== + +init(Parent, Ref) -> + T = ets:new(?MODULE, [protected, set, {read_concurrency, true}]), + ets:insert(T, {lease, undefined}), + Reply = {ok, {self(), T}}, + Parent ! {Ref, Reply}, + loop(T, infinity). + +%%%=================================================================== + +loop(T, Timeout) -> + receive + {{lease, Duration}, From} -> + case riak_ensemble_clock:monotonic_time_ms() of + {ok, Time} -> + ets:insert(T, {lease, Time + Duration}); + error -> + ets:insert(T, {lease, undefined}) + end, + reply(From, ok), + ?MODULE:loop(T, Duration); + {unlease, From} -> + ets:insert(T, {lease, undefined}), + reply(From, ok), + ?MODULE:loop(T, infinity) + after Timeout -> + ets:insert(T, {lease, undefined}), + ?MODULE:loop(T, infinity) + end. 
+ +%%%=================================================================== + +call(Pid, Msg) -> + MRef = monitor(process, Pid), + From = {self(), MRef}, + Pid ! {Msg, From}, + receive + {MRef, Reply} -> + erlang:demonitor(MRef, [flush]), + Reply; + {'DOWN', MRef, _, _, Reason} -> + exit(Reason) + end. + +reply({Pid, Ref}, Reply) -> + Pid ! {Ref, Reply}, + ok. diff --git a/src/riak_ensemble_peer.erl b/src/riak_ensemble_peer.erl index 1c2f0b5..fb3728f 100644 --- a/src/riak_ensemble_peer.erl +++ b/src/riak_ensemble_peer.erl @@ -130,6 +130,7 @@ last_views :: [[peer_id()]], async :: pid(), tree :: pid(), + lease :: riak_ensemble_lease:lease_ref(), self :: pid() }). @@ -798,6 +799,14 @@ following(follower_timeout, State) -> ?OUT("~p: follower_timeout from ~p~n", [State#state.id, leader(State)]), %% io:format("~p: follower_timeout from ~p~n", [State#state.id, leader(State)]), abandon(State#state{timer=undefined}); +following({check_epoch, Leader, Epoch, From}, State) -> + case check_epoch(Leader, Epoch, State) of + true -> + reply(From, ok, State); + false -> + reply(From, nack, State) + end, + {next_state, following, State}; following(Msg, State) -> case following_kv(Msg, State) of false -> @@ -826,6 +835,10 @@ forward(Msg, From, State) -> valid_request(Peer, ReqEpoch, State=#state{ready=Ready}) -> Ready and (ReqEpoch =:= epoch(State)) and (Peer =:= leader(State)). +-spec check_epoch(peer_id(), epoch(), state()) -> boolean(). +check_epoch(Leader, Epoch, State) -> + (Epoch =:= epoch(State)) and (Leader =:= leader(State)). + -spec increment_epoch(fact() | state()) -> {pos_integer(), fact() | state()}. increment_epoch(Fact=#fact{epoch=Epoch}) -> NextEpoch = Epoch + 1, @@ -863,8 +876,9 @@ local_commit(Fact=#fact{leader=_Leader, epoch=Epoch, seq=Seq, views=Views}, step_down(State) -> step_down(probe, State). 
-step_down(Next, State) -> +step_down(Next, State=#state{lease=Lease}) -> ?OUT("~p: stepping down~n", [State#state.id]), + riak_ensemble_lease:unlease(Lease), State2 = cancel_timer(State), reset_workers(State), State3 = set_leader(undefined, State2), @@ -1023,7 +1037,7 @@ nack(_Msg, _State) -> -type m_tick() :: {ok|failed|changed|shutdown, state()}. -type m_tick_fun() :: fun((state()) -> m_tick()). -leader_tick(State=#state{ensemble=Ensemble, id=Id}) -> +leader_tick(State=#state{ensemble=Ensemble, id=Id, lease=Lease}) -> State2 = mod_tick(State), M1 = {ok, State2}, M2 = continue(M1, fun maybe_ping/1), @@ -1042,6 +1056,7 @@ leader_tick(State=#state{ensemble=Ensemble, id=Id}) -> timer:sleep(1000), step_down(stop, State3); {_, State3} -> + riak_ensemble_lease:lease(Lease, riak_ensemble_config:lease()), State4 = set_timer(?ENSEMBLE_TICK, tick, State3), {next_state, leading, State4} end. @@ -1406,7 +1421,15 @@ do_get_fsm(Key, From, Self, ObjSeq, Opts, State0) -> true -> case LocalOnly of true -> - send_reply(From, {ok, Local}); + case check_lease(State) of + true -> + send_reply(From, {ok, Local}); + false -> + %% TODO: If there's a new leader, we could forward + %% instead of timeout. + send_reply(From, timeout), + gen_fsm:sync_send_event(Self, request_failed, infinity) + end; false -> case get_latest_obj(Key, Local, ObjSeq, State) of {ok, Latest, Replies, _State2} -> @@ -1431,6 +1454,31 @@ do_get_fsm(Key, From, Self, ObjSeq, Opts, State0) -> end end. +-spec check_lease(state()) -> boolean(). +check_lease(State=#state{id=Id}) -> + case valid_lease(State) of + true -> + true; + false -> + Epoch = epoch(State), + {Future, _State2} = blocking_send_all({check_epoch, Id, Epoch}, State), + case wait_for_quorum(Future) of + {quorum_met, _Replies} -> + true; + {timeout, _Replies} -> + false + end + end. + +-spec valid_lease(state()) -> boolean(). 
+valid_lease(#state{lease=Lease}) -> + case riak_ensemble_config:trust_lease() of + true -> + riak_ensemble_lease:check_lease(Lease); + _ -> + false + end. + maybe_repair(Key, Latest, Replies, State=#state{id=Id}) -> %% TODO: Should only send puts to peers that are actually divergent. ShouldRepair = lists:any(fun({_, nack}) -> @@ -1703,10 +1751,12 @@ setup({init, Args}, State0=#state{id=Id, ensemble=Ensemble, ets=ETS, mod=Mod}) - Saved = reload_fact(Ensemble, Id), Workers = start_workers(NumWorkers, ETS), Members = compute_members(Saved#fact.views), + {ok, Lease} = riak_ensemble_lease:start_link(), State = State0#state{workers=list_to_tuple(Workers), tree=Tree, fact=Saved, members=Members, + lease=Lease, modstate=riak_ensemble_backend:start(Mod, Ensemble, Id, Args)}, State2 = check_views(State), %% TODO: Why are we local commiting on startup? diff --git a/test/TESTS b/test/TESTS index 877de71..3f700a6 100644 --- a/test/TESTS +++ b/test/TESTS @@ -8,3 +8,5 @@ corrupt_upper_test corrupt_segment_test corrupt_exchange_test corrupt_follower_test +lease_test +ensemble_tests_pure diff --git a/test/ensemble_tests_pure.erl b/test/ensemble_tests_pure.erl new file mode 100644 index 0000000..a21e490 --- /dev/null +++ b/test/ensemble_tests_pure.erl @@ -0,0 +1,19 @@ +%% Various pure tests +-module(ensemble_tests_pure). +-compile(export_all). +-include_lib("eunit/include/eunit.hrl"). + +-define(TEST(X), {timeout, 60, {test, ?MODULE, X}}). + +run_test_() -> + [?TEST(test_monotonic_time)]. + +test_monotonic_time() -> + {ok, N1} = riak_ensemble_clock:monotonic_time(), + {ok, M1} = riak_ensemble_clock:monotonic_time_ms(), + timer:sleep(1000), + {ok, N2} = riak_ensemble_clock:monotonic_time(), + {ok, M2} = riak_ensemble_clock:monotonic_time_ms(), + ?assert((N2 - N1) >= 1000000000), + ?assert((M2 - M1) >= 1000), + ok. 
diff --git a/test/lease_test.erl b/test/lease_test.erl new file mode 100644 index 0000000..4115368 --- /dev/null +++ b/test/lease_test.erl @@ -0,0 +1,45 @@ +-module(lease_test). +-compile(export_all). +-include_lib("eunit/include/eunit.hrl"). + +run_test_() -> + ens_test:run(fun scenario/0). + +scenario() -> + ens_test:start(3), + {ok, _} = ens_test:kput(<<"test">>, <<"test">>), + + %% Test with lease trusted + {ok, _} = ens_test:kget(<<"test">>), + + %% Test with lease not trusted + ok = application:set_env(riak_ensemble, trust_lease, false), + {ok, _} = ens_test:kget(<<"test">>), + + %% Test with lease not trusted and followers intercepted to + %% nack epoch check. + rt_intercept:add(node(), {riak_ensemble_peer, [{{check_epoch,3}, check_epoch_false}]}), + {error, timeout} = ens_test:kget(<<"test">>), + + %% Test with lease trusted again + ok = application:set_env(riak_ensemble, trust_lease, true), + %% Because of error above, leader may have changed. Wait until stable. + ens_test:wait_stable(root), + %% Read twice because leader change forces first read to rewrite, which + %% ignores the lease entirely. + {ok, _} = ens_test:kget(<<"test">>), + {ok, _} = ens_test:kget(<<"test">>), + + %% Test with simulated expired lease + ok = application:set_env(riak_ensemble, follower_timeout, 1000), + ok = application:set_env(riak_ensemble, lease_duration, 0), + timer:sleep(1000), + {error, _} = ens_test:kget(<<"test">>), + + %% Remove intercept and test that all is well + rt_intercept:add(node(), {riak_ensemble_peer, [{{check_epoch,3}, check_epoch}]}), + ens_test:wait_stable(root), + {ok, _} = ens_test:kget(<<"test">>), + {ok, _} = ens_test:kget(<<"test">>), + + ok. diff --git a/test/riak_ensemble_peer_intercepts.erl b/test/riak_ensemble_peer_intercepts.erl new file mode 100644 index 0000000..d79aec6 --- /dev/null +++ b/test/riak_ensemble_peer_intercepts.erl @@ -0,0 +1,11 @@ +-module(riak_ensemble_peer_intercepts). +-compile(export_all). +-include("riak_ensemble_types.hrl"). 
+ +-define(M, riak_ensemble_peer_orig). + +check_epoch_false(_Peer, _Epoch, _State) -> + false. + +check_epoch(Peer, Epoch, State) -> + ?M:check_epoch_orig(Peer, Epoch, State).