diff --git a/apps/mg/src/mg_configurator.erl b/apps/mg/src/mg_configurator.erl index f77746a..581d9a1 100644 --- a/apps/mg/src/mg_configurator.erl +++ b/apps/mg/src/mg_configurator.erl @@ -42,6 +42,8 @@ -type pulse() :: mg_core_pulse:handler(). +-type scaling_opts() :: #{scaling := mg_core_cluster:scaling_type()}. + -spec construct_child_specs(config()) -> [supervisor:child_spec()]. construct_child_specs( #{ @@ -54,6 +56,7 @@ construct_child_specs( Quotas = maps:get(quotas, Config, []), HealthChecks = maps:get(health_check, Config, #{}), ClusterOpts = maps:get(cluster, Config, #{}), + Scaling = maps:get(scaling, ClusterOpts, global_based), QuotasChildSpec = quotas_child_specs(Quotas, quota), EventSinkChildSpec = event_sink_ns_child_spec(EventSinkNS, event_sink, Pulse), @@ -62,7 +65,7 @@ construct_child_specs( woody_server, #{ pulse => Pulse, - automaton => api_automaton_options(Namespaces, EventSinkNS, Pulse), + automaton => api_automaton_options(Namespaces, EventSinkNS, Pulse, #{scaling => Scaling}), event_sink => api_event_sink_options(Namespaces, EventSinkNS, Pulse), woody_server => WoodyServer, additional_routes => [ @@ -72,7 +75,7 @@ construct_child_specs( ] } ), - ClusterSpec = mg_core_union:child_spec(ClusterOpts), + ClusterSpec = mg_core_cluster:child_spec(ClusterOpts), lists:flatten([ QuotasChildSpec, @@ -146,7 +149,8 @@ machine_options(NS, Config, Pulse) -> Options = maps:with( [ retries, - timer_processing_timeout + timer_processing_timeout, + scaling ], Config ), @@ -167,8 +171,8 @@ machine_options(NS, Config, Pulse) -> suicide_probability => maps:get(suicide_probability, Config, undefined) }. --spec api_automaton_options(namespaces(), event_sink_ns(), pulse()) -> mg_woody_automaton:options(). -api_automaton_options(NSs, EventSinkNS, Pulse) -> +-spec api_automaton_options(namespaces(), event_sink_ns(), pulse(), scaling_opts()) -> mg_woody_automaton:options(). 
+api_automaton_options(NSs, EventSinkNS, Pulse, ScalingOpts) -> maps:fold( fun(NS, ConfigNS, Options) -> Options#{ @@ -180,7 +184,7 @@ api_automaton_options(NSs, EventSinkNS, Pulse) -> ) } end, - #{}, + ScalingOpts, NSs ). diff --git a/apps/mg/src/mg_health_check.erl b/apps/mg/src/mg_health_check.erl index af1bb60..bae5ced 100644 --- a/apps/mg/src/mg_health_check.erl +++ b/apps/mg/src/mg_health_check.erl @@ -3,6 +3,7 @@ -export([consuela/0]). -export([global/0]). -export([startup/0]). +-export([skip/0]). -spec consuela() -> {erl_health:status(), erl_health:details()}. consuela() -> @@ -13,7 +14,7 @@ consuela() -> -spec global() -> {erl_health:status(), erl_health:details()}. global() -> - ClusterSize = mg_core_union:cluster_size(), + ClusterSize = mg_core_cluster:cluster_size(), ConnectedCount = erlang:length(erlang:nodes()), case is_quorum(ClusterSize, ConnectedCount) of true -> @@ -29,6 +30,10 @@ global() -> startup() -> %% maybe any checks? logger:info("union. node ~p started", [node()]), + skip(). + +-spec skip() -> {erl_health:status(), erl_health:details()}. +skip() -> {passing, []}. 
%% Internal functions diff --git a/apps/mg/test/mg_prometheus_metric_SUITE.erl b/apps/mg/test/mg_prometheus_metric_SUITE.erl index b6bc886..c62f07c 100644 --- a/apps/mg/test/mg_prometheus_metric_SUITE.erl +++ b/apps/mg/test/mg_prometheus_metric_SUITE.erl @@ -841,6 +841,7 @@ mg_config() -> {namespaces, #{}}, {event_sink_ns, #{ storage => mg_core_storage_memory, + scaling => global_based, registry => mg_core_procreg_global }}, {pulse, {mg_pulse, #{}}} diff --git a/apps/mg/test/mg_tests_SUITE.erl b/apps/mg/test/mg_tests_SUITE.erl index 7f27b05..a9bec34 100644 --- a/apps/mg/test/mg_tests_SUITE.erl +++ b/apps/mg/test/mg_tests_SUITE.erl @@ -240,6 +240,7 @@ mg_config(#{endpoint := {IP, Port}}, C) -> storage => {exponential, infinity, 1, 10}, timers => {exponential, infinity, 1, 10} }, + scaling => global_based, % сейчас существуют проблемы, которые не дают включить на постоянной основе эту опцию % (а очень хочется, чтобы проверять работоспособность идемпотентных ретраев) % TODO в будущем нужно это сделать @@ -260,6 +261,7 @@ mg_config(#{endpoint := {IP, Port}}, C) -> }}, {event_sink_ns, #{ storage => mg_core_storage_memory, + scaling => global_based, default_processing_timeout => 5000 }}, {pulse, {mg_pulse, #{}}} diff --git a/apps/mg_core/src/mg_core_cluster.erl b/apps/mg_core/src/mg_core_cluster.erl new file mode 100644 index 0000000..9a2f505 --- /dev/null +++ b/apps/mg_core/src/mg_core_cluster.erl @@ -0,0 +1,342 @@ +-module(mg_core_cluster). + +-behaviour(gen_server). + +-export([start_link/1]). +-export([ + init/1, + handle_continue/2, + handle_call/3, + handle_cast/2, + handle_info/2, + terminate/2, + code_change/3 +]). + +-export([child_spec/1]). +-export([cluster_size/0]). +-export([connecting/1]). +-export([get_node/1]). +-export([get_partitions_info/0]). + +-define(SERVER, ?MODULE). +-define(RECONNECT_TIMEOUT, 5000). + +-ifdef(TEST). +-define(NEIGHBOUR, mg_cth_neighbour). +-else. +-define(NEIGHBOUR, ?MODULE). +-endif. 
+ +-type discovery_options() :: mg_core_cluster_partitions:discovery_options(). + +-type scaling_type() :: global_based | partition_based. + +-type cluster_options() :: #{ + discovering => discovery_options(), + reconnect_timeout => non_neg_integer(), + scaling => scaling_type(), + %% partitioning required if scaling = partition_based + partitioning => mg_core_cluster_partitions:partitions_options() +}. + +-type partitions_info() :: #{ + partitioning => mg_core_cluster_partitions:partitions_options(), + local_table => mg_core_cluster_partitions:local_partition_table(), + balancing_table => mg_core_cluster_partitions:balancing_table(), + partitions_table => mg_core_cluster_partitions:partitions_table() +}. + +-type state() :: #{ + %% cluster static options + discovering => discovery_options(), + reconnect_timeout => non_neg_integer(), + scaling => scaling_type(), + partitioning => mg_core_cluster_partitions:partitions_options(), + %% dynamic + known_nodes => [node()], + local_table => mg_core_cluster_partitions:local_partition_table(), + balancing_table => mg_core_cluster_partitions:balancing_table(), + partitions_table => mg_core_cluster_partitions:partitions_table() +}. + +-export_type([scaling_type/0]). +-export_type([partitions_info/0]). + +-spec child_spec(cluster_options()) -> [supervisor:child_spec()]. +child_spec(#{discovering := _} = ClusterOpts) -> + [ + #{ + id => ?MODULE, + start => {?MODULE, start_link, [ClusterOpts]} + } + ]; +child_spec(_) -> + % cluster not configured, skip + []. + +-spec cluster_size() -> non_neg_integer(). +cluster_size() -> + case whereis(?MODULE) of + undefined -> + %% for backward compatibility with consul + ReplicaCount = os:getenv("REPLICA_COUNT", "1"), + erlang:list_to_integer(ReplicaCount); + Pid when is_pid(Pid) -> + gen_server:call(Pid, get_cluster_size) + end. + +-spec connecting({mg_core_cluster_partitions:partitions_table(), node()}) -> + {ok, mg_core_cluster_partitions:local_partition_table()}. 
+connecting(RemoteData) -> + gen_server:call(?MODULE, {connecting, RemoteData}). + +-spec get_node(mg_core_cluster_partitions:balancing_key()) -> {ok, node()}. +get_node(BalancingKey) -> + gen_server:call(?MODULE, {get_node, BalancingKey}). + +-spec get_partitions_info() -> partitions_info(). +get_partitions_info() -> + gen_server:call(?MODULE, get_partitions_info). + +%%%=================================================================== +%%% Spawning and gen_server implementation +%%%=================================================================== +-spec start_link(cluster_options()) -> {ok, pid()} | {error, term()}. +start_link(ClusterOpts) -> + gen_server:start_link({local, ?SERVER}, ?MODULE, ClusterOpts, []). + +-spec init(cluster_options()) -> {ok, state(), {continue, {full_init, cluster_options()}}}. +init(ClusterOpts) -> + logger:info("mg_cluster. init with options: ~p", [ClusterOpts]), + {ok, #{}, {continue, {full_init, ClusterOpts}}}. + +-spec handle_continue({full_init, cluster_options()}, state()) -> {noreply, state()}. +handle_continue( + { + full_init, + #{ + discovering := DiscoveryOpts, + scaling := ScalingType + } = ClusterOpts + }, + _State +) -> + _ = net_kernel:monitor_nodes(true), + {ok, ListNodes} = mg_core_cluster_partitions:discovery(DiscoveryOpts), + LocalTable = mg_core_cluster_partitions:make_local_table(ScalingType), + PartitionsTable = try_connect_all(ListNodes, maps:get(reconnect_timeout, ClusterOpts), LocalTable), + BalancingTable = mg_core_cluster_partitions:make_balancing_table( + PartitionsTable, + maps:get(partitioning, ClusterOpts, undefined) + ), + { + noreply, + ClusterOpts#{ + known_nodes => ListNodes, + local_table => LocalTable, + partitions_table => PartitionsTable, + balancing_table => BalancingTable + } + }. + +-spec handle_call(term(), {pid(), _}, state()) -> {reply, any(), state()}. 
+handle_call({get_node, BalancingKey}, _From, State) -> + Response = mg_core_cluster_partitions:get_node(BalancingKey, partitions_info(State)), + {reply, Response, State}; +handle_call(get_cluster_size, _From, #{known_nodes := ListNodes} = State) -> + {reply, erlang:length(ListNodes), State}; +handle_call(get_partitions_info, _From, State) -> + {reply, partitions_info(State), State}; +handle_call( + {connecting, {RemoteTable, _RemoteNode}}, + _From, + #{ + scaling := partition_based, + partitioning := PartitionsOpts, + local_table := LocalTable, + partitions_table := PartitionsTable + } = State +) -> + NewPartitionsTable = mg_core_cluster_partitions:add_partitions(PartitionsTable, RemoteTable), + NewBalancingTable = mg_core_cluster_partitions:make_balancing_table(NewPartitionsTable, PartitionsOpts), + { + reply, + {ok, LocalTable}, + State#{ + partitions_table => NewPartitionsTable, + balancing_table => NewBalancingTable + } + }; +%% Not partition based cluster, only list nodes updating +handle_call({connecting, {_RemoteTable, _RemoteNode}}, _From, State) -> + { + reply, + {ok, mg_core_cluster_partitions:empty_partitions()}, + State + }. + +-spec handle_cast(term(), state()) -> {noreply, state()}. +handle_cast(_Request, State) -> + {noreply, State}. + +-spec handle_info(term(), state()) -> {noreply, state()}. +handle_info({timeout, _TRef, {reconnect, Node}}, State) -> + NewState = maybe_connect(Node, State), + {noreply, NewState}; +handle_info({nodeup, RemoteNode}, #{discovering := DiscoveryOpts} = State) -> + %% do nothing because rebalance partitions in connecting call + logger:info("mg_cluster. 
~p receive nodeup ~p", [node(), RemoteNode]), + {ok, ListNodes} = mg_core_cluster_partitions:discovery(DiscoveryOpts), + {noreply, State#{known_nodes => ListNodes}}; +handle_info( + {nodedown, RemoteNode}, + #{discovering := DiscoveryOpts, reconnect_timeout := Timeout, partitions_table := PartitionsTable} = State +) -> + %% rebalance without node + logger:warning("mg_cluster. ~p receive nodedown ~p", [node(), RemoteNode]), + {ok, ListNodes} = mg_core_cluster_partitions:discovery(DiscoveryOpts), + NewPartitionsTable = mg_core_cluster_partitions:del_partition(RemoteNode, PartitionsTable), + NewState = maybe_rebalance(#{}, State#{known_nodes => ListNodes, partitions_table => NewPartitionsTable}), + _ = erlang:start_timer(Timeout, self(), {reconnect, RemoteNode}), + {noreply, NewState}. + +-spec terminate(_Reason, state()) -> ok. +terminate(_Reason, _State) -> + ok. + +-spec code_change(_OldVsn, state(), _Extra) -> {ok, state()}. +code_change(_OldVsn, State, _Extra) -> + {ok, State}. + +%%%=================================================================== +%%% Internal functions +%%%=================================================================== + +%% cluster functions +-spec connect(node(), non_neg_integer(), mg_core_cluster_partitions:local_partition_table()) -> + {ok, mg_core_cluster_partitions:partitions_table()} | {error, term()}. +connect(Node, ReconnectTimeout, LocalTable) when Node =/= node() -> + case net_adm:ping(Node) of + pong -> + erpc:call(Node, ?NEIGHBOUR, connecting, [{LocalTable, node()}]); + pang -> + _ = erlang:start_timer(ReconnectTimeout, self(), {reconnect, Node}), + {error, not_connected} + end; +connect(_Node, _ReconnectTimeout, LocalTable) -> + {ok, LocalTable}. + +-spec try_connect_all([node()], non_neg_integer(), mg_core_cluster_partitions:local_partition_table()) -> + mg_core_cluster_partitions:partitions_table(). 
+try_connect_all(ListNodes, ReconnectTimeout, LocalTable) -> + lists:foldl( + fun(Node, Acc) -> + case connect(Node, ReconnectTimeout, LocalTable) of + {ok, RemoteTable} -> + mg_core_cluster_partitions:add_partitions(Acc, RemoteTable); + _ -> + Acc + end + end, + mg_core_cluster_partitions:empty_partitions(), + ListNodes + ). + +-spec maybe_connect(node(), state()) -> state(). +maybe_connect( + Node, + #{ + discovering := Opts, + local_table := LocalTable, + reconnect_timeout := ReconnectTimeout + } = State +) -> + {ok, ListNodes} = mg_core_cluster_partitions:discovery(Opts), + case lists:member(Node, ListNodes) of + false -> + %% node delete from cluster, do nothing (rebalance was when node down detected) + State#{known_nodes => ListNodes}; + true -> + case connect(Node, ReconnectTimeout, LocalTable) of + {ok, RemoteTable} -> + %% node connected after temporary split or new node added, rebalance with node + maybe_rebalance(RemoteTable, State#{known_nodes => ListNodes}); + _ -> + State#{known_nodes => ListNodes} + end + end. + +-spec maybe_rebalance(mg_core_cluster_partitions:partitions_table(), state()) -> state(). +maybe_rebalance( + RemoteTable, + #{ + scaling := partition_based, + partitioning := PartitionsOpts, + partitions_table := PartitionsTable + } = State +) -> + NewPartitionsTable = mg_core_cluster_partitions:add_partitions(PartitionsTable, RemoteTable), + NewBalancingTable = mg_core_cluster_partitions:make_balancing_table(NewPartitionsTable, PartitionsOpts), + State#{partitions_table => NewPartitionsTable, balancing_table => NewBalancingTable}; +maybe_rebalance(_, State) -> + State. + +-spec partitions_info(state()) -> partitions_info(). +partitions_info(State) -> + maps:with([partitioning, partitions_table, balancing_table, local_table], State). + +-ifdef(TEST). +-include_lib("eunit/include/eunit.hrl"). 
+ +-define(CLUSTER_OPTS, #{ + discovering => #{ + <<"domain_name">> => <<"localhost">>, + <<"sname">> => <<"test_node">> + }, + scaling => partition_based, + partitioning => #{ + capacity => 3, + max_hash => 4095 + }, + reconnect_timeout => ?RECONNECT_TIMEOUT +}). + +-spec test() -> _. + +-spec child_spec_test() -> _. +child_spec_test() -> + EmptyChildSpec = mg_core_cluster:child_spec(#{}), + ?assertEqual([], EmptyChildSpec), + ExpectedSpec = [ + #{ + id => mg_core_cluster, + start => { + mg_core_cluster, + start_link, + [?CLUSTER_OPTS] + } + } + ], + ChildSpec = mg_core_cluster:child_spec(?CLUSTER_OPTS), + ?assertEqual(ExpectedSpec, ChildSpec). + +-spec maybe_connect_fail_test() -> _. +maybe_connect_fail_test() -> + St = ?CLUSTER_OPTS#{ + local_table => #{0 => node()} + }, + Expected = #{ + discovering => #{ + <<"domain_name">> => <<"localhost">>, + <<"sname">> => <<"test_node">> + }, + known_nodes => ['test_node@127.0.0.1', 'peer@127.0.0.1'], + local_table => #{0 => 'nonode@nohost'}, + partitioning => #{capacity => 3, max_hash => 4095}, + reconnect_timeout => 5000, + scaling => partition_based + }, + ?assertEqual(Expected, maybe_connect('peer@127.0.0.1', St)). + +-endif. diff --git a/apps/mg_core/src/mg_core_cluster_partitions.erl b/apps/mg_core/src/mg_core_cluster_partitions.erl new file mode 100644 index 0000000..1cb0c39 --- /dev/null +++ b/apps/mg_core/src/mg_core_cluster_partitions.erl @@ -0,0 +1,167 @@ +-module(mg_core_cluster_partitions). + +-type discovery_options() :: #{ + %% #{<<"domain_name">> => <<"machinegun-ha-headless">>,<<"sname">> => <<"machinegun">>} + binary() => binary() +}. +-type balancing_key() :: term(). +-type hash_range() :: {non_neg_integer(), non_neg_integer()}. +-type partition() :: non_neg_integer(). +-type partitions_options() :: #{ + capacity => non_neg_integer(), + max_hash => non_neg_integer() +}. +-type balancing_table() :: #{ + hash_range() => partition() +}. +-type partitions_table() :: #{ + partition() => node() +}. 
+%% local and remote tables contains single pair: self partition and self node +-type local_partition_table() :: partitions_table(). +-type remote_partition_table() :: partitions_table(). + +-export_type([discovery_options/0]). +-export_type([partitions_options/0]). +-export_type([balancing_key/0]). +-export_type([partition/0]). +-export_type([balancing_table/0]). +-export_type([partitions_table/0]). +-export_type([local_partition_table/0]). +-export_type([remote_partition_table/0]). + +%% API +-export([discovery/1]). +-export([make_local_table/1]). +-export([make_balancing_table/2]). +-export([add_partitions/2]). +-export([del_partition/2]). +-export([empty_partitions/0]). +-export([get_node/2]). +-export([is_local_partition/2]). + +-ifdef(TEST). +-export([get_addrs/1]). +-export([addrs_to_nodes/2]). +-export([host_to_index/1]). +-define(TEST_NODES, [ + 'test_node@127.0.0.1', + 'peer@127.0.0.1' +]). +-endif. + +-spec discovery(discovery_options()) -> {ok, [node()]}. +-ifdef(TEST). +discovery(_) -> + {ok, ?TEST_NODES}. +-else. +discovery(#{<<"domain_name">> := DomainName, <<"sname">> := Sname}) -> + case get_addrs(unicode:characters_to_list(DomainName)) of + {ok, ListAddrs} -> + logger:info("mg_cluster. resolve ~p with result: ~p", [DomainName, ListAddrs]), + {ok, addrs_to_nodes(lists:uniq(ListAddrs), Sname)}; + Error -> + error({resolve_error, Error}) + end. +-endif. + +-spec make_local_table(mg_core_cluster:scaling_type()) -> local_partition_table(). +make_local_table(global_based) -> + #{}; +make_local_table(partition_based) -> + {ok, Hostname} = inet:gethostname(), + {ok, HostIndex} = host_to_index(Hostname), + #{HostIndex => node()}. + +-spec make_balancing_table(partitions_table(), partitions_options() | undefined) -> balancing_table(). 
+make_balancing_table(_PartitionsTable, undefined) -> + #{}; +make_balancing_table(PartitionsTable, #{capacity := Capacity, max_hash := MaxHash}) -> + ListPartitions = maps:keys(PartitionsTable), + mg_core_dirange:get_ranges(MaxHash, Capacity, ListPartitions). + +-spec get_node(balancing_key(), mg_core_cluster:partitions_info()) -> {ok, node()}. +get_node(BalancingKey, PartitionsInfo) -> + #{ + partitions_table := PartitionsTable, + balancing_table := BalancingTable, + partitioning := #{max_hash := MaxHash} + } = PartitionsInfo, + {ok, Index} = mg_core_dirange:find(erlang:phash2(BalancingKey, MaxHash), BalancingTable), + Node = maps:get(Index, PartitionsTable), + {ok, Node}. + +-spec is_local_partition(balancing_key(), mg_core_cluster:partitions_info()) -> boolean(). +is_local_partition(BalancingKey, PartitionsInfo) -> + #{ + local_table := LocalTable, + balancing_table := BalancingTable, + partitioning := #{max_hash := MaxHash} + } = PartitionsInfo, + [LocalPartition] = maps:keys(LocalTable), + {ok, LocalPartition} =:= mg_core_dirange:find(erlang:phash2(BalancingKey, MaxHash), BalancingTable). + +-spec add_partitions(partitions_table(), partitions_table()) -> partitions_table(). +add_partitions(KnownPartitions, NewPartitions) -> + maps:merge(KnownPartitions, NewPartitions). + +-spec del_partition(node(), partitions_table()) -> partitions_table(). +del_partition(Node, PartitionsTable) -> + maps:filter(fun(_Partition, NodeName) -> NodeName =/= Node end, PartitionsTable). + +-spec empty_partitions() -> partitions_table(). +empty_partitions() -> + #{}. + +% Internal functions + +-spec get_addrs(inet:hostname()) -> {ok, [inet:ip_address()]} | {error, _}. +get_addrs(DomainName) -> + case inet:getaddrs(DomainName, inet) of + {ok, _} = Ok -> Ok; + _ -> inet:getaddrs(DomainName, inet6) + end. + +-spec addrs_to_nodes([inet:ip_address()], binary()) -> [node()]. 
+addrs_to_nodes(ListAddrs, Sname) -> + NodeName = unicode:characters_to_list(Sname), + lists:foldl( + fun(Addr, Acc) -> + [erlang:list_to_atom(NodeName ++ "@" ++ inet:ntoa(Addr)) | Acc] + end, + [], + ListAddrs + ). + +-spec host_to_index(string()) -> {ok, non_neg_integer()} | error. +host_to_index(MaybeFqdn) -> + [Host | _] = string:split(MaybeFqdn, ".", all), + try + [_, IndexStr] = string:split(Host, "-", trailing), + {ok, erlang:list_to_integer(IndexStr)} + catch + _:_ -> + error + end. + +-ifdef(TEST). +-include_lib("eunit/include/eunit.hrl"). + +-spec test() -> _. + +-spec get_addrs_test() -> _. +get_addrs_test() -> + {ok, [{127, 0, 0, 1} | _]} = get_addrs("localhost"), + ok. + +-spec addrs_to_nodes_test() -> _. +addrs_to_nodes_test() -> + ?assertEqual(['foo@127.0.0.1'], addrs_to_nodes([{127, 0, 0, 1}], <<"foo">>)). + +-spec host_to_index_test() -> _. +host_to_index_test() -> + ?assertEqual({ok, 0}, host_to_index("mg-0")), + ?assertEqual({ok, 1}, host_to_index("mg-1.example.com")), + ?assertEqual(error, host_to_index("ya.ru")). + +-endif. diff --git a/apps/mg_core/src/mg_core_dirange.erl b/apps/mg_core/src/mg_core_dirange.erl index 46b04f2..9202df4 100644 --- a/apps/mg_core/src/mg_core_dirange.erl +++ b/apps/mg_core/src/mg_core_dirange.erl @@ -41,6 +41,11 @@ -export([from/1]). -export([to/1]). +-export([ + get_ranges/3, + find/2 +]). + %% Directed range over integers -type dirange(_T) :: nonempty_dirange(_T) | undefined. -type direction() :: -1 | +1. @@ -48,6 +53,14 @@ % Non-empty, unambiguously oriented directed range [from..to]. {_T :: integer(), _T :: integer(), direction()}. +-type max_value() :: non_neg_integer(). +-type capacity() :: non_neg_integer(). +-type section_number() :: non_neg_integer(). +-type alive_sections() :: [section_number()]. +-type min_range_value() :: non_neg_integer(). +-type max_range_value() :: non_neg_integer(). +-type range_map() :: #{{min_range_value(), max_range_value()} => section_number()}. + %% -spec empty() -> dirange(_). 
@@ -204,3 +217,132 @@ to(undefined) -> undefined; to({_, B, _}) -> B. + +-spec get_ranges(max_value(), capacity(), alive_sections()) -> range_map(). +get_ranges(MaxValue, Capacity, AliveList) -> + AliveSize = erlang:length(AliveList), + AliveListSorted = lists:sort(AliveList), + FullList = lists:seq(0, Capacity - 1), + DeadList = lists:filter(fun(E) -> not lists:member(E, AliveList) end, FullList), + BaseRangeMap = distribute({0, MaxValue}, Capacity, FullList), + redistribute(BaseRangeMap, AliveSize, AliveListSorted, DeadList). + +-spec find(non_neg_integer(), range_map()) -> {ok, section_number()} | none. +find(Value, RangeMap) -> + Iterator = maps:iterator(RangeMap), + do_find(maps:next(Iterator), Value). + +%% Internal functions + +-spec do_find(none | {{min_range_value(), max_range_value()}, section_number(), maps:iterator()}, non_neg_integer()) -> + {ok, section_number()} | none. +do_find(none, _) -> + none; +do_find({Range, Num, Iterator}, Value) -> + case in_range(Value, Range) of + true -> {ok, Num}; + false -> do_find(maps:next(Iterator), Value) + end. + +-spec in_range(non_neg_integer(), {min_range_value(), max_range_value()}) -> boolean(). +in_range(Value, {Min, Max}) when Value >= Min andalso Value =< Max -> true; +in_range(_, _) -> false. + +-spec distribute({min_range_value(), max_range_value()}, non_neg_integer(), [section_number()]) -> + range_map(). +distribute(Range, Size, ListSorted) -> + distribute(Range, Size, ListSorted, #{}). + +-spec distribute({min_range_value(), max_range_value()}, non_neg_integer(), [section_number()], range_map()) -> + range_map(). 
+distribute({Min, Max}, Size, ListSorted, Acc) -> + Delta = not_zero(((Max - Min) div Size) - 1), + SizeFromZero = Size - 1, + {_, Result} = lists:foldl( + fun + (Num, {StartPos, Map}) when Num =:= SizeFromZero -> + %% because lists indexed from 1 + {Max, Map#{{StartPos, Max} => lists:nth(Num + 1, ListSorted)}}; + (Num, {StartPos, Map}) -> + MaxVal = StartPos + Delta, + {MaxVal + 1, Map#{{StartPos, MaxVal} => lists:nth(Num + 1, ListSorted)}} + end, + {Min, Acc}, + lists:seq(0, SizeFromZero) + ), + Result. + +-spec redistribute(range_map(), non_neg_integer(), [section_number()], [section_number()]) -> range_map(). +redistribute(BaseRangeMap, AliveSize, AliveListSorted, DeadList) -> + maps:fold( + fun(Range, RangeNum, Acc) -> + case lists:member(RangeNum, DeadList) of + true -> + distribute(Range, AliveSize, AliveListSorted, Acc); + false -> + Acc#{Range => RangeNum} + end + end, + #{}, + BaseRangeMap + ). + +-spec not_zero(non_neg_integer()) -> non_neg_integer(). +not_zero(0) -> 1; +not_zero(Value) -> Value. + +%% Tests + +-ifdef(TEST). +-include_lib("eunit/include/eunit.hrl"). + +-spec test() -> _. + +-spec without_dead_test() -> _. +without_dead_test() -> + ?assertEqual( + #{{0, 3} => 0, {4, 7} => 1, {8, 11} => 2, {12, 16} => 3}, + get_ranges(16, 4, [0, 1, 2, 3]) + ), + ?assertEqual( + #{{0, 3} => 0, {4, 7} => 1, {8, 11} => 2, {12, 17} => 3}, + get_ranges(17, 4, [0, 1, 2, 3]) + ), + ?assertEqual( + #{{0, 4} => 0, {5, 9} => 1, {10, 14} => 2, {15, 21} => 3}, + get_ranges(21, 4, [0, 1, 2, 3]) + ). + +-spec with_dead_test() -> _. +with_dead_test() -> + ?assertEqual( + #{ + {0, 3} => 0, + {4, 5} => 0, + {6, 7} => 2, + {8, 7} => 3, + {8, 11} => 2, + {12, 16} => 3 + }, + get_ranges(16, 4, [0, 2, 3]) + ), + ?assertEqual( + #{ + {0, 3} => 0, + {4, 5} => 0, + {6, 7} => 2, + {8, 11} => 2, + {12, 13} => 0, + {14, 16} => 2 + }, + get_ranges(16, 4, [0, 2]) + ). + +-spec find_test() -> _. 
+find_test() -> + RangeMap = get_ranges(16, 4, [0, 2]), + ?assertEqual({ok, 0}, find(5, RangeMap)), + ?assertEqual({ok, 2}, find(10, RangeMap)), + ?assertEqual(none, find(100, RangeMap)). + +-endif. diff --git a/apps/mg_core/src/mg_core_events_sink_machine.erl b/apps/mg_core/src/mg_core_events_sink_machine.erl index 87f8be1..dc2920b 100644 --- a/apps/mg_core/src/mg_core_events_sink_machine.erl +++ b/apps/mg_core/src/mg_core_events_sink_machine.erl @@ -51,7 +51,8 @@ worker := mg_core_workers_manager:ns_options(), pulse := mg_core_pulse:handler(), events_storage := mg_core_storage:options(), - default_processing_timeout := timeout() + default_processing_timeout := timeout(), + scaling => mg_core_cluster:scaling_type() }. -type ns_options() :: #{ namespace := mg_core:ns(), @@ -59,7 +60,8 @@ worker := mg_core_workers_manager:ns_options(), pulse := mg_core_pulse:handler(), events_storage := storage_options(), - default_processing_timeout := timeout() + default_processing_timeout := timeout(), + scaling := mg_core_cluster:scaling_type() }. % like mg_core_storage:options() except `name` -type storage_options() :: mg_core_utils:mod_opts(map()). @@ -222,7 +224,7 @@ new_state() -> -spec machine_options(ns_options()) -> mg_core_machine:options(). machine_options( Options = #{ - namespace := Namespace, storage := Storage, worker := Worker, pulse := Pulse + namespace := Namespace, storage := Storage, worker := Worker, pulse := Pulse, scaling := Scaling } ) -> #{ @@ -230,7 +232,8 @@ machine_options( processor => {?MODULE, Options}, storage => Storage, worker => Worker, - pulse => Pulse + pulse => Pulse, + scaling => Scaling }. -spec events_storage_options(ns_options()) -> mg_core_storage:options(). 
diff --git a/apps/mg_core/src/mg_core_machine.erl b/apps/mg_core/src/mg_core_machine.erl index ef62290..b9d8316 100644 --- a/apps/mg_core/src/mg_core_machine.erl +++ b/apps/mg_core/src/mg_core_machine.erl @@ -162,6 +162,7 @@ -type options() :: #{ namespace := mg_core:ns(), pulse := mg_core_pulse:handler(), + scaling := mg_core_cluster:scaling_type(), storage => storage_options(), notification => mg_core_notification:options(), processor => mg_core_utils:mod_opts(), @@ -1361,9 +1362,9 @@ manager_options(Options = #{namespace := NS, worker := ManagerOptions, pulse := }. -spec storage_options(options()) -> mg_core_storage:options(). -storage_options(#{namespace := NS, storage := StorageOptions, pulse := Handler}) -> +storage_options(#{namespace := NS, storage := StorageOptions, pulse := Handler, scaling := Scaling}) -> {Mod, Options} = mg_core_utils:separate_mod_opts(StorageOptions, #{}), - {Mod, Options#{name => {NS, ?MODULE, machines}, pulse => Handler}}. + {Mod, Options#{name => {NS, ?MODULE, machines}, pulse => Handler, scaling => Scaling}}. -spec notification_options(options()) -> mg_core_notification:options(). notification_options(#{notification := NotificationOptions}) -> @@ -1446,7 +1447,8 @@ scheduler_options(HandlerMod, Options, HandlerOptions, Config) -> max_scan_limit => maps:get(max_scan_limit, Config, undefined), scan_ahead => maps:get(scan_ahead, Config, undefined), task_handler => Handler, - pulse => Pulse + pulse => Pulse, + scaling => maps:get(scaling, Options, global_based) }). -spec scheduler_cutoff(scheduler_opt()) -> seconds(). 
diff --git a/apps/mg_core/src/mg_core_notification.erl b/apps/mg_core/src/mg_core_notification.erl index 63e4dfe..8c3b202 100644 --- a/apps/mg_core/src/mg_core_notification.erl +++ b/apps/mg_core/src/mg_core_notification.erl @@ -18,7 +18,8 @@ -type options() :: #{ namespace := mg_core:ns(), pulse := mg_core_pulse:handler(), - storage := storage_options() + storage := storage_options(), + scaling => mg_core_cluster:scaling_type() }. -export_type([id/0]). @@ -123,6 +124,7 @@ data_to_opaque(#{ %% -spec storage_options(options()) -> mg_core_storage:options(). -storage_options(#{namespace := NS, storage := StorageOptions, pulse := Handler}) -> +storage_options(#{namespace := NS, storage := StorageOptions, pulse := Handler} = Opts) -> + Scaling = maps:get(scaling, Opts, global_based), {Mod, Options} = mg_core_utils:separate_mod_opts(StorageOptions, #{}), - {Mod, Options#{name => {NS, ?MODULE, notifications}, pulse => Handler}}. + {Mod, Options#{name => {NS, ?MODULE, notifications}, pulse => Handler, scaling => Scaling}}. diff --git a/apps/mg_core/src/mg_core_queue_notifications.erl b/apps/mg_core/src/mg_core_queue_notifications.erl index 2359e96..04270d2 100644 --- a/apps/mg_core/src/mg_core_queue_notifications.erl +++ b/apps/mg_core/src/mg_core_queue_notifications.erl @@ -44,7 +44,8 @@ rescan_delay => milliseconds(), scan_handicap => seconds(), scan_cutoff => seconds(), - reschedule_time => seconds() + reschedule_time => seconds(), + scaling => mg_core_cluster:scaling_type() }. -record(state, { @@ -176,8 +177,9 @@ machine_options(#{machine := MachineOptions}) -> MachineOptions. -spec notification_options(options()) -> mg_core_notification:options(). -notification_options(#{notification := NotificationOptions}) -> - NotificationOptions. +notification_options(#{notification := NotificationOptions, machine := Machine}) -> + Scaling = maps:get(scaling, Machine, global_based), + NotificationOptions#{scaling => Scaling}. 
-spec create_task(options(), mg_core_notification:id(), target_time()) -> task(). create_task(Options, NotificationID, Timestamp) -> diff --git a/apps/mg_core/src/mg_core_queue_scanner.erl b/apps/mg_core/src/mg_core_queue_scanner.erl index a81084b..e95d179 100644 --- a/apps/mg_core/src/mg_core_queue_scanner.erl +++ b/apps/mg_core/src/mg_core_queue_scanner.erl @@ -40,7 +40,8 @@ scan_ahead => scan_ahead(), retry_scan_delay => scan_delay(), squad_opts => mg_core_gen_squad:opts(), - pulse => mg_core_pulse:handler() + pulse => mg_core_pulse:handler(), + scaling => mg_core_cluster:scaling_type() }. -export_type([options/0]). @@ -150,7 +151,8 @@ where_is(SchedulerID) -> scan_ahead :: scan_ahead(), retry_delay :: scan_delay(), timer :: reference() | undefined, - pulse :: mg_core_pulse:handler() | undefined + pulse :: mg_core_pulse:handler() | undefined, + scaling :: mg_core_cluster:scaling_type() }). -type st() :: #st{}. @@ -166,10 +168,13 @@ init({SchedulerID, Options}) -> max_limit = maps:get(max_scan_limit, Options, ?DEFAULT_MAX_LIMIT), scan_ahead = maps:get(scan_ahead, Options, ?DEFAULT_SCAN_AHEAD), retry_delay = maps:get(retry_scan_delay, Options, ?DEFAULT_RETRY_SCAN_DELAY), - pulse = maps:get(pulse, Options, undefined) + pulse = maps:get(pulse, Options, undefined), + scaling = maps:get(scaling, Options, global_based) }}. -spec discover(st()) -> {ok, [pid()], st()}. 
+discover(St = #st{scaling = partition_based}) -> + {ok, [], St}; discover(St = #st{scheduler_id = SchedulerID}) -> Nodes = erlang:nodes(), Pids = multicall(Nodes, ?MODULE, where_is, [SchedulerID], ?DISCOVER_TIMEOUT), diff --git a/apps/mg_core/src/mg_core_scheduler_sup.erl b/apps/mg_core/src/mg_core_scheduler_sup.erl index fab2d71..1731592 100644 --- a/apps/mg_core/src/mg_core_scheduler_sup.erl +++ b/apps/mg_core/src/mg_core_scheduler_sup.erl @@ -30,6 +30,7 @@ scan_ahead => mg_core_queue_scanner:scan_ahead(), retry_scan_delay => mg_core_queue_scanner:scan_delay(), squad_opts => mg_core_gen_squad:opts(), + scaling => mg_core_cluster:scaling_type(), % workers task_handler := mg_core_utils:mod_opts(), % common @@ -59,7 +60,7 @@ start_link(SchedulerID, Options) -> Options ), ScannerOptions = maps:with( - [queue_handler, max_scan_limit, scan_ahead, retry_scan_delay, squad_opts, pulse], + [queue_handler, max_scan_limit, scan_ahead, retry_scan_delay, squad_opts, pulse, scaling], Options ), WorkerOptions = maps:with( diff --git a/apps/mg_core/src/mg_core_storage.erl b/apps/mg_core/src/mg_core_storage.erl index 9d1bf7c..698e60d 100644 --- a/apps/mg_core/src/mg_core_storage.erl +++ b/apps/mg_core/src/mg_core_storage.erl @@ -175,6 +175,8 @@ get(Options, Key) -> do_request(Options, {get, Key}). -spec search(options(), index_query()) -> search_result(). +search({_, #{scaling := partition_based, name := Name}} = Options, Query) -> + filter_by_partition(Name, do_request(Options, {search, Query})); search(Options, Query) -> do_request(Options, {search, Query}). @@ -283,6 +285,28 @@ sidecar_child_spec(Options, ChildID) -> undefined end. +-spec filter_by_partition(name(), search_result()) -> search_result(). 
+filter_by_partition({NS, _, _}, Data) when is_list(Data) -> + PartitionsInfo = mg_core_cluster:get_partitions_info(), + lists:filter( + fun + ({_Index, Key}) -> do_filter_by_partition({NS, Key}, PartitionsInfo); + (Key) -> do_filter_by_partition({NS, Key}, PartitionsInfo) + end, + Data + ); +filter_by_partition({_NS, _, _} = Name, {Data, _Continuation}) -> + filter_by_partition(Name, Data); +filter_by_partition(_Name, Data) -> + Data. + +-spec do_filter_by_partition( + mg_core_cluster_partitions:balancing_key(), + mg_core_cluster:partitions_info() +) -> boolean(). +do_filter_by_partition(BalancingKey, PartitionsInfo) -> + mg_core_cluster_partitions:is_local_partition(BalancingKey, PartitionsInfo). + %% %% logging %% diff --git a/apps/mg_core/src/mg_core_union.erl b/apps/mg_core/src/mg_core_union.erl deleted file mode 100644 index 08e8518..0000000 --- a/apps/mg_core/src/mg_core_union.erl +++ /dev/null @@ -1,247 +0,0 @@ --module(mg_core_union). - --behaviour(gen_server). - --export([start_link/1]). --export([ - init/1, - handle_continue/2, - handle_call/3, - handle_cast/2, - handle_info/2, - terminate/2, - code_change/3 -]). - --export([child_spec/1]). --export([discovery/1]). --export([cluster_size/0]). - --ifdef(TEST). --export([set_state/1]). --endif. - --define(SERVER, ?MODULE). --define(RECONNECT_TIMEOUT, 5000). - --type discovery_options() :: #{ - module := module(), - %% options is module specific structure - options := term() -}. --type dns_discovery_options() :: #{ - %% #{<<"domain_name">> => <<"machinegun-ha-headless">>,<<"sname">> => <<"machinegun">>} - binary() => binary() -}. --type cluster_options() :: #{ - discovery => discovery_options(), - reconnect_timeout => non_neg_integer() -}. --type state() :: #{ - known_nodes => [node()], - discovery => discovery_options(), - reconnect_timeout => non_neg_integer() -}. - -%% discovery behaviour callback --callback discovery(dns_discovery_options()) -> {ok, [node()]}. 
- --spec child_spec(cluster_options()) -> [supervisor:child_spec()]. -child_spec(#{discovery := _} = ClusterOpts) -> - [ - #{ - id => ?MODULE, - start => {?MODULE, start_link, [ClusterOpts]} - } - ]; -child_spec(_) -> - % cluster not configured, skip - []. - --spec discovery(dns_discovery_options()) -> {ok, [node()]}. -discovery(#{<<"domain_name">> := DomainName, <<"sname">> := Sname}) -> - case get_addrs(unicode:characters_to_list(DomainName)) of - {ok, ListAddrs} -> - logger:info("union. resolve ~p with result: ~p", [DomainName, ListAddrs]), - {ok, addrs_to_nodes(lists:uniq(ListAddrs), Sname)}; - Error -> - error({resolve_error, Error}) - end. - --ifdef(TEST). --spec set_state(state()) -> ok. -set_state(NewState) -> - gen_server:call(?MODULE, {set_state, NewState}). --endif. - --spec cluster_size() -> non_neg_integer(). -cluster_size() -> - case whereis(?MODULE) of - undefined -> - %% for backward compatibility with consul - ReplicaCount = os:getenv("REPLICA_COUNT", "1"), - erlang:list_to_integer(ReplicaCount); - Pid when is_pid(Pid) -> - gen_server:call(Pid, get_cluster_size) - end. - -%%%=================================================================== -%%% Spawning and gen_server implementation -%%%=================================================================== --spec start_link(cluster_options()) -> {ok, pid()} | {error, term()}. -start_link(ClusterOpts) -> - gen_server:start_link({local, ?SERVER}, ?MODULE, ClusterOpts, []). - --spec init(cluster_options()) -> {ok, state(), {continue, {full_init, cluster_options()}}}. -init(ClusterOpts) -> - logger:info("union. init with options: ~p", [ClusterOpts]), - {ok, #{}, {continue, {full_init, ClusterOpts}}}. - --spec handle_continue({full_init, cluster_options()}, state()) -> {noreply, state()}. 
-handle_continue({full_init, #{discovery := #{module := Mod, options := Opts}} = ClusterOpts}, _State) -> - _ = net_kernel:monitor_nodes(true), - {ok, ListNodes} = Mod:discovery(Opts), - _ = try_connect_all(ListNodes, maps:get(reconnect_timeout, ClusterOpts)), - {noreply, ClusterOpts#{known_nodes => ListNodes}}. - --spec handle_call(term(), {pid(), _}, state()) -> {reply, any(), state()}. --ifdef(TEST). -handle_call({set_state, NewState}, _From, _State) -> - {reply, ok, NewState}; -handle_call(get_cluster_size, _From, #{known_nodes := ListNodes} = State) -> - {reply, erlang:length(ListNodes), State}. --else. -handle_call(get_cluster_size, _From, #{known_nodes := ListNodes} = State) -> - {reply, erlang:length(ListNodes), State}. --endif. - --spec handle_cast(term(), state()) -> {noreply, state()}. -handle_cast(_Request, State) -> - {noreply, State}. - --spec handle_info(term(), state()) -> {noreply, state()}. -handle_info({timeout, _TRef, {reconnect, Node}}, State) -> - ListNodes = maybe_connect(Node, State), - {noreply, State#{known_nodes => ListNodes}}; -handle_info({nodeup, RemoteNode}, #{known_nodes := ListNodes} = State) -> - logger:info("union. ~p receive nodeup ~p", [node(), RemoteNode]), - NewState = - case lists:member(RemoteNode, ListNodes) of - true -> - %% well known node connected, do nothing - State; - false -> - %% new node connected, need update list nodes - #{discovery := #{module := Mod, options := Opts}, reconnect_timeout := Timeout} = State, - {ok, NewListNodes} = Mod:discovery(Opts), - _ = try_connect_all(NewListNodes, Timeout), - State#{known_nodes => NewListNodes} - end, - {noreply, NewState}; -handle_info({nodedown, RemoteNode}, #{reconnect_timeout := Timeout} = State) -> - logger:warning("union. ~p receive nodedown ~p", [node(), RemoteNode]), - _ = erlang:start_timer(Timeout, self(), {reconnect, RemoteNode}), - {noreply, State}. - --spec terminate(_Reason, state()) -> ok. -terminate(_Reason, _State) -> - ok. 
- --spec code_change(_OldVsn, state(), _Extra) -> {ok, state()}. -code_change(_OldVsn, State, _Extra) -> - {ok, State}. - -%%%=================================================================== -%%% Internal functions -%%%=================================================================== - -%% cluster functions --spec connect(node(), non_neg_integer()) -> ok | error. -connect(Node, ReconnectTimeout) -> - case net_adm:ping(Node) of - pong -> - ok; - _ -> - _ = erlang:start_timer(ReconnectTimeout, self(), {reconnect, Node}), - error - end. - --spec try_connect_all([node()], non_neg_integer()) -> ok. -try_connect_all(ListNodes, ReconnectTimeout) -> - _ = lists:foreach(fun(Node) -> connect(Node, ReconnectTimeout) end, ListNodes). - --spec maybe_connect(node(), state()) -> [node()]. -maybe_connect(Node, #{discovery := #{module := Mod, options := Opts}, reconnect_timeout := Timeout}) -> - {ok, ListNodes} = Mod:discovery(Opts), - case lists:member(Node, ListNodes) of - false -> - %% node deleted from cluster, do nothing - skip; - true -> - connect(Node, Timeout) - end, - ListNodes. - -%% discovery functions --spec get_addrs(inet:hostname()) -> {ok, [inet:ip_address()]} | {error, _}. -get_addrs(DomainName) -> - case inet:getaddrs(DomainName, inet) of - {ok, _} = Ok -> Ok; - _ -> inet:getaddrs(DomainName, inet6) - end. - --spec addrs_to_nodes([inet:ip_address()], binary()) -> [node()]. -addrs_to_nodes(ListAddrs, Sname) -> - NodeName = unicode:characters_to_list(Sname), - lists:foldl( - fun(Addr, Acc) -> - [erlang:list_to_atom(NodeName ++ "@" ++ inet:ntoa(Addr)) | Acc] - end, - [], - ListAddrs - ). - --ifdef(TEST). --include_lib("eunit/include/eunit.hrl"). - --define(CLUSTER_OPTS, #{ - discovery => #{ - module => mg_core_union, - options => #{ - <<"domain_name">> => <<"localhost">>, - <<"sname">> => <<"test_node">> - } - }, - reconnect_timeout => ?RECONNECT_TIMEOUT -}). - --spec test() -> _. - --spec connect_error_test() -> _. 
-connect_error_test() -> - ?assertEqual(error, connect('foo@127.0.0.1', 3000)). - --spec child_spec_test() -> _. -child_spec_test() -> - EmptyChildSpec = mg_core_union:child_spec(#{}), - ?assertEqual([], EmptyChildSpec), - ExpectedSpec = [ - #{ - id => mg_core_union, - start => { - mg_core_union, - start_link, - [?CLUSTER_OPTS] - } - } - ], - ChildSpec = mg_core_union:child_spec(?CLUSTER_OPTS), - ?assertEqual(ExpectedSpec, ChildSpec). - --spec for_full_cover_test() -> _. -for_full_cover_test() -> - ?assertEqual({noreply, #{}}, handle_cast([], #{})), - ?assertEqual(ok, terminate(term, #{})), - ?assertEqual({ok, #{}}, code_change(old, #{}, extra)). - --endif. diff --git a/apps/mg_core/test/mg_core_cluster_SUITE.erl b/apps/mg_core/test/mg_core_cluster_SUITE.erl new file mode 100644 index 0000000..22a87a5 --- /dev/null +++ b/apps/mg_core/test/mg_core_cluster_SUITE.erl @@ -0,0 +1,190 @@ +-module(mg_core_cluster_SUITE). + +-include_lib("eunit/include/eunit.hrl"). +-include_lib("common_test/include/ct.hrl"). + +%% API +-export([ + init_per_suite/1, + end_per_suite/1, + all/0, + groups/0 +]). + +-define(RECONNECT_TIMEOUT, 2000). +-define(CLUSTER_OPTS, #{ + discovering => #{ + <<"domain_name">> => <<"localhost">>, + <<"sname">> => <<"test_node">> + }, + scaling => partition_based, + partitioning => #{ + capacity => 5, + max_hash => 4095 + }, + reconnect_timeout => ?RECONNECT_TIMEOUT +}). +-define(NEIGHBOUR, 'peer@127.0.0.1'). + +-export([peer_test/1]). +-export([base_test/1]). +-export([reconnect_rebalance_test/1]). +-export([connecting_rebalance_test/1]). +-export([double_connecting_test/1]). +-export([deleted_node_down_test/1]). + +-type config() :: [{atom(), term()}]. +-type test_case_name() :: atom(). +-type group_name() :: atom(). +-type test_result() :: any() | no_return(). 
+ +-define(PARTITIONS_INFO_WITH_PEER, #{ + partitioning => #{ + capacity => 5, + max_hash => 4095 + }, + balancing_table => #{ + {0, 818} => 0, + {819, 1637} => 1, + {1638, 2046} => 0, + {2047, 2456} => 1, + {2457, 2865} => 0, + {2866, 3275} => 1, + {3276, 3684} => 0, + {3685, 4095} => 1 + }, + local_table => #{ + 0 => 'test_node@127.0.0.1' + }, + partitions_table => #{ + 0 => 'test_node@127.0.0.1', + 1 => 'peer@127.0.0.1' + } +}). +-define(PARTITIONS_INFO_WO_PEER, #{ + partitioning => #{ + capacity => 5, + max_hash => 4095 + }, + balancing_table => #{ + {0, 818} => 0, + {819, 1637} => 0, + {1638, 2456} => 0, + {2457, 3275} => 0, + {3276, 4095} => 0 + }, + local_table => #{ + 0 => 'test_node@127.0.0.1' + }, + partitions_table => #{ + 0 => 'test_node@127.0.0.1' + } +}). +%% erlang:phash2({<<"Namespace">>, <<"ID">>}, 4095) = 1075 +-define(KEY, {<<"Namespace">>, <<"ID">>}). + +-spec init_per_suite(_) -> _. +init_per_suite(Config) -> + WorkDir = os:getenv("WORK_DIR"), + EbinDir = WorkDir ++ "/_build/test/lib/mg_cth/ebin", + ok = start_peer(), + true = erpc:call(?NEIGHBOUR, code, add_path, [EbinDir]), + {ok, _Pid} = erpc:call(?NEIGHBOUR, mg_cth_neighbour, start, []), + Config. + +-spec end_per_suite(_) -> _. +end_per_suite(_Config) -> + ok. + +-spec test() -> _. + +-spec all() -> [{group, test_case_name()}]. +all() -> + [{group, basic_operations}]. + +-spec groups() -> [{group_name(), list(), [test_case_name()]}]. +groups() -> + [ + {basic_operations, [], [ + peer_test, + base_test, + reconnect_rebalance_test, + connecting_rebalance_test, + double_connecting_test, + deleted_node_down_test + ]} + ]. + +-spec peer_test(config()) -> test_result(). +peer_test(_Config) -> + {ok, 'ECHO'} = erpc:call(?NEIGHBOUR, mg_cth_neighbour, echo, ['ECHO']), + {ok, #{1 := 'peer@127.0.0.1'}} = erpc:call(?NEIGHBOUR, mg_cth_neighbour, connecting, [{#{}, node()}]). + +-spec base_test(config()) -> test_result(). 
+base_test(_Config) -> + {ok, Pid} = mg_core_cluster:start_link(?CLUSTER_OPTS), + ?assertEqual(?PARTITIONS_INFO_WITH_PEER, mg_core_cluster:get_partitions_info()), + ?assertEqual(2, mg_core_cluster:cluster_size()), + ?assertEqual({ok, ?NEIGHBOUR}, mg_core_cluster:get_node(?KEY)), + ?assertEqual(false, mg_core_cluster_partitions:is_local_partition(?KEY, ?PARTITIONS_INFO_WITH_PEER)), + ?assertEqual(true, mg_core_cluster_partitions:is_local_partition(?KEY, ?PARTITIONS_INFO_WO_PEER)), + exit(Pid, normal). + +-spec reconnect_rebalance_test(config()) -> test_result(). +reconnect_rebalance_test(_Config) -> + %% node_down - rebalance - reconnect by timer - rebalance + {ok, Pid} = mg_core_cluster:start_link(?CLUSTER_OPTS), + Pid ! {nodedown, ?NEIGHBOUR}, + ?assertEqual(?PARTITIONS_INFO_WO_PEER, mg_core_cluster:get_partitions_info()), + ?assertEqual({ok, node()}, mg_core_cluster:get_node(?KEY)), + ?assertEqual(true, mg_core_cluster_partitions:is_local_partition(?KEY, ?PARTITIONS_INFO_WO_PEER)), + + %% wait reconnecting + timer:sleep(?RECONNECT_TIMEOUT + 10), + ?assertEqual(?PARTITIONS_INFO_WITH_PEER, mg_core_cluster:get_partitions_info()), + ?assertEqual({ok, ?NEIGHBOUR}, mg_core_cluster:get_node(?KEY)), + ?assertEqual(false, mg_core_cluster_partitions:is_local_partition(?KEY, ?PARTITIONS_INFO_WITH_PEER)), + exit(Pid, normal). + +-spec connecting_rebalance_test(config()) -> test_result(). +connecting_rebalance_test(_Config) -> + {ok, Pid} = mg_core_cluster:start_link(?CLUSTER_OPTS), + Pid ! {nodedown, ?NEIGHBOUR}, + ?assertEqual(?PARTITIONS_INFO_WO_PEER, mg_core_cluster:get_partitions_info()), + ?assertEqual({ok, node()}, mg_core_cluster:get_node(?KEY)), + + %% force connecting + ?assertEqual({ok, #{0 => node()}}, mg_core_cluster:connecting({#{1 => ?NEIGHBOUR}, ?NEIGHBOUR})), + ?assertEqual(?PARTITIONS_INFO_WITH_PEER, mg_core_cluster:get_partitions_info()), + ?assertEqual({ok, ?NEIGHBOUR}, mg_core_cluster:get_node(?KEY)), + exit(Pid, normal). 
+ +-spec double_connecting_test(config()) -> test_result(). +double_connecting_test(_Config) -> + {ok, Pid} = mg_core_cluster:start_link(?CLUSTER_OPTS), + ?assertEqual(?PARTITIONS_INFO_WITH_PEER, mg_core_cluster:get_partitions_info()), + %% double connect + ?assertEqual({ok, #{0 => node()}}, mg_core_cluster:connecting({#{1 => ?NEIGHBOUR}, ?NEIGHBOUR})), + ?assertEqual(?PARTITIONS_INFO_WITH_PEER, mg_core_cluster:get_partitions_info()), + exit(Pid, normal). + +-spec deleted_node_down_test(config()) -> test_result(). +deleted_node_down_test(_Config) -> + {ok, Pid} = mg_core_cluster:start_link(?CLUSTER_OPTS), + ?assertEqual(?PARTITIONS_INFO_WITH_PEER, mg_core_cluster:get_partitions_info()), + Pid ! {nodedown, 'foo@127.0.0.1'}, + %% wait reconnect timeout + ?assertEqual(?PARTITIONS_INFO_WITH_PEER, mg_core_cluster:get_partitions_info()), + exit(Pid, normal). + +%% Internal functions + +-spec start_peer() -> _. +start_peer() -> + {ok, _Pid, _Node} = peer:start(#{ + name => peer, + longnames => true, + host => "127.0.0.1" + }), + pong = net_adm:ping(?NEIGHBOUR), + ok. diff --git a/apps/mg_core/test/mg_core_continuation_retry_SUITE.erl b/apps/mg_core/test/mg_core_continuation_retry_SUITE.erl index 84311fa..c5ae638 100644 --- a/apps/mg_core/test/mg_core_continuation_retry_SUITE.erl +++ b/apps/mg_core/test/mg_core_continuation_retry_SUITE.erl @@ -141,7 +141,8 @@ automaton_options() -> }, retries => #{ continuation => {intervals, ?TEST_INTERVALS} - } + }, + scaling => global_based }. -spec handle_beat(_, mg_core_pulse:beat()) -> ok. 
diff --git a/apps/mg_core/test/mg_core_events_machine_SUITE.erl b/apps/mg_core/test/mg_core_events_machine_SUITE.erl index 1e43fa6..5fa4ca1 100644 --- a/apps/mg_core/test/mg_core_events_machine_SUITE.erl +++ b/apps/mg_core/test/mg_core_events_machine_SUITE.erl @@ -436,6 +436,7 @@ events_machine_options(Base, StorageOptions, ProcessorOptions, NS) -> namespace => NS, processor => {?MODULE, ProcessorOptions}, machines => #{ + scaling => partition_based, namespace => NS, storage => mg_cth:build_storage(NS, Storage), worker => #{ diff --git a/apps/mg_core/test/mg_core_events_modernizer_SUITE.erl b/apps/mg_core/test/mg_core_events_modernizer_SUITE.erl index d241b72..bfa180e 100644 --- a/apps/mg_core/test/mg_core_events_modernizer_SUITE.erl +++ b/apps/mg_core/test/mg_core_events_modernizer_SUITE.erl @@ -202,7 +202,8 @@ events_machine_options(ProcessorOptions, NS) -> pulse => ?MODULE, storage => mg_core_storage_memory }, - pulse => Pulse + pulse => Pulse, + scaling => global_based }, events_storage => mg_cth:build_storage(<<NS/binary, "_events">>, Storage) }. diff --git a/apps/mg_core/test/mg_core_events_sink_machine_SUITE.erl b/apps/mg_core/test/mg_core_events_sink_machine_SUITE.erl index adc46d0..d3ca349 100644 --- a/apps/mg_core/test/mg_core_events_sink_machine_SUITE.erl +++ b/apps/mg_core/test/mg_core_events_sink_machine_SUITE.erl @@ -153,7 +153,8 @@ event_sink_ns_options() -> }, pulse => ?MODULE, default_processing_timeout => 1000, - events_storage => mg_core_storage_memory + events_storage => mg_core_storage_memory, + scaling => global_based }. -spec event_sink_options() -> mg_core_events_sink_machine:options(). 
diff --git a/apps/mg_core/test/mg_core_events_stash_SUITE.erl b/apps/mg_core/test/mg_core_events_stash_SUITE.erl index bd685e2..696d899 100644 --- a/apps/mg_core/test/mg_core_events_stash_SUITE.erl +++ b/apps/mg_core/test/mg_core_events_stash_SUITE.erl @@ -254,7 +254,8 @@ events_machine_options(Options) -> timers => Scheduler, timers_retries => Scheduler, overseer => Scheduler - } + }, + scaling => global_based }, events_storage => {mg_core_storage_memory, #{ diff --git a/apps/mg_core/test/mg_core_instant_timer_task_SUITE.erl b/apps/mg_core/test/mg_core_instant_timer_task_SUITE.erl index 8f46a68..6f566dd 100644 --- a/apps/mg_core/test/mg_core_instant_timer_task_SUITE.erl +++ b/apps/mg_core/test/mg_core_instant_timer_task_SUITE.erl @@ -216,7 +216,8 @@ automaton_options(NS) -> timers => Scheduler, timers_retries => Scheduler, overseer => Scheduler - } + }, + scaling => global_based }. -spec automaton_options_wo_shedulers(mg_core:ns()) -> mg_core_machine:options(). @@ -236,7 +237,8 @@ automaton_options_wo_shedulers(NS) -> pulse => ?MODULE, schedulers => #{ % none - } + }, + scaling => global_based }. -spec handle_beat(_, mg_core_pulse:beat()) -> ok. diff --git a/apps/mg_core/test/mg_core_internal_events_logging_SUITE.erl b/apps/mg_core/test/mg_core_internal_events_logging_SUITE.erl index d91407c..305e859 100644 --- a/apps/mg_core/test/mg_core_internal_events_logging_SUITE.erl +++ b/apps/mg_core/test/mg_core_internal_events_logging_SUITE.erl @@ -142,7 +142,8 @@ automaton_options(NS) -> schedulers => #{ timers => #{min_scan_delay => 1000}, timers_retries => #{min_scan_delay => 1000} - } + }, + scaling => global_based }. -spec handle_beat(_, mg_core_pulse:beat()) -> no_return(). 
diff --git a/apps/mg_core/test/mg_core_interrupted_SUITE.erl b/apps/mg_core/test/mg_core_interrupted_SUITE.erl index 4ebbcbb..7347f25 100644 --- a/apps/mg_core/test/mg_core_interrupted_SUITE.erl +++ b/apps/mg_core/test/mg_core_interrupted_SUITE.erl @@ -179,7 +179,8 @@ automaton_options(NS, StorageName) -> pulse => ?MODULE, schedulers => #{ overseer => Scheduler - } + }, + scaling => global_based }. -spec handle_beat(_, mg_core_pulse:beat()) -> ok. diff --git a/apps/mg_core/test/mg_core_machine_SUITE.erl b/apps/mg_core/test/mg_core_machine_SUITE.erl index 7178bbf..542f0e3 100644 --- a/apps/mg_core/test/mg_core_machine_SUITE.erl +++ b/apps/mg_core/test/mg_core_machine_SUITE.erl @@ -250,7 +250,8 @@ automaton_options(C) -> timers => Scheduler, timers_retries => Scheduler, overseer => Scheduler - } + }, + scaling => global_based }. -spec handle_beat(_, mg_core_pulse:beat()) -> ok. diff --git a/apps/mg_core/test/mg_core_machine_full_test_SUITE.erl b/apps/mg_core/test/mg_core_machine_full_test_SUITE.erl index 1edfe0c..92ef0b0 100644 --- a/apps/mg_core/test/mg_core_machine_full_test_SUITE.erl +++ b/apps/mg_core/test/mg_core_machine_full_test_SUITE.erl @@ -117,7 +117,7 @@ check_chain(Options, ID, ReportPid) -> -spec check_chain(mg_core_machine:options(), id(), seq(), [action()], state(), pid()) -> ok. % TODO убрать константы -check_chain(_, ID, 100000, _, _, ReportPid) -> +check_chain(_, ID, 10000, _, _, ReportPid) -> ReportPid ! ?CHAIN_COMPLETE(ID), ok; check_chain(Options, ID, Seq, AllActions, State, ReportPid) -> @@ -287,7 +287,8 @@ automaton_options() -> pulse => ?MODULE, storage => mg_core_storage_memory }, - pulse => ?MODULE + pulse => ?MODULE, + scaling => global_based }. -spec lists_random(list(T)) -> T. 
diff --git a/apps/mg_core/test/mg_core_machine_notification_SUITE.erl b/apps/mg_core/test/mg_core_machine_notification_SUITE.erl index 1a2496b..0e7b8bc 100644 --- a/apps/mg_core/test/mg_core_machine_notification_SUITE.erl +++ b/apps/mg_core/test/mg_core_machine_notification_SUITE.erl @@ -309,7 +309,8 @@ automaton_options(_C) -> scan_handicap => 1, reschedule_time => 2 } - } + }, + scaling => global_based }. -spec notification_options() -> mg_core_notification:options(). @@ -317,7 +318,8 @@ notification_options() -> #{ namespace => ?NS, pulse => ?MODULE, - storage => mg_core_storage_memory + storage => mg_core_storage_memory, + scaling => global_based }. -spec handle_beat(_, mg_core_pulse:beat()) -> ok. diff --git a/apps/mg_core/test/mg_core_timer_retry_SUITE.erl b/apps/mg_core/test/mg_core_timer_retry_SUITE.erl index 82ffb63..58a0942 100644 --- a/apps/mg_core/test/mg_core_timer_retry_SUITE.erl +++ b/apps/mg_core/test/mg_core_timer_retry_SUITE.erl @@ -216,7 +216,8 @@ automaton_options(NS, RetryPolicy) -> timers => Scheduler, timers_retries => Scheduler, overseer => Scheduler - } + }, + scaling => global_based }. -spec handle_beat(_, mg_core_pulse:beat()) -> ok. diff --git a/apps/mg_core/test/mg_core_union_SUITE.erl b/apps/mg_core/test/mg_core_union_SUITE.erl deleted file mode 100644 index 3c8e285..0000000 --- a/apps/mg_core/test/mg_core_union_SUITE.erl +++ /dev/null @@ -1,142 +0,0 @@ --module(mg_core_union_SUITE). - --include_lib("eunit/include/eunit.hrl"). - -%% API --export([ - init_per_suite/1, - end_per_suite/1, - all/0, - groups/0 -]). - --define(RECONNECT_TIMEOUT, 1000). --define(CLUSTER_OPTS, #{ - discovery => #{ - module => mg_core_union, - options => #{ - <<"domain_name">> => <<"localhost">>, - <<"sname">> => <<"test_node">> - } - }, - reconnect_timeout => ?RECONNECT_TIMEOUT -}). - --export([nxdomain_test/1]). --export([start_ok_test/1]). --export([unknown_nodedown_test/1]). --export([exists_nodedown_test/1]). --export([unknown_nodeup_test/1]). 
--export([exists_nodeup_test/1]). --export([cluster_size_test/1]). - --type config() :: [{atom(), term()}]. --type test_case_name() :: atom(). --type group_name() :: atom(). --type test_result() :: any() | no_return(). - --spec init_per_suite(_) -> _. -init_per_suite(Config) -> - Config. - --spec end_per_suite(_) -> _. -end_per_suite(_Config) -> - ok. - --spec test() -> _. - --spec all() -> [{group, test_case_name()}]. -all() -> - [{group, basic_operations}]. - --spec groups() -> [{group_name(), list(), [test_case_name()]}]. -groups() -> - [ - {discovery, [], [ - nxdomain_test - ]}, - {basic_operations, [], [ - start_ok_test, - unknown_nodedown_test, - exists_nodedown_test, - unknown_nodeup_test, - exists_nodeup_test, - cluster_size_test - ]} - ]. - --spec nxdomain_test(config()) -> test_result(). -nxdomain_test(_Config) -> - ?assertError( - {resolve_error, {error, nxdomain}}, - mg_core_union:discovery(#{<<"domain_name">> => <<"bad_name">>, <<"sname">> => <<"mg">>}) - ). - --spec start_ok_test(config()) -> test_result(). -start_ok_test(_Config) -> - {ok, Pid} = mg_core_union:start_link(?CLUSTER_OPTS), - State = await_sys_get_state(Pid), - #{known_nodes := ListNodes} = State, - lists:foreach( - fun(Node) -> - ?assertEqual(pong, net_adm:ping(Node)) - end, - ListNodes - ), - exit(Pid, normal). - --spec unknown_nodedown_test(config()) -> test_result(). -unknown_nodedown_test(_Config) -> - {ok, Pid} = mg_core_union:start_link(?CLUSTER_OPTS), - nodedown_check(Pid, 'foo@127.0.0.1'), - exit(Pid, normal). - --spec exists_nodedown_test(config()) -> test_result(). -exists_nodedown_test(_Config) -> - {ok, Pid} = mg_core_union:start_link(?CLUSTER_OPTS), - nodedown_check(Pid, node()), - exit(Pid, normal). - --spec unknown_nodeup_test(config()) -> test_result(). -unknown_nodeup_test(_Config) -> - {ok, Pid} = mg_core_union:start_link(?CLUSTER_OPTS), - State = await_sys_get_state(Pid), - mg_core_union:set_state(State#{known_nodes => []}), - Pid ! 
{nodeup, node()}, - #{known_nodes := List} = await_sys_get_state(Pid), - ?assertEqual(List, [node()]), - exit(Pid, normal). - --spec exists_nodeup_test(config()) -> test_result(). -exists_nodeup_test(_Config) -> - {ok, Pid} = mg_core_union:start_link(?CLUSTER_OPTS), - #{known_nodes := List1} = await_sys_get_state(Pid), - ?assertEqual(List1, [node()]), - Pid ! {nodeup, node()}, - #{known_nodes := List2} = await_sys_get_state(Pid), - ?assertEqual(List2, [node()]), - exit(Pid, normal). - --spec cluster_size_test(config()) -> test_result(). -cluster_size_test(_Config) -> - _ = os:putenv("REPLICA_COUNT", "3"), - ?assertEqual(3, mg_core_union:cluster_size()), - {ok, Pid} = mg_core_union:start_link(?CLUSTER_OPTS), - ?assertEqual(1, mg_core_union:cluster_size()), - exit(Pid, normal). - -%% Internal functions --spec nodedown_check(pid(), node()) -> _. -nodedown_check(Pid, Node) -> - #{known_nodes := ListNodes1} = await_sys_get_state(Pid), - Pid ! {nodedown, Node}, - timer:sleep(?RECONNECT_TIMEOUT + 10), - #{known_nodes := ListNodes2} = await_sys_get_state(Pid), - ?assertEqual(ListNodes1, ListNodes2). - --spec await_sys_get_state(pid()) -> any(). -await_sys_get_state(Pid) -> - case sys:get_state(Pid, 100) of - {error, _} -> await_sys_get_state(Pid); - State -> State - end. diff --git a/apps/mg_cth/src/mg_cth_configurator.erl b/apps/mg_cth/src/mg_cth_configurator.erl index 397ba73..356f362 100644 --- a/apps/mg_cth/src/mg_cth_configurator.erl +++ b/apps/mg_cth/src/mg_cth_configurator.erl @@ -18,13 +18,15 @@ schedulers := mg_core_machine:schedulers_opt(), default_processing_timeout := timeout(), suicide_probability => mg_core_machine:suicide_probability(), - event_stash_size := non_neg_integer() + event_stash_size := non_neg_integer(), + scaling => mg_core_cluster:scaling_type() }. 
-type event_sink_ns() :: #{ default_processing_timeout := timeout(), storage => mg_core_storage:options(), - worker => mg_core_worker:options() + worker => mg_core_worker:options(), + scaling => mg_core_cluster:scaling_type() }. -type config() :: #{ @@ -47,6 +49,8 @@ construct_child_specs( } = Config ) -> Quotas = maps:get(quotas, Config, []), + ClusterOpts = maps:get(cluster, Config, #{}), + Scaling = maps:get(scaling, ClusterOpts, global_based), QuotasChSpec = quotas_child_specs(Quotas, quota), EventSinkChSpec = event_sink_ns_child_spec(EventSinkNS, event_sink), @@ -55,7 +59,7 @@ construct_child_specs( woody_server, #{ woody_server => WoodyServer, - automaton => api_automaton_options(Namespaces, EventSinkNS), + automaton => api_automaton_options(Namespaces, EventSinkNS, #{scaling => Scaling}), event_sink => api_event_sink_options(Namespaces, EventSinkNS), pulse => mg_cth_pulse } @@ -113,7 +117,8 @@ machine_options(NS, Config) -> Options = maps:with( [ retries, - timer_processing_timeout + timer_processing_timeout, + scaling ], Config ), @@ -134,8 +139,8 @@ machine_options(NS, Config) -> suicide_probability => maps:get(suicide_probability, Config, undefined) }. --spec api_automaton_options(_, event_sink_ns()) -> mg_woody_automaton:options(). -api_automaton_options(NSs, EventSinkNS) -> +-spec api_automaton_options(_, event_sink_ns(), _Opts) -> mg_woody_automaton:options(). +api_automaton_options(NSs, EventSinkNS, Opts) -> maps:fold( fun(NS, ConfigNS, Options) -> Options#{ @@ -147,7 +152,7 @@ api_automaton_options(NSs, EventSinkNS) -> ) } end, - #{}, + Opts, NSs ). diff --git a/apps/mg_cth/src/mg_cth_neighbour.erl b/apps/mg_cth/src/mg_cth_neighbour.erl new file mode 100644 index 0000000..95bdf63 --- /dev/null +++ b/apps/mg_cth/src/mg_cth_neighbour.erl @@ -0,0 +1,67 @@ +-module(mg_cth_neighbour). + +-behaviour(gen_server). + +-export([start/0]). +-export([ + init/1, + handle_call/3, + handle_cast/2, + handle_info/2, + terminate/2, + code_change/3 +]). 
+ +-export([connecting/1]). +-export([echo/1]). + +-define(SERVER, ?MODULE). + +-record(state, {}). + +-type state() :: #state{}. + +-spec connecting(_) -> _. +connecting(RemoteData) -> + gen_server:call(?MODULE, {connecting, RemoteData}). + +-spec echo(_) -> _. +echo(Msg) -> + gen_server:call(?MODULE, {echo, Msg}). + +%%%=================================================================== +%%% Spawning and gen_server implementation +%%%=================================================================== +-spec start() -> _. +start() -> + gen_server:start({local, ?SERVER}, ?MODULE, [], []). + +-spec init(_) -> {ok, state()}. +init([]) -> + {ok, #state{}}. + +-spec handle_call(term(), {pid(), _}, state()) -> {reply, any(), state()}. +handle_call({echo, Echo}, _From, State = #state{}) -> + {reply, {ok, Echo}, State}; +handle_call({connecting, _RemoteData}, _From, State = #state{}) -> + {reply, {ok, #{1 => node()}}, State}. + +-spec handle_cast(term(), state()) -> {noreply, state()}. +handle_cast(_Request, State = #state{}) -> + {noreply, State}. + +-spec handle_info(term(), state()) -> {noreply, state()}. +handle_info(_Info, State = #state{}) -> + {noreply, State}. + +-spec terminate(_Reason, state()) -> ok. +terminate(_Reason, _State = #state{}) -> + ok. + +-spec code_change(_OldVsn, state(), _Extra) -> {ok, state()}. +code_change(_OldVsn, State = #state{}, _Extra) -> + {ok, State}. + +%%%=================================================================== +%%% Internal functions +%%%=================================================================== diff --git a/apps/mg_woody/src/mg_woody_automaton.erl b/apps/mg_woody/src/mg_woody_automaton.erl index 65564e8..b3cf685 100644 --- a/apps/mg_woody/src/mg_woody_automaton.erl +++ b/apps/mg_woody/src/mg_woody_automaton.erl @@ -29,7 +29,10 @@ -import(mg_woody_packer, [pack/2, unpack/2]). %% API types --type options() :: #{mg_core:ns() => ns_options()}. 
+-type options() :: #{ + mg_core:ns() => ns_options(), + scaling => mg_core_cluster:scaling_type() +}. -type ns_options() :: #{ machine := mg_core_events_machine:options(), modernizer => mg_core_events_modernizer:options() @@ -51,6 +54,8 @@ %% -spec handler(options()) -> mg_woody_utils:woody_handler(). +handler(#{scaling := partition_based} = Options) -> + {"/v1/automaton", {{mg_proto_state_processing_thrift, 'Automaton'}, {mg_woody_automaton_balancer, Options}}}; handler(Options) -> {"/v1/automaton", {{mg_proto_state_processing_thrift, 'Automaton'}, {?MODULE, Options}}}. @@ -106,6 +111,7 @@ handle_function('Repair', {MachineDesc, Args}, WoodyContext, Options) -> {ok, Reply} -> {ok, pack(repair_response, Reply)}; {error, {failed, Reason}} -> + %% TODO catch this in router!!! woody_error:raise(business, pack(repair_error, Reason)) end; handle_function('SimpleRepair', {NS, RefIn}, WoodyContext, Options) -> diff --git a/apps/mg_woody/src/mg_woody_automaton_balancer.erl b/apps/mg_woody/src/mg_woody_automaton_balancer.erl new file mode 100644 index 0000000..1f78745 --- /dev/null +++ b/apps/mg_woody/src/mg_woody_automaton_balancer.erl @@ -0,0 +1,74 @@ +-module(mg_woody_automaton_balancer). + +%% woody handler +-behaviour(woody_server_thrift_handler). +-export([handle_function/4]). + +%% уменьшаем писанину +-import(mg_woody_packer, [unpack/2]). + +%% API types +-type options() :: mg_woody_automaton:options(). + +-define(AUTOMATON_HANDLER, mg_woody_automaton). + +%% +%% woody handler +%% +-spec handle_function(woody:func(), woody:args(), woody_context:ctx(), options()) -> + {ok, _Result} | no_return(). 
+ +handle_function('Start' = Call, {NS, IDIn, _Args} = Data, WoodyContext, Options) -> + ID = unpack(id, IDIn), + erpc:call( + target_node({NS, ID}), + ?AUTOMATON_HANDLER, + handle_function, + [Call, Data, WoodyContext, Options] + ); +handle_function(Call, {MachineDesc, _Args} = Data, WoodyContext, Options) when + Call =:= 'Repair'; + Call =:= 'Call'; + Call =:= 'Notify' +-> + {NS, ID, _Range} = unpack(machine_descriptor, MachineDesc), + erpc:call( + target_node({NS, ID}), + ?AUTOMATON_HANDLER, + handle_function, + [Call, Data, WoodyContext, Options] + ); +handle_function('SimpleRepair' = Call, {NS, RefIn} = Data, WoodyContext, Options) -> + ID = unpack(ref, RefIn), + erpc:call( + target_node({NS, ID}), + ?AUTOMATON_HANDLER, + handle_function, + [Call, Data, WoodyContext, Options] + ); +handle_function(Call, {MachineDesc} = Data, WoodyContext, Options) when + Call =:= 'GetMachine'; + Call =:= 'Modernize' +-> + {NS, ID, _Range} = unpack(machine_descriptor, MachineDesc), + erpc:call( + target_node({NS, ID}), + ?AUTOMATON_HANDLER, + handle_function, + [Call, Data, WoodyContext, Options] + ); +handle_function('Remove' = Call, {NS, IDIn} = Data, WoodyContext, Options) -> + ID = unpack(id, IDIn), + erpc:call( + target_node({NS, ID}), + ?AUTOMATON_HANDLER, + handle_function, + [Call, Data, WoodyContext, Options] + ). + +%% internal functions + +-spec target_node(term()) -> node(). +target_node(BalancingKey) -> + {ok, Node} = mg_core_cluster:get_node(BalancingKey), + Node. 
diff --git a/apps/mg_woody/test/mg_modernizer_tests_SUITE.erl b/apps/mg_woody/test/mg_modernizer_tests_SUITE.erl index 58fee70..aaffed3 100644 --- a/apps/mg_woody/test/mg_modernizer_tests_SUITE.erl +++ b/apps/mg_woody/test/mg_modernizer_tests_SUITE.erl @@ -176,6 +176,7 @@ mg_woody_config(Name, C) -> timers => #{} }, retries => #{}, + scaling => global_based, event_stash_size => 0 }, case Name of @@ -193,6 +194,7 @@ mg_woody_config(Name, C) -> }, event_sink_ns => #{ storage => mg_core_storage_memory, + scaling => global_based, default_processing_timeout => 5000 } }. diff --git a/apps/mg_woody/test/mg_stress_SUITE.erl b/apps/mg_woody/test/mg_stress_SUITE.erl index d674a8b..bdf7b10 100644 --- a/apps/mg_woody/test/mg_stress_SUITE.erl +++ b/apps/mg_woody/test/mg_stress_SUITE.erl @@ -113,11 +113,13 @@ mg_woody_config(_C) -> event_sinks => [ {mg_core_events_sink_machine, #{name => default, machine_id => ?ES_ID}} ], + scaling => global_based, event_stash_size => 10 } }, event_sink_ns => #{ storage => mg_core_storage_memory, + scaling => global_based, default_processing_timeout => 5000 } }. diff --git a/apps/mg_woody/test/mg_woody_tests_SUITE.erl b/apps/mg_woody/test/mg_woody_tests_SUITE.erl index 89fb97c..8226b9c 100644 --- a/apps/mg_woody/test/mg_woody_tests_SUITE.erl +++ b/apps/mg_woody/test/mg_woody_tests_SUITE.erl @@ -329,6 +329,7 @@ mg_woody_config(C) -> storage => {exponential, infinity, 1, 10}, timers => {exponential, infinity, 1, 10} }, + scaling => global_based, % сейчас существуют проблемы, которые не дают включить на постоянной основе эту % опцию (а очень хочется, чтобы проверять работоспособность идемпотентных ретраев) % TODO в будущем нужно это сделать @@ -349,6 +350,7 @@ mg_woody_config(C) -> }, event_sink_ns => #{ storage => mg_core_storage_memory, + scaling => global_based, default_processing_timeout => 5000 } }. 
@@ -712,6 +714,7 @@ config_with_multiple_event_sinks(_C) -> overseer => #{} }, retries => #{}, + scaling => global_based, event_sinks => [ {mg_core_events_sink_machine, #{name => default, machine_id => <<"SingleES">>}} ] @@ -728,6 +731,7 @@ config_with_multiple_event_sinks(_C) -> overseer => #{} }, retries => #{}, + scaling => global_based, event_sinks => [ {mg_core_events_sink_machine, #{ name => machine, @@ -743,6 +747,7 @@ config_with_multiple_event_sinks(_C) -> }, event_sink_ns => #{ storage => mg_core_storage_memory, + scaling => global_based, default_processing_timeout => 5000 } }, diff --git a/compose.yaml b/compose.yaml index 2816ce6..be43199 100644 --- a/compose.yaml +++ b/compose.yaml @@ -2,6 +2,8 @@ services: testrunner: image: $DEV_IMAGE_TAG + environment: + WORK_DIR: $PWD build: dockerfile: Dockerfile.dev context: . @@ -10,7 +12,7 @@ services: THRIFT_VERSION: $THRIFT_VERSION volumes: - .:$PWD - hostname: $SERVICE_NAME + hostname: ${SERVICE_NAME}-0 cap_add: - NET_ADMIN working_dir: $PWD diff --git a/config/config.yaml b/config/config.yaml index ee75a68..068675e 100644 --- a/config/config.yaml +++ b/config/config.yaml @@ -162,6 +162,12 @@ cluster: domain_name: machinegun-headless # name that will be used for construct full nodename (for example name@127.0.0.1) sname: machinegun + # optional, supported values: global_based | partition_based, default global_based + scaling: partition_based + # required when scaling = partition_based + partitioning: + capacity: 31 + max_hash: 4095 # optional, default value 5000 ms reconnect_timeout: 5000 diff --git a/elvis.config b/elvis.config index ae0fa37..895e2d7 100644 --- a/elvis.config +++ b/elvis.config @@ -42,7 +42,7 @@ mg_core_gen_squad, mg_core_gen_squad_heart, mg_core_storage_memory, - mg_core_union, + mg_core_cluster, mg_core_worker ] }}, diff --git a/rebar.config b/rebar.config index 398da9c..b78969b 100644 --- a/rebar.config +++ b/rebar.config @@ -90,7 +90,7 @@ ]} ]}, {test, [ - {deps, [{proper, "1.4.0"}]}, + 
{deps, [{proper, "1.4.0"}, {meck, "0.9.2"}]}, {cover_enabled, true}, {cover_excl_apps, [mg_cth]}, {dialyzer, [{plt_extra_apps, [eunit, common_test, proper]}]} diff --git a/rebar.lock b/rebar.lock index 4c937cb..cb2e579 100644 --- a/rebar.lock +++ b/rebar.lock @@ -64,6 +64,10 @@ {<<"prometheus_httpd">>,{pkg,<<"prometheus_httpd">>,<<"2.1.11">>},1}, {<<"quantile_estimator">>,{pkg,<<"quantile_estimator">>,<<"0.2.1">>},1}, {<<"ranch">>,{pkg,<<"ranch">>,<<"1.8.0">>},1}, + {<<"ranger">>, + {git,"https://github.com/valitydev/ranger", + {ref,"351d47acf133ea04df4fe9069c76609d22b4c5d8"}}, + 0}, {<<"recon">>,{pkg,<<"recon">>,<<"2.3.2">>},0}, {<<"riak_pb">>, {git,"https://github.com/basho/riak_pb", diff --git a/rel_scripts/configurator.escript b/rel_scripts/configurator.escript index cdbc69a..c207d6b 100755 --- a/rel_scripts/configurator.escript +++ b/rel_scripts/configurator.escript @@ -448,9 +448,14 @@ opentelemetry_conf(YamlConfig) -> ?C:conf([opentelemetry], YamlConfig, undefined). health_check_fun(YamlConfig) -> - case ?C:conf([process_registry, module], YamlConfig, <<"mg_core_procreg_consuela">>) of - <<"mg_core_procreg_consuela">> -> consuela; - <<"mg_core_procreg_global">> -> global + case cluster(YamlConfig) of + #{discovering := _} -> + global; + _ -> + case ?C:conf([consuela], YamlConfig, undefined) of + undefined -> skip; + _ -> consuela + end end. 
cluster(YamlConfig) -> @@ -462,13 +467,13 @@ cluster(YamlConfig) -> <<"dns">> -> DiscoveryOptsList = ?C:conf([cluster, discovery, options], YamlConfig), ReconnectTimeout = ?C:conf([cluster, reconnect_timeout], YamlConfig, 5000), - #{ - discovery => #{ - module => mg_core_union, - options => maps:from_list(DiscoveryOptsList) - }, + PartitionsOpts = partitions_options(YamlConfig), + genlib_map:compact(#{ + discovering => maps:from_list(DiscoveryOptsList), + scaling => scaling(YamlConfig), + partitioning => PartitionsOpts, reconnect_timeout => ReconnectTimeout - }; + }); _ -> #{} end; @@ -476,6 +481,17 @@ cluster(YamlConfig) -> #{} end. +partitions_options(YamlConfig) -> + case ?C:conf([cluster, partitioning], YamlConfig, undefined) of + undefined -> + undefined; + ListOpts -> + lists:foldl(fun({Key, Value}, Acc) -> Acc#{erlang:binary_to_atom(Key) => Value} end, #{}, ListOpts) + end. + +scaling(YamlConfig) -> + ?C:atom(?C:conf([cluster, scaling], YamlConfig, <<"global_based">>)). + quotas(YamlConfig) -> SchedulerLimit = ?C:conf([limits, scheduler_tasks], YamlConfig, 5000), [ @@ -633,7 +649,8 @@ namespace({Name, NSYamlConfig}, YamlConfig) -> schedulers => namespace_schedulers(NSYamlConfig), event_sinks => [event_sink(ES) || ES <- ?C:conf([event_sinks], NSYamlConfig, [])], suicide_probability => ?C:probability(?C:conf([suicide_probability], NSYamlConfig, 0)), - event_stash_size => ?C:conf([event_stash_size], NSYamlConfig, 0) + event_stash_size => ?C:conf([event_stash_size], NSYamlConfig, 0), + scaling => scaling(YamlConfig) }, conf_with([modernizer], NSYamlConfig, #{}, fun(ModernizerYamlConfig) -> #{ @@ -747,6 +764,7 @@ event_sink(kafka, Name, ESYamlConfig) -> topic => ?C:conf([topic], ESYamlConfig) }}. +%% TODO procreg(YamlConfig) -> % Use process_registry if it's set up or consuela if it's set up, gproc otherwise Default = conf_with(