From 49a4c185d9705e17fd038aba754e6c6f9bc56d5b Mon Sep 17 00:00:00 2001 From: ttt161 Date: Tue, 26 Mar 2024 22:57:48 +0300 Subject: [PATCH 1/4] TD-877: add mg nodes to compose --- Dockerfile.dev | 12 +- Dockerfile.test | 61 +++ Makefile | 20 +- apps/mg_core/src/mg_core_cluster.erl | 27 +- .../src/mg_core_cluster_partitions.erl | 24 +- apps/mg_core/src/mg_core_storage.erl | 4 +- apps/mg_core/test/mg_core_cluster_SUITE.erl | 263 ++++++++--- compose.yaml | 35 ++ config/config.example.yaml | 398 ++++++++++++++++ config/config.yaml | 436 ++++-------------- rebar.config | 3 +- rebar.config.script | 23 + rel_scripts/configurator.escript | 1 + test_resources/authorized_keys | 1 + test_resources/ssh/config | 1 + test_resources/ssh/id_rsa | 38 ++ test_resources/ssh/id_rsa.pub | 1 + 17 files changed, 904 insertions(+), 444 deletions(-) create mode 100644 Dockerfile.test create mode 100644 config/config.example.yaml create mode 100644 rebar.config.script create mode 100644 test_resources/authorized_keys create mode 100644 test_resources/ssh/config create mode 100644 test_resources/ssh/id_rsa create mode 100644 test_resources/ssh/id_rsa.pub diff --git a/Dockerfile.dev b/Dockerfile.dev index 35d7bc0..3f4061d 100644 --- a/Dockerfile.dev +++ b/Dockerfile.dev @@ -7,7 +7,7 @@ ARG THRIFT_VERSION ARG BUILDARCH RUN apt-get --yes update && \ - apt-get --yes --no-install-recommends install iproute2=5.10.0-4 && \ + apt-get --yes --no-install-recommends install iproute2=5.10.0-4 sshpass && \ apt-get clean && \ rm -rf /var/lib/apt/lists/* @@ -17,7 +17,13 @@ RUN wget -q -O- "https://github.com/valitydev/thrift/releases/download/${THRIFT_ ENV NETUNREACH_NETWORK="10.254.254.0/24" ENV NETUNREACH_ADDRESS="10.254.254.10" +COPY ./test_resources/ssh/ /root/.ssh/ +RUN chown -R root:root /root/.ssh +RUN chmod 600 /root/.ssh/* + RUN echo '#!/bin/sh' >> /entrypoint.sh && \ + echo 'ADDR=`ip route get 8.8.8.8 | grep -oP "src \K[^ ]+"`' >> /entrypoint.sh && \ + echo 'export IP=${ADDR}' >> /entrypoint.sh && \ echo "ip route add throw ${NETUNREACH_NETWORK}" >> /entrypoint.sh && \ echo 'exec "$@"' >> /entrypoint.sh && \ chmod +x /entrypoint.sh @@ -26,4 +32,8 @@ ENTRYPOINT ["/entrypoint.sh"] ENV CHARSET=UTF-8 ENV LANG=C.UTF-8 +ENV SERVICE_NAME=mg CMD ["/bin/bash"] + +EXPOSE 4369 +EXPOSE 8022 diff --git a/Dockerfile.test b/Dockerfile.test new file mode 100644 index 0000000..25c634f --- /dev/null +++ b/Dockerfile.test @@ -0,0 +1,61 @@ +ARG OTP_VERSION + +# Build the release +FROM docker.io/library/erlang:${OTP_VERSION} AS builder +SHELL ["/bin/bash", "-o", "pipefail", "-c"] + +# Install thrift compiler +ARG THRIFT_VERSION +ARG TARGETARCH + +RUN wget -q -O- "https://github.com/valitydev/thrift/releases/download/${THRIFT_VERSION}/thrift-${THRIFT_VERSION}-linux-${BUILDARCH}.tar.gz" \ + | tar -xvz -C /usr/local/bin/ + +# Copy sources +RUN mkdir /build +COPY . 
/build/
+
+# Build the release
+WORKDIR /build
+RUN rebar3 compile \
+    && rebar3 as prod release
+
+# Make a runner image
+FROM docker.io/library/erlang:${OTP_VERSION}-slim
+
+ARG SERVICE_NAME=mg
+ARG USER_UID=1001
+ARG USER_GID=$USER_UID
+
+# Set env
+ENV CHARSET=UTF-8
+ENV LANG=C.UTF-8
+
+# Expose SERVICE_NAME as env so CMD expands properly on start
+ENV SERVICE_NAME=${SERVICE_NAME}
+
+# Set runtime
+WORKDIR /opt/${SERVICE_NAME}
+
+COPY --from=builder /build/_build/prod/rel/${SERVICE_NAME} /opt/${SERVICE_NAME}
+COPY ./test_resources/authorized_keys /root/.ssh/authorized_keys
+RUN chown root:root /root/.ssh/authorized_keys
+RUN chmod 600 /root/.ssh/*
+
+# SSH install
+RUN apt-get update && apt-get install -y openssh-server iproute2 iputils-ping dnsutils mc
+RUN mkdir /var/run/sshd
+RUN echo 'root:security' | chpasswd
+RUN sed -i 's/#PermitRootLogin prohibit-password/PermitRootLogin yes/' /etc/ssh/sshd_config
+
+RUN echo '#!/bin/sh' >> /entrypoint.sh && \
+    echo 'ADDR=`ip route get 8.8.8.8 | grep -oP "src \K[^ ]+"`' >> /entrypoint.sh && \
+    echo 'echo "dist_node_name: mg@${ADDR}" >> /opt/${SERVICE_NAME}/etc/config.yaml' >> /entrypoint.sh && \
+    echo '/usr/sbin/sshd -D' >> /entrypoint.sh && \
+    chmod +x /entrypoint.sh
+
+ENTRYPOINT ["/entrypoint.sh"]
+
+EXPOSE 22
+EXPOSE 4369
+EXPOSE 8022
diff --git a/Makefile b/Makefile
index 622758d..a629530 100644
--- a/Makefile
+++ b/Makefile
@@ -14,15 +14,19 @@ DOTENV := $(shell grep -v '^\#' .env)
 DEV_IMAGE_TAG = $(TEST_CONTAINER_NAME)-dev
 DEV_IMAGE_ID = $(file < .image.dev)
 
+TEST_IMAGE_TAG = $(DIST_CONTAINER_NAME)-test
+TEST_IMAGE_ID = $(file < .image.test)
+
 DOCKER ?= docker
 DOCKERCOMPOSE ?= docker-compose
 DOCKERCOMPOSE_W_ENV = DEV_IMAGE_TAG=$(DEV_IMAGE_TAG) $(DOCKERCOMPOSE) -f compose.yaml -f compose.tracing.yaml
 REBAR ?= rebar3
 TEST_CONTAINER_NAME ?= testrunner
+DIST_CONTAINER_NAME ?= distrunner
 
 all: compile xref lint check-format dialyze eunit
 
-.PHONY: dev-image clean-dev-image wc-shell test
+.PHONY: dev-image test-image clean-dev-image clean-test-image wc-shell test
 
 dev-image: .image.dev
 
@@ -36,6 +40,18 @@ ifneq ($(DEV_IMAGE_ID),)
 	rm .image.dev
 endif
 
+test-image: .image.test
+
+.image.test: Dockerfile.test .env
+	env $(DOTENV) $(DOCKERCOMPOSE_W_ENV) build $(DIST_CONTAINER_NAME)
+	$(DOCKER) image ls -q -f "reference=$(TEST_IMAGE_TAG)" | head -n1 > $@
+
+clean-test-image:
+ifneq ($(TEST_IMAGE_ID),)
+	$(DOCKER) image rm -f $(TEST_IMAGE_TAG)
+	rm .image.test
+endif
+
 DOCKER_WC_OPTIONS := -v $(PWD):$(PWD) --workdir $(PWD)
 DOCKER_WC_EXTRA_OPTIONS ?= --rm
 DOCKER_RUN = $(DOCKER) run -t $(DOCKER_WC_OPTIONS) $(DOCKER_WC_EXTRA_OPTIONS)
@@ -87,7 +103,7 @@ eunit:
 	$(REBAR) eunit --cover
 
 common-test:
-	$(REBAR) ct --cover --name test_node@127.0.0.1
+	$(REBAR) ct --cover
 
 cover:
 	$(REBAR) covertool generate
diff --git a/apps/mg_core/src/mg_core_cluster.erl b/apps/mg_core/src/mg_core_cluster.erl
index 9a2f505..bf6c2b0 100644
--- a/apps/mg_core/src/mg_core_cluster.erl
+++ b/apps/mg_core/src/mg_core_cluster.erl
@@ -22,12 +22,6 @@
 -define(SERVER, ?MODULE).
 -define(RECONNECT_TIMEOUT, 5000).
 
--ifdef(TEST).
--define(NEIGHBOUR, mg_cth_neighbour).
--else.
--define(NEIGHBOUR, ?MODULE).
--endif.
-
 -type discovery_options() :: mg_core_cluster_partitions:discovery_options().
 -type scaling_type() :: global_based | partition_based.
 
@@ -127,6 +121,7 @@ handle_continue( LocalTable = mg_core_cluster_partitions:make_local_table(ScalingType), PartitionsTable = try_connect_all(ListNodes, maps:get(reconnect_timeout, ClusterOpts), LocalTable), BalancingTable = mg_core_cluster_partitions:make_balancing_table( + ScalingType, PartitionsTable, maps:get(partitioning, ClusterOpts, undefined) ), @@ -159,7 +154,11 @@ handle_call( } = State ) -> NewPartitionsTable = mg_core_cluster_partitions:add_partitions(PartitionsTable, RemoteTable), - NewBalancingTable = mg_core_cluster_partitions:make_balancing_table(NewPartitionsTable, PartitionsOpts), + NewBalancingTable = mg_core_cluster_partitions:make_balancing_table( + partition_based, + NewPartitionsTable, + PartitionsOpts + ), { reply, {ok, LocalTable}, @@ -219,7 +218,13 @@ code_change(_OldVsn, State, _Extra) -> connect(Node, ReconnectTimeout, LocalTable) when Node =/= node() -> case net_adm:ping(Node) of pong -> - erpc:call(Node, ?NEIGHBOUR, connecting, [{LocalTable, node()}]); + try + erpc:call(Node, ?MODULE, connecting, [{LocalTable, node()}]) + catch + _:_ -> + _ = erlang:start_timer(ReconnectTimeout, self(), {reconnect, Node}), + {error, not_connected} + end; pang -> _ = erlang:start_timer(ReconnectTimeout, self(), {reconnect, Node}), {error, not_connected} @@ -277,7 +282,11 @@ maybe_rebalance( } = State ) -> NewPartitionsTable = mg_core_cluster_partitions:add_partitions(PartitionsTable, RemoteTable), - NewBalancingTable = mg_core_cluster_partitions:make_balancing_table(NewPartitionsTable, PartitionsOpts), + NewBalancingTable = mg_core_cluster_partitions:make_balancing_table( + partition_based, + NewPartitionsTable, + PartitionsOpts + ), State#{partitions_table => NewPartitionsTable, balancing_table => NewBalancingTable}; maybe_rebalance(_, State) -> State. diff --git a/apps/mg_core/src/mg_core_cluster_partitions.erl b/apps/mg_core/src/mg_core_cluster_partitions.erl index 1cb0c39..1ed6aa7 100644 --- a/apps/mg_core/src/mg_core_cluster_partitions.erl +++ b/apps/mg_core/src/mg_core_cluster_partitions.erl @@ -33,28 +33,14 @@ %% API -export([discovery/1]). -export([make_local_table/1]). --export([make_balancing_table/2]). +-export([make_balancing_table/3]). -export([add_partitions/2]). -export([del_partition/2]). -export([empty_partitions/0]). -export([get_node/2]). -export([is_local_partition/2]). --ifdef(TEST). --export([get_addrs/1]). --export([addrs_to_nodes/2]). --export([host_to_index/1]). --define(TEST_NODES, [ - 'test_node@127.0.0.1', - 'peer@127.0.0.1' -]). --endif. - -spec discovery(discovery_options()) -> {ok, [node()]}. --ifdef(TEST). -discovery(_) -> - {ok, ?TEST_NODES}. --else. discovery(#{<<"domain_name">> := DomainName, <<"sname">> := Sname}) -> case get_addrs(unicode:characters_to_list(DomainName)) of {ok, ListAddrs} -> @@ -63,7 +49,6 @@ discovery(#{<<"domain_name">> := DomainName, <<"sname">> := Sname}) -> Error -> error({resolve_error, Error}) end. --endif. -spec make_local_table(mg_core_cluster:scaling_type()) -> local_partition_table(). make_local_table(global_based) -> @@ -73,10 +58,11 @@ make_local_table(partition_based) -> {ok, HostIndex} = host_to_index(Hostname), #{HostIndex => node()}. --spec make_balancing_table(partitions_table(), partitions_options() | undefined) -> balancing_table(). -make_balancing_table(_PartitionsTable, undefined) -> +-spec make_balancing_table(mg_core_cluster:scaling_type(), partitions_table(), partitions_options() | undefined) -> + balancing_table(). 
+make_balancing_table(global_based, _PartitionsTable, _) -> #{}; -make_balancing_table(PartitionsTable, #{capacity := Capacity, max_hash := MaxHash}) -> +make_balancing_table(partition_based, PartitionsTable, #{capacity := Capacity, max_hash := MaxHash}) -> ListPartitions = maps:keys(PartitionsTable), mg_core_dirange:get_ranges(MaxHash, Capacity, ListPartitions). diff --git a/apps/mg_core/src/mg_core_storage.erl b/apps/mg_core/src/mg_core_storage.erl index 698e60d..2b671d1 100644 --- a/apps/mg_core/src/mg_core_storage.erl +++ b/apps/mg_core/src/mg_core_storage.erl @@ -295,8 +295,8 @@ filter_by_partition({NS, _, _}, Data) when is_list(Data) -> end, Data ); -filter_by_partition({_NS, _, _} = Name, {Data, _Continuation}) -> - filter_by_partition(Name, Data); +filter_by_partition({_NS, _, _} = Name, {Data, Continuation}) -> + {filter_by_partition(Name, Data), Continuation}; filter_by_partition(_Name, Data) -> Data. diff --git a/apps/mg_core/test/mg_core_cluster_SUITE.erl b/apps/mg_core/test/mg_core_cluster_SUITE.erl index 22a87a5..8f56a48 100644 --- a/apps/mg_core/test/mg_core_cluster_SUITE.erl +++ b/apps/mg_core/test/mg_core_cluster_SUITE.erl @@ -14,8 +14,8 @@ -define(RECONNECT_TIMEOUT, 2000). -define(CLUSTER_OPTS, #{ discovering => #{ - <<"domain_name">> => <<"localhost">>, - <<"sname">> => <<"test_node">> + <<"domain_name">> => <<"machinegun-ha-headless">>, + <<"sname">> => <<"mg">> }, scaling => partition_based, partitioning => #{ @@ -24,9 +24,7 @@ }, reconnect_timeout => ?RECONNECT_TIMEOUT }). --define(NEIGHBOUR, 'peer@127.0.0.1'). --export([peer_test/1]). -export([base_test/1]). -export([reconnect_rebalance_test/1]). -export([connecting_rebalance_test/1]). @@ -38,7 +36,7 @@ -type group_name() :: atom(). -type test_result() :: any() | no_return(). --define(PARTITIONS_INFO_WITH_PEER, #{ +-define(PARTITIONS_INFO_WITH_PEER(Local, Remote), #{ partitioning => #{ capacity => 5, max_hash => 4095 @@ -54,14 +52,14 @@ {3685, 4095} => 1 }, local_table => #{ - 0 => 'test_node@127.0.0.1' + 0 => Local }, partitions_table => #{ - 0 => 'test_node@127.0.0.1', - 1 => 'peer@127.0.0.1' + 0 => Local, + 1 => Remote } }). --define(PARTITIONS_INFO_WO_PEER, #{ +-define(PARTITIONS_INFO_WO_PEER(Local), #{ partitioning => #{ capacity => 5, max_hash => 4095 @@ -74,26 +72,69 @@ {3276, 4095} => 0 }, local_table => #{ - 0 => 'test_node@127.0.0.1' + 0 => Local }, partitions_table => #{ - 0 => 'test_node@127.0.0.1' + 0 => Local } }). %% erlang:phash2({<<"Namespace">>, <<"ID">>}, 4095) = 1075 -define(KEY, {<<"Namespace">>, <<"ID">>}). +-define(ERLANG_TEST_HOSTS, [ + "mg-0", + "mg-1", + "mg-2", + "mg-3", + "mg-4" +]). + +-define(HOSTS_TEMPLATE, << + "127.0.0.1 localhost\n", + "::1 localhost ip6-localhost ip6-loopback\n", + "fe00::0 ip6-localnet\n", + "ff00::0 ip6-mcastprefix\n", + "ff02::1 ip6-allnodes\n", + "ff02::2 ip6-allrouters\n" +>>). + +-define(LOCAL_NODE(Config), begin + {local_node, LocalNode} = lists:keyfind(local_node, 1, Config), + LocalNode +end). + +-define(REMOTE_NODE(Config), begin + {remote_node, RemoteNode} = lists:keyfind(remote_node, 1, Config), + RemoteNode +end). + -spec init_per_suite(_) -> _. init_per_suite(Config) -> - WorkDir = os:getenv("WORK_DIR"), - EbinDir = WorkDir ++ "/_build/test/lib/mg_cth/ebin", - ok = start_peer(), - true = erpc:call(?NEIGHBOUR, code, add_path, [EbinDir]), - {ok, _Pid} = erpc:call(?NEIGHBOUR, mg_cth_neighbour, start, []), - Config. 
+ HostsTable = lists:foldl( + fun(Host, Acc) -> + {ok, Addr} = inet:getaddr(Host, inet), + Acc#{unicode:characters_to_binary(Host) => unicode:characters_to_binary(inet:ntoa(Addr))} + end, + #{}, + ?ERLANG_TEST_HOSTS + ), + _ = prepare_cluster(HostsTable, [<<"mg-0">>, <<"mg-1">>]), + _ = instance_up(<<"mg-1">>), + LocalAddr = maps:get(<<"mg-0">>, HostsTable), + LocalNode = erlang:binary_to_atom(<<"mg@", LocalAddr/binary>>), + RemoteAddr = maps:get(<<"mg-1">>, HostsTable), + RemoteNode = erlang:binary_to_atom(<<"mg@", RemoteAddr/binary>>), + _ = await_peer(RemoteNode, 5), + [ + {local_node, LocalNode}, + {remote_node, RemoteNode}, + {hosts_table, HostsTable} + | Config + ]. -spec end_per_suite(_) -> _. end_per_suite(_Config) -> + _ = instance_down(<<"mg-1">>), ok. -spec test() -> _. @@ -106,7 +147,6 @@ all() -> groups() -> [ {basic_operations, [], [ - peer_test, base_test, reconnect_rebalance_test, connecting_rebalance_test, @@ -115,76 +155,183 @@ groups() -> ]} ]. --spec peer_test(config()) -> test_result(). -peer_test(_Config) -> - {ok, 'ECHO'} = erpc:call(?NEIGHBOUR, mg_cth_neighbour, echo, ['ECHO']), - {ok, #{1 := 'peer@127.0.0.1'}} = erpc:call(?NEIGHBOUR, mg_cth_neighbour, connecting, [{#{}, node()}]). - -spec base_test(config()) -> test_result(). -base_test(_Config) -> +base_test(Config) -> {ok, Pid} = mg_core_cluster:start_link(?CLUSTER_OPTS), - ?assertEqual(?PARTITIONS_INFO_WITH_PEER, mg_core_cluster:get_partitions_info()), + ?assertEqual( + ?PARTITIONS_INFO_WITH_PEER(?LOCAL_NODE(Config), ?REMOTE_NODE(Config)), + mg_core_cluster:get_partitions_info() + ), ?assertEqual(2, mg_core_cluster:cluster_size()), - ?assertEqual({ok, ?NEIGHBOUR}, mg_core_cluster:get_node(?KEY)), - ?assertEqual(false, mg_core_cluster_partitions:is_local_partition(?KEY, ?PARTITIONS_INFO_WITH_PEER)), - ?assertEqual(true, mg_core_cluster_partitions:is_local_partition(?KEY, ?PARTITIONS_INFO_WO_PEER)), + ?assertEqual({ok, ?REMOTE_NODE(Config)}, mg_core_cluster:get_node(?KEY)), + ?assertEqual( + false, + mg_core_cluster_partitions:is_local_partition( + ?KEY, + ?PARTITIONS_INFO_WITH_PEER(?LOCAL_NODE(Config), ?REMOTE_NODE(Config)) + ) + ), + ?assertEqual( + true, + mg_core_cluster_partitions:is_local_partition( + ?KEY, + ?PARTITIONS_INFO_WO_PEER(?LOCAL_NODE(Config)) + ) + ), exit(Pid, normal). -spec reconnect_rebalance_test(config()) -> test_result(). -reconnect_rebalance_test(_Config) -> +reconnect_rebalance_test(Config) -> %% node_down - rebalance - reconnect by timer - rebalance {ok, Pid} = mg_core_cluster:start_link(?CLUSTER_OPTS), - Pid ! {nodedown, ?NEIGHBOUR}, - ?assertEqual(?PARTITIONS_INFO_WO_PEER, mg_core_cluster:get_partitions_info()), + Pid ! 
{nodedown, ?REMOTE_NODE(Config)}, + ?assertEqual(?PARTITIONS_INFO_WO_PEER(?LOCAL_NODE(Config)), mg_core_cluster:get_partitions_info()), ?assertEqual({ok, node()}, mg_core_cluster:get_node(?KEY)), - ?assertEqual(true, mg_core_cluster_partitions:is_local_partition(?KEY, ?PARTITIONS_INFO_WO_PEER)), + ?assertEqual( + true, + mg_core_cluster_partitions:is_local_partition(?KEY, ?PARTITIONS_INFO_WO_PEER(?LOCAL_NODE(Config))) + ), %% wait reconnecting timer:sleep(?RECONNECT_TIMEOUT + 10), - ?assertEqual(?PARTITIONS_INFO_WITH_PEER, mg_core_cluster:get_partitions_info()), - ?assertEqual({ok, ?NEIGHBOUR}, mg_core_cluster:get_node(?KEY)), - ?assertEqual(false, mg_core_cluster_partitions:is_local_partition(?KEY, ?PARTITIONS_INFO_WITH_PEER)), + ?assertEqual( + ?PARTITIONS_INFO_WITH_PEER(?LOCAL_NODE(Config), ?REMOTE_NODE(Config)), + mg_core_cluster:get_partitions_info() + ), + ?assertEqual({ok, ?REMOTE_NODE(Config)}, mg_core_cluster:get_node(?KEY)), + ?assertEqual( + false, + mg_core_cluster_partitions:is_local_partition( + ?KEY, + ?PARTITIONS_INFO_WITH_PEER(?LOCAL_NODE(Config), ?REMOTE_NODE(Config)) + ) + ), exit(Pid, normal). -spec connecting_rebalance_test(config()) -> test_result(). -connecting_rebalance_test(_Config) -> +connecting_rebalance_test(Config) -> {ok, Pid} = mg_core_cluster:start_link(?CLUSTER_OPTS), - Pid ! {nodedown, ?NEIGHBOUR}, - ?assertEqual(?PARTITIONS_INFO_WO_PEER, mg_core_cluster:get_partitions_info()), + Pid ! {nodedown, ?REMOTE_NODE(Config)}, + ?assertEqual(?PARTITIONS_INFO_WO_PEER(?LOCAL_NODE(Config)), mg_core_cluster:get_partitions_info()), ?assertEqual({ok, node()}, mg_core_cluster:get_node(?KEY)), %% force connecting - ?assertEqual({ok, #{0 => node()}}, mg_core_cluster:connecting({#{1 => ?NEIGHBOUR}, ?NEIGHBOUR})), - ?assertEqual(?PARTITIONS_INFO_WITH_PEER, mg_core_cluster:get_partitions_info()), - ?assertEqual({ok, ?NEIGHBOUR}, mg_core_cluster:get_node(?KEY)), + ?assertEqual( + {ok, #{0 => node()}}, + mg_core_cluster:connecting({#{1 => ?REMOTE_NODE(Config)}, ?REMOTE_NODE(Config)}) + ), + ?assertEqual( + ?PARTITIONS_INFO_WITH_PEER(?LOCAL_NODE(Config), ?REMOTE_NODE(Config)), + mg_core_cluster:get_partitions_info() + ), + ?assertEqual({ok, ?REMOTE_NODE(Config)}, mg_core_cluster:get_node(?KEY)), exit(Pid, normal). -spec double_connecting_test(config()) -> test_result(). -double_connecting_test(_Config) -> +double_connecting_test(Config) -> {ok, Pid} = mg_core_cluster:start_link(?CLUSTER_OPTS), - ?assertEqual(?PARTITIONS_INFO_WITH_PEER, mg_core_cluster:get_partitions_info()), + ?assertEqual( + ?PARTITIONS_INFO_WITH_PEER(?LOCAL_NODE(Config), ?REMOTE_NODE(Config)), + mg_core_cluster:get_partitions_info() + ), %% double connect - ?assertEqual({ok, #{0 => node()}}, mg_core_cluster:connecting({#{1 => ?NEIGHBOUR}, ?NEIGHBOUR})), - ?assertEqual(?PARTITIONS_INFO_WITH_PEER, mg_core_cluster:get_partitions_info()), + ?assertEqual( + {ok, #{0 => node()}}, + mg_core_cluster:connecting({#{1 => ?REMOTE_NODE(Config)}, ?REMOTE_NODE(Config)}) + ), + ?assertEqual( + ?PARTITIONS_INFO_WITH_PEER(?LOCAL_NODE(Config), ?REMOTE_NODE(Config)), + mg_core_cluster:get_partitions_info() + ), exit(Pid, normal). -spec deleted_node_down_test(config()) -> test_result(). -deleted_node_down_test(_Config) -> +deleted_node_down_test(Config) -> {ok, Pid} = mg_core_cluster:start_link(?CLUSTER_OPTS), - ?assertEqual(?PARTITIONS_INFO_WITH_PEER, mg_core_cluster:get_partitions_info()), + ?assertEqual( + ?PARTITIONS_INFO_WITH_PEER(?LOCAL_NODE(Config), ?REMOTE_NODE(Config)), + mg_core_cluster:get_partitions_info() + ), Pid ! 
{nodedown, 'foo@127.0.0.1'},
-    %% wait reconnect timeout
-    ?assertEqual(?PARTITIONS_INFO_WITH_PEER, mg_core_cluster:get_partitions_info()),
+    ?assertEqual(
+        ?PARTITIONS_INFO_WITH_PEER(?LOCAL_NODE(Config), ?REMOTE_NODE(Config)),
+        mg_core_cluster:get_partitions_info()
+    ),
     exit(Pid, normal).
 
 %% Internal functions
 
--spec start_peer() -> _.
-start_peer() ->
-    {ok, _Pid, _Node} = peer:start(#{
-        name => peer,
-        longnames => true,
-        host => "127.0.0.1"
-    }),
-    pong = net_adm:ping(?NEIGHBOUR),
-    ok.
+-spec prepare_cluster(_, _) -> _.
+prepare_cluster(HostsTable, HostsToUp) ->
+    %% prepare headless emulation records
+    HeadlessRecords = lists:foldl(
+        fun(Host, Acc) ->
+            Address = maps:get(Host, HostsTable),
+            <<Acc/binary, Address/binary, " ", Host/binary, "\n">>
+        end,
+        <<"\n">>,
+        HostsToUp
+    ),
+
+    %% prepare hosts files for each node
+    lists:foreach(
+        fun(Host) ->
+            Address = maps:get(Host, HostsTable),
+            Payload = <<
+                ?HOSTS_TEMPLATE/binary,
+                Address/binary,
+                " ",
+                Host/binary,
+                "\n",
+                HeadlessRecords/binary
+            >>,
+            Filename = unicode:characters_to_list(Host) ++ "_hosts",
+            ok = file:write_file(Filename, Payload)
+        end,
+        HostsToUp
+    ),
+
+    %% distribute hosts files
+    lists:foreach(
+        fun
+            (<<"mg-0">>) ->
+                LocalFile = "mg-0_hosts",
+                RemoteFile = "/etc/hosts",
+                Cp = os:find_executable("cp"),
+                CMD = Cp ++ " -f " ++ LocalFile ++ " " ++ RemoteFile,
+                os:cmd(CMD);
+            (Host) ->
+                HostString = unicode:characters_to_list(Host),
+                LocalFile = HostString ++ "_hosts",
+                RemoteFile = HostString ++ ":/etc/hosts",
+                SshPass = os:find_executable("sshpass"),
+                Scp = os:find_executable("scp"),
+                CMD = SshPass ++ " -p security " ++ Scp ++ " " ++ LocalFile ++ " " ++ RemoteFile,
+                os:cmd(CMD)
+        end,
+        HostsToUp
+    ).
+
+-spec instance_up(_) -> _.
+instance_up(Host) ->
+    Ssh = os:find_executable("ssh"),
+    CMD = Ssh ++ " " ++ unicode:characters_to_list(Host) ++ " /opt/mg/bin/entrypoint.sh",
+    spawn(fun() -> os:cmd(CMD) end).
+
+-spec instance_down(_) -> _.
+instance_down(Host) ->
+    Ssh = os:find_executable("ssh"),
+    CMD = Ssh ++ " " ++ unicode:characters_to_list(Host) ++ " /opt/mg/bin/mg stop",
+    spawn(fun() -> os:cmd(CMD) end).
+
+-spec await_peer(_, _) -> _.
+await_peer(_RemoteNode, 0) ->
+    error(peer_not_started);
+await_peer(RemoteNode, Attempt) ->
+    case net_adm:ping(RemoteNode) of
+        pong ->
+            ok;
+        pang ->
+            timer:sleep(1000),
+            await_peer(RemoteNode, Attempt - 1)
+    end.
diff --git a/compose.yaml b/compose.yaml
index be43199..907d8ba 100644
--- a/compose.yaml
+++ b/compose.yaml
@@ -35,10 +35,45 @@ services:
         condition: service_started
       consul2:
         condition: service_started
+
+      distrunner1:
+        condition: service_started
+      distrunner2:
+        condition: service_started
+      distrunner3:
+        condition: service_started
+      distrunner4:
+        condition: service_started
+
     ports:
       - "8022"
     command: /sbin/init
 
+  distrunner1: &mg-cluster
+    image: distrunner-test
+    # scale: 2
+    hostname: ${SERVICE_NAME}-1
+    build:
+      dockerfile: Dockerfile.test
+      context: .
+      args:
+        OTP_VERSION: $OTP_VERSION
+        THRIFT_VERSION: $THRIFT_VERSION
+    cap_add:
+      - NET_ADMIN
+
+  distrunner2:
+    <<: *mg-cluster
+    hostname: ${SERVICE_NAME}-2
+
+  distrunner3:
+    <<: *mg-cluster
+    hostname: ${SERVICE_NAME}-3
+
+  distrunner4:
+    <<: *mg-cluster
+    hostname: ${SERVICE_NAME}-4
+
   riakdb: &member-node
     image: docker.io/basho/riak-kv:${RIAK_VERSION}
     environment:
diff --git a/config/config.example.yaml b/config/config.example.yaml
new file mode 100644
index 0000000..068675e
--- /dev/null
+++ b/config/config.example.yaml
@@ -0,0 +1,398 @@
+#
+# Copyright 2020 RBKmoney
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+### This is the one and only file used to configure the machinegun service.
+###
+### Configuration is specific to a single instance of machinegun; it's up to
+### you to ensure that each instance is configured properly. This usually boils
+### down to sharing namespaces configuration between instances and assigning
+### proper nodenames so that instances can see and communicate with each
+### other over Erlang distribution.
+###
+### If you find some configuration knobs are missing here, do not hesitate to
+### send a PR.
+###
+### Notes:
+###
+### * Configuration strings support environment variable interpolation.
+###
+###   The syntax is `${VARNAME}` where `VARNAME` is the name of the referenced
+###   environment variable. If referenced, the variable MUST be defined,
+###   otherwise it's a configuration error. There is no special handling of
+###   variable values: an empty variable, for example, will be interpolated as
+###   an empty string.
+###
+###   For example, given `HOST_IP=10.0.42.42` one could define:
+###     dist_node_name: machinegun@${HOST_IP}
+###
+### * Graceful shutdowns.
+###
+###   There are multiple `shutdown_timeout` parameters defined in this example config:
+###   * one for the woody server
+###   * one for each machinegun namespace
+###   * and two for consuela presence and registry
+###   To calculate the actual maximum time this service takes to shut down gracefully,
+###   take the woody server `shutdown_timeout` parameter, add the maximum
+###   value among the `shutdown_timeout` parameters defined for the namespaces,
+###   and then add both of the consuela `shutdown_timeout`s:
+###
+###   max_shutdown_time =
+###       woody_server.shutdown_timeout +
+###       max(namespaces[].shutdown_timeout) +
+###       consuela.presence.shutdown_timeout +
+###       consuela.registry.shutdown_timeout
+###
+###
+
+# Name of the service.
+#
+# Defaults to: 'machinegun'.
+service_name: machinegun
+
+# Name of the node for Erlang distribution.
+#
+# Defaults to: '{service_name}@{hostname}'.
+# Examples:
+# * with short node name:
+#     dist_node_name: machinegun
+# * with fixed ip for host part:
+#     dist_node_name: machinegun@10.0.0.42
+# * for `machinegun@{primary_netif_ip}`, with the latter determined at start time:
+#     dist_node_name:
+#         hostpart: ip
+# * for `blarg@{fqdn}`, if the latter is available at start time:
+#     dist_node_name:
+#         namepart: blarg
+#         hostpart: fqdn
+dist_node_name:
+    hostpart: hostname
+
+# Mode and allocation of ports for Erlang distribution.
+#
+# No defaults here, default behaviour is dictated by ERTS.
+# Examples:
+# * disable EPMD altogether and set a fixed port both for listening and
+#   communicating with remote nodes:
+#     dist_port:
+#         mode: static
+#         port: 31337
+dist_port:
+    mode: epmd
+    # Which ports to pick from when setting up a distribution listener?
+    range: [31337, 31340]
+
+# Erlang VM options.
+erlang:
+    # Path to a file which holds the Erlang distribution cookie.
+    # The cookie is a _sensitive_ piece of information, so handle it with caution.
+    #
+    # Must be set, there's no default.
+    secret_cookie_file: "config/cookie"
+    ipv6: true
+    disable_dns_cache: false
+
+# Opentelemetry settings
+# By default opentelemetry is disabled, which is equivalent to
+# "opentelemetry: disabled"
+opentelemetry:
+    # TODO Describe sampling
+    # Name of the service to use in recording machinegun's spans
+    service_name: machinegun
+    # For now, spans are always processed in batches.
+    # Only the "otlp" traces exporter is supported.
+    exporter:
+        # Supports only "http/protobuf" or "grpc"
+        protocol: http/protobuf
+        endpoint: http://jaeger:4318
+
+# API server options.
+woody_server:
+    ip: "::"
+    port: 8022
+    http_keep_alive_timeout: 60s
+    shutdown_timeout: 0s # woody server shutdown timeout (see notes above)
+
+# Distributed machine registry settings.
+#
+# Do not set this if you plan to run machinegun in a non-distributed fashion,
+# for example in development or testing scenarios.
+consuela:
+    presence:
+        check_interval: 5s
+        shutdown_timeout: 5s
+        tags:
+            - production
+    registry:
+        nodename: mhost1
+        session_ttl: 30s
+        session_renewal_interval: 10s
+        shutdown_timeout: 5s
+    discovery: {}
+
+# Consul client settings.
+#
+# Required when the distributed machine registry is enabled.
+consul:
+    url: http://localhost:8500
+    acl_token_file: config/consul.token
+    connect_timeout: 200ms
+    recv_timeout: 1s
+
+# New cluster assembler (used instead of consuela);
+# it takes effect only if consuela is undefined.
+# If cluster is also undefined, the node runs in standalone mode.
+cluster:
+    discovery:
+        type: dns
+        options:
+            # hostname that will be resolved
+            domain_name: machinegun-headless
+            # name that will be used to construct the full nodename (for example name@127.0.0.1)
+            sname: machinegun
+    # optional, supported values: global_based | partition_based, default global_based
+    scaling: partition_based
+    # required when scaling = partition_based
+    partitioning:
+        capacity: 31
+        max_hash: 4095
+    # optional, default value 5000 ms
+    reconnect_timeout: 5000
+
+# if undefined, {mg_core_procreg_consuela, #{pulse => pulse(YamlConfig)}} will be used;
+# if consuela is undefined, mg_core_procreg_gproc will be used
+process_registry:
+    module: mg_core_procreg_global
+
+limits:
+    process_heap: 2M # heap limit
+    disk: # used only for health check
+        path: "/"
+        value: 99%
+    memory: # return 503 if breached
+        type: cgroups # cgroups | total
+        value: 90%
+    scheduler_tasks: 5000
+logging:
+    root: /var/log/mg
+    burst_limit_enable: false
+    sync_mode_qlen: 100
+    drop_mode_qlen: 1000
+    flush_qlen: 2000
+    json_log: log.json
+    level: info
+    formatter:
+        max_length: 1000
+        max_printable_string_length: 80
+        level_map:
+            'emergency': 'ERROR'
+            'alert': 'ERROR'
+            'critical': 'ERROR'
+            'error': 'ERROR'
+            'warning': 'WARN'
+            'notice': 'INFO'
+            'info': 'INFO'
+            'debug': 'DEBUG'
+
+namespaces:
+    mg_test_ns:
+        # only for testing, default 0
+        # suicide_probability: 0.1
+        event_sinks:
+            machine:
+                type: machine
+                machine_id: main_event_sink
+            kafka:
+                type: kafka
+                client: default_kafka_client
+                topic: mg_test_ns
+        default_processing_timeout: 30s
+        timer_processing_timeout: 60s
+        reschedule_timeout: 60s
+        hibernate_timeout: 5s
+        shutdown_timeout: 1s # worker shutdown timeout (see notes above)
+        unload_timeout: 60s
+        processor:
+            url: http://localhost:8022/processor
+            pool_size: 50
+            http_keep_alive_timeout: 10s
+        timers:
+            scan_interval: 1m
+            scan_limit: 1000
+            capacity: 500
+            min_scan_delay: 10s
+        overseer: disabled
+        notification:
+            capacity: 1000
+            # search for new notification tasks in storage every x
+            scan_interval: 1m
+            # if the search had a continuation, read the continuation after x amount of time
+            min_scan_delay: 1s
+            # only search for notification tasks that are older than x
+            scan_handicap: 10s
+            # only search for notification tasks that are younger than x
+            scan_cutoff: 4W
+            # reschedule notification deliveries that failed with temporary errors x amount of time into the future
+            reschedule_time: 5s
+        # maximum number of events that will be stored inside of machine state
+        # must be a non-negative integer, default is 0
+        event_stash_size: 5
+        modernizer:
+            current_format_version: 1
+            handler:
+                url: http://localhost:8022/modernizer
+                pool_size: 50
+                http_keep_alive_timeout: 10s
+snowflake_machine_id: 1
+# memory storage backend
+# storage:
+#     type: memory
+# riak storage backend
+storage:
+    type: riak
+    host: riak-mg
+    port: 8078
+    pool:
+        size: 100
+        queue_max: 1000
+    connect_timeout: 5s
+    request_timeout: 10s
+    index_query_timeout: 10s
+    batch_concurrency_limit: 50
+# Docs on what these options do
+# https://www.tiot.jp/riak-docs/riak/kv/3.2.0/developing/usage/replication
+# https://www.tiot.jp/riak-docs/riak/kv/3.2.0/learn/concepts/eventual-consistency/
+    r_options:
+        r: quorum
+        pr: quorum
+        sloppy_quorum: false
+    w_options:
+        w: 4
+        pw: 4
+        dw: 4
+        sloppy_quorum: false
+    d_options:
+        sloppy_quorum: false
+
+## kafka settings example
+kafka:
+    default_kafka_client:
+        endpoints:
+            - host: "kafka1"
+              port: 9092
+            - host: "kafka2"
+              port: 9092
+            - host: "kafka3"
+              port: 9092
+        ssl:
+            certfile: "client.crt"
+            keyfile: "client.key"
+            cacertfile: "ca.crt"
+        sasl:
+            mechanism: scram_sha_512 # Available: scram_sha_512, scram_sha_256, plain
+            # *Either* specify the `file` field or `username` and `password` fields.
+            # `file` is the path to a text file which contains two lines,
+            # first line for username and second line for password.
+            # Presence of the `file` field will override the presence of
+            # `username` and `password` fields (there is no fallback).
+            file: secret.txt
+            # ** OR **
+            username: root
+            password: qwerty
+        producer:
+            compression: no_compression # 'gzip' or 'snappy' to enable compression
+            # How many message sets (per-partition) can be sent to the kafka broker
+            # asynchronously before receiving ACKs from the broker.
+            partition_onwire_limit: 1
+            # Maximum time the broker can await the receipt of the
+            # number of acknowledgements in RequiredAcks. The timeout is not an exact
+            # limit on the request time for a few reasons: (1) it does not include
+            # network latency, (2) the timer begins at the beginning of the processing
+            # of this request so if many requests are queued due to broker overload
+            # that wait time will not be included, (3) the kafka leader will not terminate
+            # a local write so if the local write time exceeds this timeout it will
+            # not be respected.
+            ack_timeout: 10s
+            # How many acknowledgements the kafka broker should receive from the
+            # clustered replicas before acking the producer.
+            # none: the broker will not send any response
+            #     (this is the only case where the broker will not reply to a request)
+            # leader_only: the leader will wait until the data is written to the local log
+            #     before sending a response.
+            # all_isr: the broker will block until the message is committed by
+            #     all in-sync replicas before acking.
+            required_acks: all_isr
+            # How many requests (per-partition) can be buffered without blocking the
+            # caller. The callers are released (by receiving the
+            # 'brod_produce_req_buffered' reply) once the request is taken into buffer
+            # and after the request has been put on wire, then the caller may expect
+            # a reply 'brod_produce_req_acked' when the request is accepted by kafka.
+            partition_buffer_limit: 256
+            # Messages are allowed to 'linger' in buffer for this amount of
+            # time before being sent.
+            # Definition of 'linger': A message is in 'linger' state when it is allowed
+            # to be sent on-wire, but chosen not to (for better batching).
+            max_linger: 0ms
+            # At most this amount (count, not size) of messages are allowed to 'linger'
+            # in buffer. Messages will be sent regardless of 'linger' age when this
+            # threshold is hit.
+            # NOTE: It does not make sense to have this value set larger than
+            # `partition_buffer_limit'
+            max_linger_count: 0
+            # In case callers are producing faster than brokers can handle (or
+            # congestion on wire), try to accumulate small requests into batches
+            # as much as possible but not exceeding max_batch_size.
+            # OBS: If compression is enabled, care should be taken when picking
+            # the max batch size, because a compressed batch will be produced
+            # as one message and this message might be larger than
+            # 'max.message.bytes' in kafka config (or topic config)
+            max_batch_size: 1M
+            # If {max_retries, N} is given, the producer retries the produce request
+            # up to N times before crashing, in case of failures like the connection
+            # being shut down by the remote end or exceptions received in the produce
+            # response from kafka.
+            # The special value N = -1 means 'retry indefinitely'
+            max_retries: 3
+            # Time in milliseconds to sleep before retrying a failed produce request.
+            retry_backoff: 500ms
+
+##
+## a short example for HG
+##
+# service_name: machinegun
+# namespaces:
+#     invoice:
+#         event_sink: payproc
+#         processor:
+#             url: http://hellgate:8022/v1/stateproc/invoice
+#     party:
+#         event_sink: payproc
+#         processor:
+#             url: http://hellgate:8022/v1/stateproc/party
+#     domain-config:
+#         processor:
+#             url: http://dominant:8022/v1/stateproc/party
+# storage:
+#     type: memory
+
+##
+## a minimal config
+##
+# service_name: machinegun
+# namespaces:
+#     mg_test_ns:
+#         processor:
+#             url: http://localhost:8022/processor
+# storage:
+#     type: memory
diff --git a/config/config.yaml b/config/config.yaml
index 068675e..5c3f103 100644
--- a/config/config.yaml
+++ b/config/config.yaml
@@ -1,292 +1,113 @@
-#
-# Copyright 2020 RBKmoney
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-#     http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
+service_name: mg
 
-### This is one and only file to configure machinegun service.
-###
-### Configuration is specific for a single instance of machinegun, it's up to
-### you to ensure that each instance is configured properly. This usually boils
-### down to sharing namespaces configuration between instances and assigning
-### proper nodenames so that instances could see and communicate with each
-### other over Erlang distribution.
-###
-### If you find some configuration knobs are missing here do not hesitate to
-### send a PR.
-###
-### Notes:
-###
-### * Configuration string support environment variable interpolation.
-###
-###   The syntax is `${VARNAME}` where `VARNAME` is the name of referenced
-###   environment variable. If referenced the variable MUST be defined,
-###   otherwise it's a configuration error. There is no special handling of
-###   variable values: empty variable for example will be interpolated as
-###   empty string.
-###
-###   For example, given `HOST_IP=10.0.42.42` one could define:
-###     dist_node_name: machinegun@${HOST_IP}
-###
-### * Graceful shutdowns.
-### -### There are multiple `shutdown_timeout` parameters defined in this example config: -### * one for the woody server -### * one for each machinegun namespace -### * and two for consuela presence and registry -### To calculate the actual maximum time this service takes to shut down gracefully -### you need to take the woody server `shutdown_timeout` parameter, add the maximum -### value between all of the `shutdown_timeout` parameters defined for each namespace, -### and then add the both of the consuela `shutdown_timeout`s: -### -### max_shutdown_time = -### woody_server.shutdown_timeout + -### max(namespaces[].shutdown_timeout) + -### consuela.presence.shutdown_timeout + -### consuela.registry.shutdown_timeout -### -### +#dist_node_name: mg@${POD_IP} +#dist_port: +# mode: static +# port: 31337 -# Name of the service. -# -# Defaults to: 'machinegun'. -service_name: machinegun - -# Name of the node for Erlang distribution. -# -# Defaults to: '{service_name}@{hostname}'. -# Examples: -# * with short node name: -# dist_node_name: machinegun -# * with fixed ip for host part: -# dist_node_name: machinegun@10.0.0.42 -# * for `machinegun@{primary_netif_ip}`, with latter determined at start time: -# dist_node_name: -# hostpart: ip -# * for `blarg@{fqdn}`, if latter available at start time: -# dist_node_name: -# namepart: blarg -# hostpart: fqdn -dist_node_name: - hostpart: hostname - -# Mode and allocation of ports for Erlang distribution. -# -# No defaults here, default behaviour is dictated by ERTS. -# Examples: -# * disable EMPD altogether and set a fixed port both for listening and -# communicating with remote nodes: -# dist_port: -# mode: static -# port: 31337 -dist_port: - mode: epmd - # Which ports to pick from when setting up a distribution listener? - range: [31337, 31340] - -# Erlang VM options. erlang: - # Path to a file which holds Erlang distribution cookie. - # The cookie is _sensitive_ piece of information so handle it with caution. - # - # Must be set, there's no default. - secret_cookie_file: "config/cookie" - ipv6: true - disable_dns_cache: false + ipv6: false + secret_cookie_file: /opt/mg/etc/cookie -# Opentelemetry settings -# By default opentelemetry is disabled which is equivalent to -# "opentelemetry: disabled" -opentelemetry: - # TODO Describe sampling - # Name of the service to use in recording machinegun's spans - service_name: machinegun - # For now spans processed always in batches. - # We support only "otlp" traces exporter - exporter: - # Supports only "http/protobuf" or "grpc" - protocol: http/protobuf - endpoint: http://jaeger:4318 +logging: + out_type: stdout + level: warning + formatter: + level_map: + 'emergency': 'ERROR' + 'alert': 'ERROR' + 'critical': 'ERROR' + 'error': 'ERROR' + 'warning': 'WARN' + 'notice': 'INFO' + 'info': 'INFO' + 'debug': 'DEBUG' -# API server options. woody_server: ip: "::" port: 8022 - http_keep_alive_timeout: 60s - shutdown_timeout: 0s # woody server shutdown timeout (see notes above) + max_concurrent_connections: 8000 + http_keep_alive_timeout: 3000ms + shutdown_timeout: 5s -# Distributed machine registry settings. -# -# Do not set if you plan to run machinegun in a non-distributed fashion, -# for example in a development or testing scenarios. 
-consuela: - presence: - check_interval: 5s - shutdown_timeout: 5s - tags: - - production - registry: - nodename: mhost1 - session_ttl: 30s - session_renewal_interval: 10s - shutdown_timeout: 5s - discovery: {} +storage: + type: riak + host: riakdb + port: 8087 + pool: + size: 100 + queue_max: 500 + batch_concurrency_limit: 10 + connect_timeout: 500ms + request_timeout: 10s + index_query_timeout: 60s + +worker: + message_queue_len_limit: 1000 -# Consul client settings. -# -# Required when distributed machine registry is enabled. -consul: - url: http://localhost:8500 - acl_token_file: config/consul.token - connect_timeout: 200ms - recv_timeout: 1s +process_registry: + module: mg_core_procreg_global -# New cluster assembler (instead consuela) -# will be used if consuela undefined -# if cluster undefined then standalone mode cluster: discovery: type: dns options: - # hostname that will be resolved - domain_name: machinegun-headless - # name that will be used for construct full nodename (for example name@127.0.0.1) - sname: machinegun - # optional, supported values: global_based | partition_based, default global_based + domain_name: machinegun-ha-headless + sname: mg scaling: partition_based - # required when scaling = partition_based partitioning: - capacity: 31 + capacity: 5 max_hash: 4095 - # optional, default value 5000 ms reconnect_timeout: 5000 -# if undefined will de used {mg_core_procreg_consuela, #{pulse => pulse(YamlConfig)}} -# if consuela undefined will be used mg_core_procreg_gproc -process_registry: - module: mg_core_procreg_global - -limits: - process_heap: 2M # heap limit - disk: # uses only for health check - path: "/" - value: 99% - memory: # return 503 if breaks - type: cgroups # cgroups | total - value: 90% - scheduler_tasks: 5000 -logging: - root: /var/log/mg - burst_limit_enable: false - sync_mode_qlen: 100 - drop_mode_qlen: 1000 - flush_qlen: 2000 - json_log: log.json - level: info - formatter: - max_length: 1000 - max_printable_string_length: 80 - level_map: - 'emergency': 'ERROR' - 'alert': 'ERROR' - 'critical': 'ERROR' - 'error': 'ERROR' - 'warning': 'WARN' - 'notice': 'INFO' - 'info': 'INFO' - 'debug': 'DEBUG' - namespaces: - mg_test_ns: - # only for testing, default 0 - # suicide_probability: 0.1 + test_transaction: + timers: + scan_interval: 60s + scan_limit: 2000 + capacity: 3500 + min_scan_delay: 5s + overseer: + scan_interval: 60m + min_scan_delay: 5s + retries: + storage: + type: exponential + max_retries: infinity + factor: 2 + timeout: 10ms + max_timeout: 60s + timers: + type: exponential + max_retries: 100 + factor: 2 + timeout: 2s + max_timeout: 30m + processor: + type: exponential + max_retries: + max_total_timeout: 1d + factor: 2 + timeout: 10ms + max_timeout: 60s + continuation: + type: exponential + max_retries: infinity + factor: 2 + timeout: 10ms + max_timeout: 60s event_sinks: - machine: - type: machine - machine_id: main_event_sink kafka: type: kafka + topic: test_transaction client: default_kafka_client - topic: mg_test_ns - default_processing_timeout: 30s - timer_processing_timeout: 60s - reschedule_timeout: 60s - hibernate_timeout: 5s - shutdown_timeout: 1s # worker shutdown timeout (see notes above) - unload_timeout: 60s processor: - url: http://localhost:8022/processor - pool_size: 50 - http_keep_alive_timeout: 10s - timers: - scan_interval: 1m - scan_limit: 1000 - capacity: 500 - min_scan_delay: 10s - overseer: disabled - notification: - capacity: 1000 - # search for new notification tasks in storage every x - scan_interval: 1m - # if the 
search had a continuation, read the continuation after x amount of time - min_scan_delay: 1s - # only search for notification tasks that are older than x - scan_handicap: 10s - # only search for notification tasks that are younger than x - scan_cutoff: 4W - # reschedule notification deliveries that failed with temporary errors x amount of time into the future - reschedule_time: 5s - # maximum number of events that will be stored inside of machine state - # must be non negative integer, default is 0 - event_stash_size: 5 - modernizer: - current_format_version: 1 - handler: - url: http://localhost:8022/modernizer - pool_size: 50 - http_keep_alive_timeout: 10s -snowflake_machine_id: 1 -# memory storage backend -# storage: -# type: memory -# riak storage backend -storage: - type: riak - host: riak-mg - port: 8078 - pool: - size: 100 - queue_max: 1000 - connect_timeout: 5s - request_timeout: 10s - index_query_timeout: 10s - batch_concurrency_limit: 50 -# Docs on what these options do -# https://www.tiot.jp/riak-docs/riak/kv/3.2.0/developing/usage/replication -# https://www.tiot.jp/riak-docs/riak/kv/3.2.0/learn/concepts/eventual-consistency/ - r_options: - r: quorum - pr: quorum - sloppy_quorum: false - w_options: - w: 4 - pw: 4 - dw: 4 - sloppy_quorum: false - d_options: - sloppy_quorum: false + url: http://test-transaction2:8088/v1/processor + pool_size: 2000 + http_keep_alive_timeout: 3000ms + unload_timeout: 180s + shutdown_timeout: 5s -## kafka settings example kafka: default_kafka_client: endpoints: @@ -296,103 +117,14 @@ kafka: port: 9092 - host: "kafka3" port: 9092 - ssl: - certfile: "client.crt" - keyfile: "client.key" - cacertfile: "ca.crt" - sasl: - mechanism: scram_sha_512 # Available: scram_sha_512, scram_sha_265, plain - # *Either* specify the `file` field or `username` and `password` fields. - # `file` is the path to a text file which contains two lines, - # first line for username and second line for password. - # Presence of the `file` field will override the presence of - # `username` and `password` fields (there is no fallback). - file: secret.txt - # ** OR ** - username: root - password: qwerty producer: - compression: no_compression # 'gzip' or 'snappy' to enable compression - # How many message sets (per-partition) can be sent to kafka broker - # asynchronously before receiving ACKs from broker. + compression: no_compression partition_onwire_limit: 1 - # Maximum time the broker can await the receipt of the - # number of acknowledgements in RequiredAcks. The timeout is not an exact - # limit on the request time for a few reasons: (1) it does not include - # network latency, (2) the timer begins at the beginning of the processing - # of this request so if many requests are queued due to broker overload - # that wait time will not be included, (3) kafka leader will not terminate - # a local write so if the local write time exceeds this timeout it will - # not be respected. ack_timeout: 10s - # How many acknowledgements the kafka broker should receive from the - # clustered replicas before acking producer. - # none: the broker will not send any response - # (this is the only case where the broker will not reply to a request) - # leader_only: The leader will wait the data is written to the local log before - # sending a response. - # all_isr: If it is 'all_isr' the broker will block until the message is committed by - # all in sync replicas before acking. required_acks: all_isr - # How many requests (per-partition) can be buffered without blocking the - # caller. 
The callers are released (by receiving the - # 'brod_produce_req_buffered' reply) once the request is taken into buffer - # and after the request has been put on wire, then the caller may expect - # a reply 'brod_produce_req_acked' when the request is accepted by kafka. partition_buffer_limit: 256 - # Messages are allowed to 'linger' in buffer for this amount of - # time before being sent. - # Definition of 'linger': A message is in 'linger' state when it is allowed - # to be sent on-wire, but chosen not to (for better batching). max_linger: 0ms - # At most this amount (count not size) of messages are allowed to 'linger' - # in buffer. Messages will be sent regardless of 'linger' age when this - # threshold is hit. - # NOTE: It does not make sense to have this value set larger than - # `partition_buffer_limit' max_linger_count: 0 - # In case callers are producing faster than brokers can handle (or - # congestion on wire), try to accumulate small requests into batches - # as much as possible but not exceeding max_batch_size. - # OBS: If compression is enabled, care should be taken when picking - # the max batch size, because a compressed batch will be produced - # as one message and this message might be larger than - # 'max.message.bytes' in kafka config (or topic config) max_batch_size: 1M - # If {max_retries, N} is given, the producer retry produce request for - # N times before crashing in case of failures like connection being - # shutdown by remote or exceptions received in produce response from kafka. - # The special value N = -1 means 'retry indefinitely' max_retries: 3 - # Time in milli-seconds to sleep before retry the failed produce request. retry_backoff: 500ms - -## -## a short example for HG -## -# service_name: machinegun -# namespaces: -# invoice: -# event_sink: payproc -# processor: -# url: http://hellgate:8022/v1/stateproc/invoice -# party: -# event_sink: payproc -# processor: -# url: http://hellgate:8022/v1/stateproc/party -# domain-config: -# processor: -# url: http://dominant:8022/v1/stateproc/party -# storage: -# type: memory - -## -## a minimal config -## -# service_name: machinegun -# namespaces: -# mg_test_ns: -# processor: -# url: http://localhost:8022/processor -# storage: -# type: memory diff --git a/rebar.config b/rebar.config index b78969b..7186801 100644 --- a/rebar.config +++ b/rebar.config @@ -85,7 +85,8 @@ {overlay, [ {template, "rel_scripts/entrypoint.sh", "bin/entrypoint.sh"}, {copy, "rel_scripts/configurator.escript", "bin/configurator.escript"}, - {copy, "config/config.yaml", "etc/config.yaml"} + {copy, "config/config.yaml", "etc/config.yaml"}, + {copy, "config/cookie", "etc/cookie"} ]} ]} ]}, diff --git a/rebar.config.script b/rebar.config.script new file mode 100644 index 0000000..f294de0 --- /dev/null +++ b/rebar.config.script @@ -0,0 +1,23 @@ +case os:getenv("IP") of + false -> CONFIG; + IpString -> + Node = erlang:list_to_atom("mg@" ++ IpString), + {value, {profiles, Profiles0}, Config0} = lists:keytake(profiles, 1, CONFIG), + {value, {test, TestProfile0}, Profiles1} = lists:keytake(test, 1, Profiles0), + TestProfile = lists:keystore( + dist_node, + 1, + TestProfile0, + {dist_node, [ + {setcookie, 'HI-MARK!'}, + {name, Node} + ]} + ), + Profiles = lists:keystore( + test, + 1, + Profiles1, + {test, TestProfile} + ), + lists:keystore(profiles, 1, Config0, {profiles, Profiles}) +end. 
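Note: rebar.config.script above rewires the rebar3 `test` profile at load time.
When the dev container's entrypoint exports IP (see Dockerfile.dev), every
`rebar3 ct` run gets a distribution identity derived from the container address,
which is why the `--name test_node@127.0.0.1` flag could be dropped from the
Makefile's common-test target. A minimal sketch of the term the script leaves in
the profile list, assuming IP=10.1.2.3 (the address is illustrative only):

    %% Resulting entry under {profiles, [...]} for IP=10.1.2.3; any
    %% pre-existing test profile options are preserved by lists:keystore/4.
    {test, [
        {dist_node, [
            {setcookie, 'HI-MARK!'},
            {name, 'mg@10.1.2.3'}
        ]}
    ]}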
diff --git a/rel_scripts/configurator.escript b/rel_scripts/configurator.escript index c207d6b..3bdc21e 100755 --- a/rel_scripts/configurator.escript +++ b/rel_scripts/configurator.escript @@ -744,6 +744,7 @@ event_sink_ns(YamlConfig) -> #{ registry => procreg(YamlConfig), storage => storage(<<"_event_sinks">>, YamlConfig), + scaling => scaling(YamlConfig), worker => #{registry => procreg(YamlConfig)}, duplicate_search_batch => 1000, default_processing_timeout => ?C:milliseconds(<<"30s">>) diff --git a/test_resources/authorized_keys b/test_resources/authorized_keys new file mode 100644 index 0000000..36c6d89 --- /dev/null +++ b/test_resources/authorized_keys @@ -0,0 +1 @@ +ssh-rsa AAAAB3NzaC1yc2EAAAADAQABAAABgQDRD4mOIKKi7qJ39h1Ll4ej8sQTVJW0r1XXiK/eigSjCln1+K2zEvDKr4UCKaLeQaObcbcPdYQFPSIsyOYbKoFi1I/VkiUnkwIn516/VRx+zeCM3D2BhC+c3ikGbQnYK/dToyOT4w641j0BtKmMf80Y2rIyHFcFlEPEoICM9SznvkqcFSSs/qs6fALffvtOnaZsVUVhF+elOOupOuYGVvaMANPOWeyj5GqZ+mX/6vEPLglGD6vrt17GjJL/iz2XHhI/gej+AcRQ0fdfQKXWeUPAeijqFSY65D5GqLIl5AhRqizXCh0lNkodSJ0A4u2J31mniC1jZoVBkmV3O3Kq1ODdExcDDs8JUIaVAxXdHD5MnQnY3Bnm/yfB6sb8oUg/FC74IRqrIuXmMUlNBvu1N6zA775cXR1wSPkXTSP4KeKyYcJIpyfAeZoNBed6oqn4nRag6FiGXsgm6Z1Cdz1yn3kCCIWFgYmjMhdw6O8ZXrDPNh2uS87ZpgGXHpfzcZtrqU0= root@mg-0 diff --git a/test_resources/ssh/config b/test_resources/ssh/config new file mode 100644 index 0000000..d6e34f5 --- /dev/null +++ b/test_resources/ssh/config @@ -0,0 +1 @@ +StrictHostKeyChecking off diff --git a/test_resources/ssh/id_rsa b/test_resources/ssh/id_rsa new file mode 100644 index 0000000..2a05530 --- /dev/null +++ b/test_resources/ssh/id_rsa @@ -0,0 +1,38 @@ +-----BEGIN OPENSSH PRIVATE KEY----- +b3BlbnNzaC1rZXktdjEAAAAABG5vbmUAAAAEbm9uZQAAAAAAAAABAAABlwAAAAdzc2gtcn +NhAAAAAwEAAQAAAYEA0Q+JjiCiou6id/YdS5eHo/LEE1SVtK9V14iv3ooEowpZ9fitsxLw +yq+FAimi3kGjm3G3D3WEBT0iLMjmGyqBYtSP1ZIlJ5MCJ+dev1Ucfs3gjNw9gYQvnN4pBm +0J2Cv3U6Mjk+MOuNY9AbSpjH/NGNqyMhxXBZRDxKCAjPUs575KnBUkrP6rOnwC3377Tp2m +bFVFYRfnpTjrqTrmBlb2jADTzlnso+Rqmfpl/+rxDy4JRg+r67dexoyS/4s9lx4SP4Ho/g +HEUNH3X0Cl1nlDwHoo6hUmOuQ+RqiyJeQIUaos1wodJTZKHUidAOLtid9Zp4gtY2aFQZJl +dztyqtTg3RMXAw7PCVCGlQMV3Rw+TJ0J2NwZ5v8nwerG/KFIPxQu+CEaqyLl5jFJTQb7tT +eswO++XF0dcEj5F00j+CnismHCSKcnwHmaDQXneqKp+J0WoOhYhl7IJumdQnc9cp95AgiF +hYGJozIXcOjvGV6wzzYdrkvO2aYBlx6X83Gba6lNAAAFgNTjC3PU4wtzAAAAB3NzaC1yc2 +EAAAGBANEPiY4goqLuonf2HUuXh6PyxBNUlbSvVdeIr96KBKMKWfX4rbMS8MqvhQIpot5B +o5txtw91hAU9IizI5hsqgWLUj9WSJSeTAifnXr9VHH7N4IzcPYGEL5zeKQZtCdgr91OjI5 +PjDrjWPQG0qYx/zRjasjIcVwWUQ8SggIz1LOe+SpwVJKz+qzp8At9++06dpmxVRWEX56U4 +66k65gZW9owA085Z7KPkapn6Zf/q8Q8uCUYPq+u3XsaMkv+LPZceEj+B6P4BxFDR919Apd +Z5Q8B6KOoVJjrkPkaosiXkCFGqLNcKHSU2Sh1InQDi7YnfWaeILWNmhUGSZXc7cqrU4N0T +FwMOzwlQhpUDFd0cPkydCdjcGeb/J8HqxvyhSD8ULvghGqsi5eYxSU0G+7U3rMDvvlxdHX +BI+RdNI/gp4rJhwkinJ8B5mg0F53qiqfidFqDoWIZeyCbpnUJ3PXKfeQIIhYWBiaMyF3Do +7xlesM82Ha5LztmmAZcel/Nxm2upTQAAAAMBAAEAAAGAYOEUW3qgI2T2gSTaGoeT4dPELT +kLTvnZi9HZvgSzdWJ8odGlnNBwKV0BBCmLQfek+4nMzSsmDM9xoNNQXtJptwTNyqi48wfa +/eboLz4fwFtjbaM6FWTOM6F33XR2FWj6ahW1jPixf9I33yx7TZKD1rqxzSr44Kr+ZIYETE +3pi1LRfFcH8erqKmYBZtSPXLUNxDIXvpC3Vgd0na2fntx50BMqE/vz/1cAV26ECf4zy1cI +ESF+B/Onxdaq4CUEW50g/RqNEnUWjr7w5CJObNj4dEWBUxL8IRMxNJ5LmXwWj9rdi+Umj4 +Ei7k+kKp81hIZwIGyQca5wzQytkamTtoRoUFij2UfoHJN3nOeRgLwWKpEKLofwJIP1nLU/ +MK6gSl8SJRR0lxztoRMTPD0QGGtgOj8f0QK+2xkvAb6TtTkCZRqC0V1HouWINzZ7vAPkWg +CwH0ajsUHvSfDyzvF/rP5hxuM67J/KCzar6EwugpJeiZaEOc5C2rTb9yOptDrWQ715AAAA +wQDv7bwQ528rohf/Skpwa9zT9LlyHdLb7iW+wV8CMqe3f5mPrAth4yE5Wp8Kdp4gg/2yRI +pQZr276noLcWHViUHlUrKjgI+sCqbK8gVrLYiid2z4gCt8TmPojhALXXPmaiIuFma0U3jR 
+qFtoz93cS8pU3rI8KWdBPP4l/EeM0B+l/GyVQm+3smAZSPYwCezLX6kXLTxVMbOWtKSPj9 +RuWRdq8O7/bH9Trr2MHC+q5BrEnU51ddOm8gbxdhauXZX7a14AAADBAPDhtkXric90HUad +EQISy50W8im5zw5+9Mry2w4W4WWPoqcQbC396pQCK7Rus+7wi11JeODkmU6iYyCMyw5e3F +yxLSQ/MCEIMYOLSv8mq3LXhsnED9SKNMWIp5ypj1AqMF9a20P+5FUC5zNj/JvHSCUEzK7G +bTuUSHT8KM3sLxd1LU2hzFgNKLJhopD1+BatNy28V0IwZKX2NhAtcHLQgDMQ1Bqb4fO76r +By5UNP6sD4GxqnqsJnnZsIJbbr+dZUVwAAAMEA3i6NRpepUbpgMs9+YYenNtfHU7u6idDP +Suzh1XcA47IsIEYkTsUsectEwXEsIybfXFQbwCtZFHUE9zfoMjupl8HsJZ6rBlQQe9YuXW +GQKiTTMTK4lwh6m9wYrru2xFWQ2Ff6N99vc86sa1Tnas3Eo77kQePnCCXYT8vaLJQOJwvf +WHOFP/PpBysL/lRnDCjNw++KlOds9mdVmMlu32wfYy5eFgYvz7CDcrmEZJL4drCq/ire2A +JM42SLsas41cj7AAAACXJvb3RAbWctMAE= +-----END OPENSSH PRIVATE KEY----- diff --git a/test_resources/ssh/id_rsa.pub b/test_resources/ssh/id_rsa.pub new file mode 100644 index 0000000..36c6d89 --- /dev/null +++ b/test_resources/ssh/id_rsa.pub @@ -0,0 +1 @@ +ssh-rsa AAAAB3NzaC1yc2EAAAADAQABAAABgQDRD4mOIKKi7qJ39h1Ll4ej8sQTVJW0r1XXiK/eigSjCln1+K2zEvDKr4UCKaLeQaObcbcPdYQFPSIsyOYbKoFi1I/VkiUnkwIn516/VRx+zeCM3D2BhC+c3ikGbQnYK/dToyOT4w641j0BtKmMf80Y2rIyHFcFlEPEoICM9SznvkqcFSSs/qs6fALffvtOnaZsVUVhF+elOOupOuYGVvaMANPOWeyj5GqZ+mX/6vEPLglGD6vrt17GjJL/iz2XHhI/gej+AcRQ0fdfQKXWeUPAeijqFSY65D5GqLIl5AhRqizXCh0lNkodSJ0A4u2J31mniC1jZoVBkmV3O3Kq1ODdExcDDs8JUIaVAxXdHD5MnQnY3Bnm/yfB6sb8oUg/FC74IRqrIuXmMUlNBvu1N6zA775cXR1wSPkXTSP4KeKyYcJIpyfAeZoNBed6oqn4nRag6FiGXsgm6Z1Cdz1yn3kCCIWFgYmjMhdw6O8ZXrDPNh2uS87ZpgGXHpfzcZtrqU0= root@mg-0 From f1ffbb9fb517346037c2f5d63ceafe67e440a138 Mon Sep 17 00:00:00 2001 From: ttt161 Date: Wed, 27 Mar 2024 10:31:39 +0300 Subject: [PATCH 2/4] TD-877: fix git workflows --- .github/workflows/erlang-checks.yml | 2 +- apps/mg_core/src/mg_core_cluster.erl | 18 ------------------ 2 files changed, 1 insertion(+), 19 deletions(-) diff --git a/.github/workflows/erlang-checks.yml b/.github/workflows/erlang-checks.yml index c1af1f9..81f5e84 100644 --- a/.github/workflows/erlang-checks.yml +++ b/.github/workflows/erlang-checks.yml @@ -29,7 +29,7 @@ jobs: run: name: Run checks needs: setup - uses: valitydev/erlang-workflows/.github/workflows/erlang-parallel-build.yml@v1.0.13 + uses: valitydev/erlang-workflows/.github/workflows/erlang-parallel-build.yml@v1.0.14 with: otp-version: ${{ needs.setup.outputs.otp-version }} rebar-version: ${{ needs.setup.outputs.rebar-version }} diff --git a/apps/mg_core/src/mg_core_cluster.erl b/apps/mg_core/src/mg_core_cluster.erl index bf6c2b0..16f863b 100644 --- a/apps/mg_core/src/mg_core_cluster.erl +++ b/apps/mg_core/src/mg_core_cluster.erl @@ -330,22 +330,4 @@ child_spec_test() -> ChildSpec = mg_core_cluster:child_spec(?CLUSTER_OPTS), ?assertEqual(ExpectedSpec, ChildSpec). --spec maybe_connect_fail_test() -> _. -maybe_connect_fail_test() -> - St = ?CLUSTER_OPTS#{ - local_table => #{0 => node()} - }, - Expected = #{ - discovering => #{ - <<"domain_name">> => <<"localhost">>, - <<"sname">> => <<"test_node">> - }, - known_nodes => ['test_node@127.0.0.1', 'peer@127.0.0.1'], - local_table => #{0 => 'nonode@nohost'}, - partitioning => #{capacity => 3, max_hash => 4095}, - reconnect_timeout => 5000, - scaling => partition_based - }, - ?assertEqual(Expected, maybe_connect('peer@127.0.0.1', St)). - -endif. 
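Note: with the test-only ?NEIGHBOUR indirection and its unit test gone, connect/3
from the first patch always performs the erpc handshake against the real
mg_core_cluster:connecting/1 on the peer, and any failure falls back to the
{reconnect, Node} timer. A hedged sketch of the receiving side of that timer:
the actual handle_info/2 clause is not part of this diff, so the handler shape
and the reuse of maybe_connect/2 (referenced by the removed test) are assumptions:

    %% Sketch only: how the timer armed in connect/3 could be consumed.
    handle_info({timeout, _TRef, {reconnect, Node}}, State) ->
        %% retry the handshake; connect/3 re-arms the timer on failure
        {noreply, maybe_connect(Node, State)}.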
From ecf506b7fcd27ae8c72cc4ed0ec3b46920d294f7 Mon Sep 17 00:00:00 2001
From: ttt161
Date: Wed, 27 Mar 2024 10:53:40 +0300
Subject: [PATCH 3/4] TD-877: fix Dockerfile

---
 Dockerfile.test | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/Dockerfile.test b/Dockerfile.test
index 25c634f..1e879d5 100644
--- a/Dockerfile.test
+++ b/Dockerfile.test
@@ -8,7 +8,7 @@ SHELL ["/bin/bash", "-o", "pipefail", "-c"]
 ARG THRIFT_VERSION
 ARG TARGETARCH
 
-RUN wget -q -O- "https://github.com/valitydev/thrift/releases/download/${THRIFT_VERSION}/thrift-${THRIFT_VERSION}-linux-${BUILDARCH}.tar.gz" \
+RUN wget -q -O- "https://github.com/valitydev/thrift/releases/download/${THRIFT_VERSION}/thrift-${THRIFT_VERSION}-linux-${TARGETARCH}.tar.gz" \
     | tar -xvz -C /usr/local/bin/
 
 # Copy sources

From 3cc131f182223f22b0de4a4759cbd1a49356fe24 Mon Sep 17 00:00:00 2001
From: ttt161
Date: Tue, 2 Apr 2024 09:08:20 +0300
Subject: [PATCH 4/4] TD-877: add riak woody tests

---
 apps/mg_core/src/mg_core_cluster.erl        |   1 +
 apps/mg_cth/include/mg_cth.hrl              |   1 +
 apps/mg_cth/src/mg_cth_cluster.erl          | 120 ++++++++++
 apps/mg_cth/src/mg_cth_configurator.erl     |  12 +-
 apps/mg_woody/src/mg_woody_automaton.erl    |   2 +-
 .../test/mg_modernizer_tests_SUITE.erl      |   4 +-
 apps/mg_woody/test/mg_stress_SUITE.erl      |   2 +-
 apps/mg_woody/test/mg_woody_tests_SUITE.erl | 213 ++++++++++++++++--
 config/config.yaml                          |   6 +-
 elvis.config                                |   7 +-
 10 files changed, 335 insertions(+), 33 deletions(-)
 create mode 100644 apps/mg_cth/src/mg_cth_cluster.erl

diff --git a/apps/mg_core/src/mg_core_cluster.erl b/apps/mg_core/src/mg_core_cluster.erl
index 16f863b..dd40773 100644
--- a/apps/mg_core/src/mg_core_cluster.erl
+++ b/apps/mg_core/src/mg_core_cluster.erl
@@ -56,6 +56,7 @@
 
 -export_type([scaling_type/0]).
 -export_type([partitions_info/0]).
+-export_type([cluster_options/0]).
 
 -spec child_spec(cluster_options()) -> [supervisor:child_spec()].
 child_spec(#{discovering := _} = ClusterOpts) ->
diff --git a/apps/mg_cth/include/mg_cth.hrl b/apps/mg_cth/include/mg_cth.hrl
index 9f59eac..b1d998b 100644
--- a/apps/mg_cth/include/mg_cth.hrl
+++ b/apps/mg_cth/include/mg_cth.hrl
@@ -2,6 +2,7 @@
 -define(__mg_cth__, 42).
 
 -define(NS, <<"NS">>).
+-define(EVENT_SINK_NS, <<"_event_sinks">>).
 -define(ID, <<"ID">>).
 -define(EMPTY_ID, <<"">>).
 -define(ES_ID, <<"test_event_sink_2">>).
diff --git a/apps/mg_cth/src/mg_cth_cluster.erl b/apps/mg_cth/src/mg_cth_cluster.erl
new file mode 100644
index 0000000..20d1f93
--- /dev/null
+++ b/apps/mg_cth/src/mg_cth_cluster.erl
@@ -0,0 +1,120 @@
+-module(mg_cth_cluster).
+
+%% API
+-export([
+    prepare_cluster/1,
+    instance_up/1,
+    instance_down/1,
+    await_peer/2,
+    await_peer/1
+]).
+
+-define(ERLANG_TEST_HOSTS, [
+    "mg-0",
+    "mg-1",
+    "mg-2",
+    "mg-3",
+    "mg-4"
+]).
+
+-define(HOSTS_TEMPLATE, <<
+    "127.0.0.1 localhost\n",
+    "::1 localhost ip6-localhost ip6-loopback\n",
+    "fe00::0 ip6-localnet\n",
+    "ff00::0 ip6-mcastprefix\n",
+    "ff02::1 ip6-allnodes\n",
+    "ff02::2 ip6-allrouters\n"
+>>).
+
+-define(DEFAULT_ATTEMPTS, 5).
+
+-spec prepare_cluster(_) -> _.
+prepare_cluster(HostsToUp) ->
+    HostsTable = hosts_table(),
+    %% prepare headless emulation records
+    HeadlessRecords = lists:foldl(
+        fun(Host, Acc) ->
+            Address = maps:get(Host, HostsTable),
+            <<Acc/binary, Address/binary, " ", Host/binary, "\n">>
+        end,
+        <<"\n">>,
+        HostsToUp
+    ),
+
+    %% prepare hosts files for each node
+    lists:foreach(
+        fun(Host) ->
+            Address = maps:get(Host, HostsTable),
+            Payload = <<
+                ?HOSTS_TEMPLATE/binary,
+                Address/binary,
+                " ",
+                Host/binary,
+                "\n",
+                HeadlessRecords/binary
+            >>,
+            Filename = unicode:characters_to_list(Host) ++ "_hosts",
+            ok = file:write_file(Filename, Payload)
+        end,
+        HostsToUp
+    ),
+
+    %% distribute hosts files
+    lists:foreach(
+        fun
+            (<<"mg-0">>) ->
+                LocalFile = "mg-0_hosts",
+                RemoteFile = "/etc/hosts",
+                Cp = os:find_executable("cp"),
+                CMD = Cp ++ " -f " ++ LocalFile ++ " " ++ RemoteFile,
+                os:cmd(CMD);
+            (Host) ->
+                HostString = unicode:characters_to_list(Host),
+                LocalFile = HostString ++ "_hosts",
+                RemoteFile = HostString ++ ":/etc/hosts",
+                SshPass = os:find_executable("sshpass"),
+                Scp = os:find_executable("scp"),
+                CMD = SshPass ++ " -p security " ++ Scp ++ " " ++ LocalFile ++ " " ++ RemoteFile,
+                os:cmd(CMD)
+        end,
+        HostsToUp
+    ).
+
+-spec instance_up(_) -> _.
+instance_up(Host) ->
+    Ssh = os:find_executable("ssh"),
+    CMD = Ssh ++ " " ++ unicode:characters_to_list(Host) ++ " /opt/mg/bin/entrypoint.sh",
+    spawn(fun() -> os:cmd(CMD) end).
+
+-spec instance_down(_) -> _.
+instance_down(Host) ->
+    Ssh = os:find_executable("ssh"),
+    CMD = Ssh ++ " " ++ unicode:characters_to_list(Host) ++ " /opt/mg/bin/mg stop",
+    spawn(fun() -> os:cmd(CMD) end).
+
+-spec await_peer(_) -> _.
+await_peer(RemoteNode) ->
+    await_peer(RemoteNode, ?DEFAULT_ATTEMPTS).
+
+-spec await_peer(_, _) -> _.
+await_peer(_RemoteNode, 0) ->
+    error(peer_not_started);
+await_peer(RemoteNode, Attempt) ->
+    case net_adm:ping(RemoteNode) of
+        pong ->
+            ok;
+        pang ->
+            timer:sleep(1000),
+            await_peer(RemoteNode, Attempt - 1)
+    end.
+
+-spec hosts_table() -> _.
+hosts_table() ->
+    lists:foldl(
+        fun(Host, Acc) ->
+            {ok, Addr} = inet:getaddr(Host, inet),
+            Acc#{unicode:characters_to_binary(Host) => unicode:characters_to_binary(inet:ntoa(Addr))}
+        end,
+        #{},
+        ?ERLANG_TEST_HOSTS
+    ).
diff --git a/apps/mg_cth/src/mg_cth_configurator.erl b/apps/mg_cth/src/mg_cth_configurator.erl
index 356f362..7cdcae6 100644
--- a/apps/mg_cth/src/mg_cth_configurator.erl
+++ b/apps/mg_cth/src/mg_cth_configurator.erl
@@ -19,21 +19,25 @@
         default_processing_timeout := timeout(),
         suicide_probability => mg_core_machine:suicide_probability(),
         event_stash_size := non_neg_integer(),
-        scaling => mg_core_cluster:scaling_type()
+        scaling => mg_core_cluster:scaling_type(),
+        _ => _
     }.
 
 -type event_sink_ns() :: #{
     default_processing_timeout := timeout(),
     storage => mg_core_storage:options(),
     worker => mg_core_worker:options(),
-    scaling => mg_core_cluster:scaling_type()
+    scaling => mg_core_cluster:scaling_type(),
+    _ => _
 }.
 
 -type config() :: #{
     woody_server := mg_woody:woody_server(),
     event_sink_ns := event_sink_ns(),
     namespaces := #{mg_core:ns() => events_machines()},
-    quotas => [mg_core_quota_worker:options()]
+    quotas => [mg_core_quota_worker:options()],
+    cluster => mg_core_cluster:cluster_options(),
+    _ => _
 }.
 
 -type processor() :: mg_woody_processor:options().
@@ -64,11 +68,13 @@ construct_child_specs(
             pulse => mg_cth_pulse
         }
     ),
+    ClusterSpec = mg_core_cluster:child_spec(ClusterOpts),
 
     lists:flatten([
         EventSinkChSpec,
         WoodyServerChSpec,
        QuotasChSpec,
+        ClusterSpec,
         EventMachinesChSpec
     ]).
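For orientation, a usage sketch of the `mg_cth_cluster` helpers above, roughly as the distributed suite later in this patch drives them. Assumptions: the local node is mg-0, peers register distribution names of the form 'mg@<host>', and error handling is omitted:

%% Bootstrap a three-node test cluster from the local node (mg-0).
Hosts = [<<"mg-0">>, <<"mg-1">>, <<"mg-2">>],
%% Write and distribute /etc/hosts files emulating headless discovery.
ok = mg_cth_cluster:prepare_cluster(Hosts),
%% Start the release on every peer except the local node.
_Pids = [mg_cth_cluster:instance_up(H) || H <- tl(Hosts)],
%% Poll a peer until it answers net_adm:ping/1; 'mg@mg-1' is an assumed name.
ok = mg_cth_cluster:await_peer('mg@mg-1', 10),
%% ... run distributed test cases ...
%% Shut the peer down again when done.
_ = mg_cth_cluster:instance_down(<<"mg-1">>).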
diff --git a/apps/mg_woody/src/mg_woody_automaton.erl b/apps/mg_woody/src/mg_woody_automaton.erl index b3cf685..029a13d 100644 --- a/apps/mg_woody/src/mg_woody_automaton.erl +++ b/apps/mg_woody/src/mg_woody_automaton.erl @@ -111,7 +111,7 @@ handle_function('Repair', {MachineDesc, Args}, WoodyContext, Options) -> {ok, Reply} -> {ok, pack(repair_response, Reply)}; {error, {failed, Reason}} -> - %% TODO catch this in router!!! + %% TODO catch this in balancer!!! woody_error:raise(business, pack(repair_error, Reason)) end; handle_function('SimpleRepair', {NS, RefIn}, WoodyContext, Options) -> diff --git a/apps/mg_woody/test/mg_modernizer_tests_SUITE.erl b/apps/mg_woody/test/mg_modernizer_tests_SUITE.erl index aaffed3..699e90c 100644 --- a/apps/mg_woody/test/mg_modernizer_tests_SUITE.erl +++ b/apps/mg_woody/test/mg_modernizer_tests_SUITE.erl @@ -168,7 +168,7 @@ mg_woody_config(Name, C) -> existing_storage_name => ?config(storage_name, C) }}, processor => #{ - url => <<"http://localhost:8023/processor">>, + url => <<"http://mg-0:8023/processor">>, transport_opts => #{pool => ns, max_connections => 100} }, default_processing_timeout => 5000, @@ -186,7 +186,7 @@ mg_woody_config(Name, C) -> #{ modernizer => #{ current_format_version => ?MODERN_FMT_VSN, - handler => #{url => <<"http://localhost:8023/modernizer">>} + handler => #{url => <<"http://mg-0:8023/modernizer">>} } } end diff --git a/apps/mg_woody/test/mg_stress_SUITE.erl b/apps/mg_woody/test/mg_stress_SUITE.erl index bdf7b10..f1de954 100644 --- a/apps/mg_woody/test/mg_stress_SUITE.erl +++ b/apps/mg_woody/test/mg_stress_SUITE.erl @@ -102,7 +102,7 @@ mg_woody_config(_C) -> ?NS => #{ storage => mg_core_storage_memory, processor => #{ - url => <<"http://localhost:8023/processor">>, + url => <<"http://mg-0:8023/processor">>, transport_opts => #{pool => ns, max_connections => 100} }, default_processing_timeout => 5000, diff --git a/apps/mg_woody/test/mg_woody_tests_SUITE.erl b/apps/mg_woody/test/mg_woody_tests_SUITE.erl index 8226b9c..e7188be 100644 --- a/apps/mg_woody/test/mg_woody_tests_SUITE.erl +++ b/apps/mg_woody/test/mg_woody_tests_SUITE.erl @@ -80,6 +80,49 @@ -export([config_with_multiple_event_sinks/1]). +-define(CLUSTER_PARTITION_OPTS, #{ + discovering => #{ + <<"domain_name">> => <<"machinegun-ha-headless">>, + <<"sname">> => <<"mg">> + }, + scaling => partition_based, + partitioning => #{ + capacity => 5, + max_hash => 4095 + }, + reconnect_timeout => 5000 +}). + +-define(CLUSTER_GLOBAL_OPTS, #{ + discovering => #{ + <<"domain_name">> => <<"machinegun-ha-headless">>, + <<"sname">> => <<"mg">> + }, + scaling => global_based, + reconnect_timeout => 5000 +}). + +-define(STORAGE_RIAK(NS), + {mg_core_storage_riak, #{ + host => "riakdb", + port => 8087, + bucket => NS, + connect_timeout => 5000, + request_timeout => 5000, + index_query_timeout => 10000, + pool_options => #{ + init_count => 0, + max_count => 100, + idle_timeout => 60000, + cull_interval => 10000, + queue_max => 1000 + }, + batching => #{ + concurrency_limit => 100 + }, + sidecar => {mg_riak_prometheus, #{}} + }} +). %% %% tests descriptions %% @@ -90,18 +133,43 @@ -spec all() -> [test_name() | {group, group_name()}]. all() -> [ - {group, base}, - {group, history}, - {group, repair}, - {group, timers}, - {group, event_sink}, - {group, deadline}, + {group, standalone_memory}, + {group, standalone_memory_history}, + {group, standalone_riak}, + {group, distributed_riak}, config_with_multiple_event_sinks ]. -spec groups() -> [{group_name(), list(_), [test_name()]}]. 
 groups() ->
     [
+        {standalone_memory, [], [
+            {group, base},
+            {group, repair},
+            {group, timers},
+            {group, event_sink},
+            {group, deadline}
+        ]},
+        {standalone_memory_history, [], [
+            {group, history}
+        ]},
+        {standalone_riak, [], [
+            {group, base},
+            {group, history},
+            {group, repair},
+            {group, timers},
+            {group, event_sink},
+            {group, deadline}
+        ]},
+        {distributed_riak, [], [
+            {group, base},
+            {group, history},
+            {group, repair},
+            {group, timers}
+            % {group, event_sink},
+            % {group, deadline}
+        ]},
+
         % TODO check timer cancellation
         % TODO check get_history separately
         {base, [sequence], [
@@ -185,14 +253,60 @@ end_per_suite(_C) ->
     ok.
 
 -spec init_per_group(group_name(), config()) -> config().
-init_per_group(history, C) ->
-    init_per_group([{storage, mg_core_storage_memory} | C]);
-init_per_group(_, C) ->
+init_per_group(standalone_memory_history, C) ->
+    [
+        {storage_ns, mg_core_storage_memory},
+        {storage_evs, mg_core_storage_memory},
+        {scaling, global_based},
+        {worker, #{registry => mg_core_procreg_global}},
+        {registry, mg_core_procreg_global},
+        {cluster, #{}}
+        | C
+    ];
+init_per_group(standalone_memory, C) ->
     % NOTE
     % Even such a small chance can fire in situations where the processor pulls a
     % large chunk of machine history, which makes the real probability of failing
     % the operation (1 - (1 - p) ^ n).
-    init_per_group([{storage, {mg_core_storage_memory, #{random_transient_fail => 0.01}}} | C]).
+    [
+        {storage_ns, {mg_core_storage_memory, #{random_transient_fail => 0.01}}},
+        {storage_evs, mg_core_storage_memory},
+        {scaling, global_based},
+        {worker, #{registry => mg_core_procreg_global}},
+        {registry, mg_core_procreg_global},
+        {cluster, #{}}
+        | C
+    ];
+init_per_group(standalone_riak, C) ->
+    [
+        {storage_ns, ?STORAGE_RIAK(?NS)},
+        {storage_evs, ?STORAGE_RIAK(<<"_event_sinks">>)},
+        {scaling, global_based},
+        {worker, #{registry => mg_core_procreg_global}},
+        {registry, mg_core_procreg_global},
+        {cluster, #{}}
+        | C
+    ];
+init_per_group(distributed_riak, C) ->
+    ClusterHosts = [<<"mg-0">>, <<"mg-1">>, <<"mg-2">>],
+    _ = mg_cth_cluster:prepare_cluster(ClusterHosts),
+    ok = lists:foreach(
+        fun(Host) -> mg_cth_cluster:instance_up(Host) end,
+        tl(ClusterHosts)
+    ),
+    %% wait for peers
+    timer:sleep(3000),
+    [
+        {storage_ns, ?STORAGE_RIAK(?NS)},
+        {storage_evs, ?STORAGE_RIAK(<<"_event_sinks">>)},
+        {scaling, partition_based},
+        {worker, #{registry => mg_core_procreg_gproc}},
+        {registry, mg_core_procreg_gproc},
+        {cluster, ?CLUSTER_PARTITION_OPTS}
+        | C
+    ];
+init_per_group(_, C) ->
+    init_per_group(C).
 
 -spec init_per_group(config()) -> config().
 init_per_group(C) ->
@@ -221,11 +335,11 @@ init_per_group(C) ->
     [
         {apps, Apps},
         {automaton_options, #{
-            url => "http://localhost:8022",
+            url => "http://mg-0:8022",
             ns => ?NS,
             retry_strategy => genlib_retry:linear(3, 1)
         }},
-        {event_sink_options, "http://localhost:8022"},
+        {event_sink_options, "http://mg-0:8022"},
         {processor_pid, ProcessorPid}
         | C
     ].
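The fixed `timer:sleep(3000)` in `init_per_group(distributed_riak, ...)` above is a pragmatic wait; if it proves flaky, the same spot could poll with `mg_cth_cluster:await_peer/2` instead. A sketch, assuming peers register distribution names of the form 'mg@<host>':

%% In place of timer:sleep(3000): wait for each started peer explicitly.
ok = lists:foreach(
    fun(Host) ->
        %% Assumption: each peer's distribution name is 'mg@<host>'.
        Node = binary_to_atom(<<"mg@", Host/binary>>, utf8),
        ok = mg_cth_cluster:await_peer(Node, 10)
    end,
    tl(ClusterHosts)
),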
@@ -313,11 +427,12 @@ mg_woody_config(C) ->
                 update_interval => 100
             }
         ],
+        cluster => ?config(cluster, C),
         namespaces => #{
             ?NS => #{
-                storage => ?config(storage, C),
+                storage => ?config(storage_ns, C),
                 processor => #{
-                    url => <<"http://localhost:8023/processor">>,
+                    url => <<"http://mg-0:8023/processor">>,
                     transport_opts => #{pool => ns, max_connections => 100}
                 },
                 default_processing_timeout => 5000,
@@ -329,7 +444,9 @@
                     storage => {exponential, infinity, 1, 10},
                     timers => {exponential, infinity, 1, 10}
                 },
-                scaling => global_based,
+                scaling => ?config(scaling, C),
+                registry => ?config(registry, C),
+                worker => ?config(worker, C),
                 % there are currently issues that prevent enabling this option
                 % permanently (which we would very much like, to verify that
                 % idempotent retries work)
                 % TODO do this in the future
@@ -348,18 +465,49 @@
             ]
         }
     },
+    %% NS = <<"_event_sinks">>
     event_sink_ns => #{
-        storage => mg_core_storage_memory,
-        scaling => global_based,
+        storage => ?config(storage_evs, C),
+        scaling => ?config(scaling, C),
+        registry => ?config(registry, C),
+        worker => ?config(worker, C),
         default_processing_timeout => 5000
     }
 }.
 
 -spec end_per_group(group_name(), config()) -> ok.
+end_per_group(distributed_riak, C) ->
+    _ = mg_cth_cluster:prepare_cluster([<<"mg-0">>]),
+    ok = lists:foreach(
+        fun(Host) -> mg_cth_cluster:instance_down(Host) end,
+        [<<"mg-1">>, <<"mg-2">>]
+    ),
+    maybe_drop_buckets(C);
 end_per_group(_, C) ->
-    ok = proc_lib:stop(?config(processor_pid, C)),
-    mg_cth:stop_applications(?config(apps, C)).
+    maybe_drop_buckets(C),
+    try
+        proc_lib:stop(?config(processor_pid, C)),
+        mg_cth:stop_applications(?config(apps, C))
+    catch
+        _:_ ->
+            ok
+    end.
 
+-spec maybe_drop_buckets(_) -> _.
+maybe_drop_buckets(_C) ->
+    {ok, Pid} = riakc_pb_socket:start_link("riakdb", 8087),
+    {ok, ListBuckets} = riakc_pb_socket:list_buckets(Pid, [{allow_listing, true}]),
+    lists:foreach(
+        fun(Bucket) ->
+            case riakc_pb_socket:list_keys(Pid, Bucket, [{allow_listing, true}]) of
+                {ok, Keys} ->
+                    lists:foreach(fun(Key) -> ok = riakc_pb_socket:delete(Pid, Bucket, Key) end, Keys);
+                _ ->
+                    skip
+            end
+        end,
+        ListBuckets
+    ),
+    %% close the riak connection so it does not leak across groups
+    riakc_pb_socket:stop(Pid).
 %%
 %% base group tests
 %%
@@ -595,7 +743,22 @@ timeout_call_with_deadline(C) ->
     DeadlineFn = fun() -> mg_core_deadline:from_timeout(?DEADLINE_TIMEOUT) end,
     Options0 = no_timeout_automaton_options(C),
     Options1 = maps:remove(retry_strategy, Options0),
-    {'EXIT', {{woody_error, {external, result_unknown, <<"{timeout", _/binary>>}}, _Stack}} =
+    % {'EXIT', {{woody_error, {external, result_unknown, <<"{timeout", _/binary>>}}, _Stack}} =
+    % {
+    %     'EXIT',
+    %     {
+    %         {
+    %             woody_error,
+    %             {
+    %                 external,
+    %                 result_unexpected,
+    %                 <<"error:{exception,{woody_error,{internal,result_unknown,<<\"{timeout", _/binary>>
+    %             }
+    %         },
+    %         _Stack
+    %     }
+    % } =
+    {'EXIT', {{woody_error, _}, _Stack}} =
         (catch mg_cth_automaton_client:call(Options1, ?ID, <<"sleep">>, DeadlineFn())),
     #mg_stateproc_MachineAlreadyWorking{} =
         (catch mg_cth_automaton_client:repair(Options0, ?ID, <<"ok">>, DeadlineFn())).
@@ -705,7 +868,7 @@ config_with_multiple_event_sinks(_C) -> <<"1">> => #{ storage => mg_core_storage_memory, processor => #{ - url => <<"http://localhost:8023/processor">>, + url => <<"http://mg-0:8023/processor">>, transport_opts => #{pool => pool1, max_connections => 100} }, default_processing_timeout => 30000, @@ -715,6 +878,8 @@ config_with_multiple_event_sinks(_C) -> }, retries => #{}, scaling => global_based, + registry => mg_core_procreg_global, + worker => #{registry => mg_core_procreg_global}, event_sinks => [ {mg_core_events_sink_machine, #{name => default, machine_id => <<"SingleES">>}} ] @@ -722,7 +887,7 @@ config_with_multiple_event_sinks(_C) -> <<"2">> => #{ storage => mg_core_storage_memory, processor => #{ - url => <<"http://localhost:8023/processor">>, + url => <<"http://mg-0:8023/processor">>, transport_opts => #{pool => pool2, max_connections => 100} }, default_processing_timeout => 5000, @@ -732,6 +897,8 @@ config_with_multiple_event_sinks(_C) -> }, retries => #{}, scaling => global_based, + registry => mg_core_procreg_global, + worker => #{registry => mg_core_procreg_global}, event_sinks => [ {mg_core_events_sink_machine, #{ name => machine, @@ -747,6 +914,8 @@ config_with_multiple_event_sinks(_C) -> }, event_sink_ns => #{ storage => mg_core_storage_memory, + registry => mg_core_procreg_global, + worker => #{registry => mg_core_procreg_global}, scaling => global_based, default_processing_timeout => 5000 } diff --git a/config/config.yaml b/config/config.yaml index 5c3f103..f217ae6 100644 --- a/config/config.yaml +++ b/config/config.yaml @@ -46,7 +46,7 @@ worker: message_queue_len_limit: 1000 process_registry: - module: mg_core_procreg_global + module: mg_core_procreg_gproc cluster: discovery: @@ -61,7 +61,7 @@ cluster: reconnect_timeout: 5000 namespaces: - test_transaction: + NS: timers: scan_interval: 60s scan_limit: 2000 @@ -102,7 +102,7 @@ namespaces: topic: test_transaction client: default_kafka_client processor: - url: http://test-transaction2:8088/v1/processor + url: http://mg-0:8023/processor pool_size: 2000 http_keep_alive_timeout: 3000ms unload_timeout: 180s diff --git a/elvis.config b/elvis.config index 895e2d7..f4b5865 100644 --- a/elvis.config +++ b/elvis.config @@ -111,7 +111,12 @@ % We want to use `ct:pal/2` and friends in test code. {elvis_style, no_debug_call, disable}, % Tests are usually more comprehensible when a bit more verbose. - {elvis_style, dont_repeat_yourself, #{min_complexity => 20}}, + {elvis_style, dont_repeat_yourself, #{ + min_complexity => 20, + ignore => [ + mg_woody_tests_SUITE + ] + }}, {elvis_style, god_modules, #{ ignore => [ mg_prometheus_metric_SUITE,