From fc62bd85c9253cf56927da9fdc0d25643834cd96 Mon Sep 17 00:00:00 2001 From: Rob Martin Date: Mon, 25 Sep 2017 18:57:38 -0600 Subject: [PATCH 1/6] Retrieve events filtered by type, or after another event --- lib/perhap/adapters/eventstore/memory.ex | 22 ++++++++++++------- lib/perhap/event.ex | 2 +- test/perhap/event_test.exs | 27 ++++++++++++++++++++---- 3 files changed, 38 insertions(+), 13 deletions(-) diff --git a/lib/perhap/adapters/eventstore/memory.ex b/lib/perhap/adapters/eventstore/memory.ex index e7fc76f..db97957 100644 --- a/lib/perhap/adapters/eventstore/memory.ex +++ b/lib/perhap/adapters/eventstore/memory.ex @@ -41,7 +41,7 @@ defmodule Perhap.Adapters.Eventstore.Memory do end ) end - @spec get_events(atom(), [entity_id: Perhap.Event.UUIDv4.t, after: Perhap.Event.UUIDv1.t]) :: + @spec get_events(atom(), [entity_id: Perhap.Event.UUIDv4.t, after: Perhap.Event.UUIDv1.t, type: atom()]) :: {:ok, list(Perhap.Event.t)} | {:error, term} def get_events(context, opts \\ []) do Agent.get( __MODULE__, @@ -50,19 +50,25 @@ defmodule Perhap.Adapters.Eventstore.Memory do true -> Map.get(index, {context, opts[:entity_id]}, []) _ -> - event_ids = - index - |> Enum.filter(fn {{c, _}, _} -> c == context end) - |> Enum.map(fn {_, events} -> events end) - |> List.flatten + index + |> Enum.filter(fn {{c, _}, _} -> c == context end) + |> Enum.map(fn {_, events} -> events end) + |> List.flatten end event_ids2 = case Keyword.has_key?(opts, :after) do true -> after_event = time_order(opts[:after]) - event_ids |> Enum.filter(fn {ev} -> ev > after_event end) + event_ids |> Enum.filter(fn ev -> ev > after_event end) _ -> event_ids end - {:ok, Map.take(events, event_ids2) |> Map.values} + events = Map.take(events, event_ids2) |> Map.values + events2 = case Keyword.has_key?(opts, :type) do + true -> + events |> Enum.filter(fn %Perhap.Event{metadata: %Perhap.Event.Metadata{type: type}} -> type == opts[:type] end) + _ -> + events + end + {:ok, events2} end ) end diff --git a/lib/perhap/event.ex b/lib/perhap/event.ex index 5ef5f41..2671c9c 100644 --- a/lib/perhap/event.ex +++ b/lib/perhap/event.ex @@ -96,7 +96,7 @@ defmodule Perhap.Event do # Timestamps and unique integers @spec timestamp() :: integer def timestamp(), do: System.system_time(:microseconds) - + @spec unique_integer() :: integer def unique_integer(), do: System.unique_integer([:monotonic]) diff --git a/test/perhap/event_test.exs b/test/perhap/event_test.exs index 1ae874f..1fb5005 100644 --- a/test/perhap/event_test.exs +++ b/test/perhap/event_test.exs @@ -121,18 +121,37 @@ defmodule Perhap.EventTest do test "retrieves events following an event for an entity" do assert Perhap.Event.retrieve_events(:aa) == {:ok, []} - random_entity = Perhap.Event.get_uuid_v4 + random_entity_id = Perhap.Event.get_uuid_v4 random_event_ids = for _ <- 1..5, do: Perhap.Event.get_uuid_v1 random_events = random_event_ids |> Enum.map(fn(ev) -> - make_random_event(%Perhap.Event.Metadata{context: :aa, event_id: ev, entity_id: random_entity}) + make_random_event(%Perhap.Event.Metadata{context: :aa, event_id: ev, entity_id: random_entity_id}) end) random_events |> Enum.each(fn(event) -> Perhap.Event.save_event(event) end) - [ ev1 | [ ev2 | _ ] ] = random_event_ids |> Enum.reverse - #first_event = Perhap.Event.retrieve_events(:aa, entity_id: random_entity, after: ev2) + [ _ | [ ev2 | _ ] ] = random_event_ids |> Enum.reverse + {:ok, [left]} = Perhap.Event.retrieve_events(:aa, entity_id: random_entity_id, after: ev2) + [ right | _ ] = random_events |> Enum.reverse + assert left == right end test "retrieves events filtered by an event_type" do + assert Perhap.Event.retrieve_events(:bb) == {:ok, []} + random_entity_id = Perhap.Event.get_uuid_v4 + rando1 = make_random_event( + %Perhap.Event.Metadata{context: :bb, type: :bb_type_1, entity_id: random_entity_id} ) + Perhap.Event.save_event(rando1) + rando2 = make_random_event( + %Perhap.Event.Metadata{context: :bb, type: :bb_type_2, entity_id: random_entity_id} ) + Perhap.Event.save_event(rando2) + rando3 = make_random_event( + %Perhap.Event.Metadata{context: :bb, type: :bb_type_2, entity_id: random_entity_id} ) + Perhap.Event.save_event(rando3) + {:ok, [result1]} = Perhap.Event.retrieve_events(:bb, entity_id: random_entity_id, type: :bb_type_1) + {:ok, [result2 | _]} = Perhap.Event.retrieve_events(:bb, type: :bb_type_2) + {:ok, [result3]} = Perhap.Event.retrieve_events(:bb, type: :bb_type_2, after: result2.event_id) + assert result1 == rando1 + assert result2 == rando2 + assert result3 == rando3 end defp make_v1() do From 63b48fd438dfeb33d9501afeae6a772bd182d42a Mon Sep 17 00:00:00 2001 From: Rob Martin Date: Mon, 25 Sep 2017 19:17:43 -0600 Subject: [PATCH 2/6] refactor unwieldy function --- lib/perhap/adapters/eventstore/memory.ex | 63 ++++++++++++------- .../adapters/eventstore/memory_test.exs | 4 +- 2 files changed, 42 insertions(+), 25 deletions(-) diff --git a/lib/perhap/adapters/eventstore/memory.ex b/lib/perhap/adapters/eventstore/memory.ex index db97957..b21a13a 100644 --- a/lib/perhap/adapters/eventstore/memory.ex +++ b/lib/perhap/adapters/eventstore/memory.ex @@ -46,32 +46,39 @@ defmodule Perhap.Adapters.Eventstore.Memory do def get_events(context, opts \\ []) do Agent.get( __MODULE__, fn %__MODULE__{events: events, index: index} -> - event_ids = case Keyword.has_key?(opts, :entity_id) do - true -> - Map.get(index, {context, opts[:entity_id]}, []) - _ -> - index - |> Enum.filter(fn {{c, _}, _} -> c == context end) - |> Enum.map(fn {_, events} -> events end) - |> List.flatten - end - event_ids2 = case Keyword.has_key?(opts, :after) do - true -> - after_event = time_order(opts[:after]) - event_ids |> Enum.filter(fn ev -> ev > after_event end) - _ -> event_ids - end - events = Map.take(events, event_ids2) |> Map.values - events2 = case Keyword.has_key?(opts, :type) do - true -> - events |> Enum.filter(fn %Perhap.Event{metadata: %Perhap.Event.Metadata{type: type}} -> type == opts[:type] end) - _ -> - events - end - {:ok, events2} + filtered_index = index + |> filter_index_by_entity_id(context, opts) + |> filter_event_ids_after_a_given_event(opts) + filtered_events = events + |> Map.take(filtered_index) + |> Map.values + |> filter_events_by_type(opts) + {:ok, filtered_events} end ) end + defp filter_index_by_entity_id(index, context, opts) do + case Keyword.has_key?(opts, :entity_id) do + true -> + index + |> Map.get({context, opts[:entity_id]}, []) + _ -> + index + |> Enum.filter(fn {{c, _}, _} -> c == context end) + |> Enum.map(fn {_, events} -> events end) + |> List.flatten + end + end + + defp filter_event_ids_after_a_given_event(event_ids, opts) do + case Keyword.has_key?(opts, :after) do + true -> + after_event = time_order(opts[:after]) + event_ids |> Enum.filter(fn ev -> ev > after_event end) + _ -> event_ids + end + end + defp time_order(maybe_uuidv1) do case Perhap.Event.is_time_order?(maybe_uuidv1) do true -> maybe_uuidv1 @@ -79,6 +86,16 @@ defmodule Perhap.Adapters.Eventstore.Memory do end end + def filter_events_by_type(events, opts) do + case Keyword.has_key?(opts, :type) do + true -> + events + |> Enum.filter(fn %Perhap.Event{metadata: %Perhap.Event.Metadata{type: type}} -> type == opts[:type] end) + _ -> + events + end + end + def handle_call({:swarm, :begin_handoff}, _from, state) do {:reply, {:resume, state}, state} end diff --git a/test/perhap/adapters/eventstore/memory_test.exs b/test/perhap/adapters/eventstore/memory_test.exs index 666311e..3ae8a75 100644 --- a/test/perhap/adapters/eventstore/memory_test.exs +++ b/test/perhap/adapters/eventstore/memory_test.exs @@ -9,7 +9,7 @@ defmodule PerhapTest.Adapters.Memory do end test "get_event" do - random_context = Enum.random([:a, :b, :c, :d, :e]) + random_context = Enum.random([:a, :b]) rando = make_random_event( %Perhap.Event.Metadata{context: random_context, entity_id: Perhap.Event.get_uuid_v4()} ) Memory.put_event(rando) @@ -17,7 +17,7 @@ defmodule PerhapTest.Adapters.Memory do end test "get_events with entity_id" do - random_context = Enum.random([:a, :b, :c, :d, :e]) + random_context = Enum.random([:c, :d, :e]) random_entity_id = Perhap.Event.get_uuid_v4() rando1 = make_random_event( %Perhap.Event.Metadata{context: random_context, entity_id: random_entity_id} ) From 25f98316e0b307b2423d5b45664c1f25232b1bf0 Mon Sep 17 00:00:00 2001 From: Rob Martin Date: Mon, 25 Sep 2017 19:30:35 -0600 Subject: [PATCH 3/6] further refactor for better looking code --- lib/perhap/adapters/eventstore/memory.ex | 49 ++++++++++-------------- 1 file changed, 20 insertions(+), 29 deletions(-) diff --git a/lib/perhap/adapters/eventstore/memory.ex b/lib/perhap/adapters/eventstore/memory.ex index b21a13a..8344da4 100644 --- a/lib/perhap/adapters/eventstore/memory.ex +++ b/lib/perhap/adapters/eventstore/memory.ex @@ -47,36 +47,31 @@ defmodule Perhap.Adapters.Eventstore.Memory do Agent.get( __MODULE__, fn %__MODULE__{events: events, index: index} -> filtered_index = index - |> filter_index_by_entity_id(context, opts) - |> filter_event_ids_after_a_given_event(opts) + |> filter_index_by_entity_id(context, Keyword.get(opts, :entity_id)) + |> filter_event_ids_after_a_given_event(Keyword.get(opts, :after)) filtered_events = events |> Map.take(filtered_index) |> Map.values - |> filter_events_by_type(opts) + |> filter_events_by_type(Keyword.get(opts,:type)) {:ok, filtered_events} end ) end - defp filter_index_by_entity_id(index, context, opts) do - case Keyword.has_key?(opts, :entity_id) do - true -> - index - |> Map.get({context, opts[:entity_id]}, []) - _ -> - index - |> Enum.filter(fn {{c, _}, _} -> c == context end) - |> Enum.map(fn {_, events} -> events end) - |> List.flatten - end + defp filter_index_by_entity_id(index, context, nil) do + index + |> Enum.filter(fn {{c, _}, _} -> c == context end) + |> Enum.map(fn {_, events} -> events end) + |> List.flatten + end + defp filter_index_by_entity_id(index, context, entity_id) do + index + |> Map.get({context, entity_id}, []) end - defp filter_event_ids_after_a_given_event(event_ids, opts) do - case Keyword.has_key?(opts, :after) do - true -> - after_event = time_order(opts[:after]) - event_ids |> Enum.filter(fn ev -> ev > after_event end) - _ -> event_ids - end + defp filter_event_ids_after_a_given_event(event_ids, nil), do: event_ids + defp filter_event_ids_after_a_given_event(event_ids, after_event) do + after_event = time_order(after_event) + event_ids |> Enum.filter(fn ev -> ev > after_event end) end defp time_order(maybe_uuidv1) do @@ -86,14 +81,10 @@ defmodule Perhap.Adapters.Eventstore.Memory do end end - def filter_events_by_type(events, opts) do - case Keyword.has_key?(opts, :type) do - true -> - events - |> Enum.filter(fn %Perhap.Event{metadata: %Perhap.Event.Metadata{type: type}} -> type == opts[:type] end) - _ -> - events - end + def filter_events_by_type(events, nil), do: events + def filter_events_by_type(events, filter_type) do + events + |> Enum.filter(fn %Perhap.Event{metadata: %Perhap.Event.Metadata{type: type}} -> type == filter_type end) end def handle_call({:swarm, :begin_handoff}, _from, state) do From f814fcc6d21d5a9988956b00980d86af62bc0a12 Mon Sep 17 00:00:00 2001 From: Rob Martin Date: Thu, 12 Oct 2017 12:30:40 -0600 Subject: [PATCH 4/6] Incomplete and broken, reference only --- lib/perhap/adapters/eventstore.ex | 2 +- lib/perhap/adapters/modelstore.ex | 26 +++++++++++++++++++ lib/perhap/adapters/modelstore/memory.ex | 26 +++++++++++++++++++ lib/perhap/domain.ex | 10 ++++--- .../adapters/eventstore/memory_test.exs | 2 +- .../adapters/modelstore/memory_test.exs | 8 ++++++ 6 files changed, 68 insertions(+), 6 deletions(-) create mode 100644 lib/perhap/adapters/modelstore.ex create mode 100644 lib/perhap/adapters/modelstore/memory.ex create mode 100644 test/perhap/adapters/modelstore/memory_test.exs diff --git a/lib/perhap/adapters/eventstore.ex b/lib/perhap/adapters/eventstore.ex index e8e8801..6d814d2 100644 --- a/lib/perhap/adapters/eventstore.ex +++ b/lib/perhap/adapters/eventstore.ex @@ -4,7 +4,7 @@ defmodule Perhap.Adapters.Eventstore do @callback start_link(opts: any()) :: {:ok, pid} | :ignore | {:error, {:already_started, pid} | term} @callback put_event(event: Perhap.Event.t) :: :ok | {:error, term} @callback get_event(event_id: Perhap.Event.UUIDv1) :: {:ok, Perhap.Event.t} | {:error, term} - @callback get_events(atom(), [entity_id: Perhap.Event.UUIDv4.t, after: Perhap.Event.UUIDv1.t]) :: + @callback get_events(atom(), [entity_id: Perhap.Event.UUIDv4.t, after: Perhap.Event.UUIDv1.t, type: atom()]) :: {:ok, list(Perhap.Event.t)} | {:error, term} defmacro __using__(_opts) do diff --git a/lib/perhap/adapters/modelstore.ex b/lib/perhap/adapters/modelstore.ex new file mode 100644 index 0000000..5fb2cdf --- /dev/null +++ b/lib/perhap/adapters/modelstore.ex @@ -0,0 +1,26 @@ +defmodule Perhap.Adapters.Modelstore do + @moduledoc false + + @callback start_link(opts: any()) :: {:ok, pid} | :ignore | {:error, {:already_started, pid} | term} + @callback put_model({Perhap.Event.UUIDv4.t | :single, module()}, Perhap.Event.UUIDv1.t, any()) :: + :ok | {:error, term} + @callback get_model({Perhap.Event.UUIDv4.t | :single, module()}, Perhap.Event.UUIDv1.t | nil) :: + {:ok, any()} | {:error, term} + + defmacro __using__(_opts) do + quote location: :keep do + @behaviour Perhap.Adapters.Modelstore + + @spec start_service(term) :: {:ok, pid} + def start_service(name) do + {:ok, pid} = Swarm.register_name({__MODULE__, name}, Supervisor, :start_child, [{:via, :swarm, :perhap}, child_spec(name)]) + Swarm.join(:perhap, pid) + {:ok, pid} + end + def start_service() do + start_service(__MODULE__) + end + + end + end +end diff --git a/lib/perhap/adapters/modelstore/memory.ex b/lib/perhap/adapters/modelstore/memory.ex new file mode 100644 index 0000000..d908ed1 --- /dev/null +++ b/lib/perhap/adapters/modelstore/memory.ex @@ -0,0 +1,26 @@ +defmodule Perhap.Adapters.Modelstore.Memory do + use Perhap.Adapters.Modelstore + use Agent + + @type model :: any() + @type ledger :: list(Perhap.Event.t) + @type model_instance :: { Perhap.Event.UUIDv1.t, model, ledger } + @type key :: { Perhap.Event.UUIDv4.t | :single, module() } + @type value :: [ versions: list(model_instance), current_events: list(Perhap.Event.t) ] + @type store :: %{ required(key) => value } + @type t :: [ modelstore: modelstore, + defstruct modelstore: [], events: [], config: %{} + + @spec start_link(opts: any()) :: {:ok, pid} | :ignore | {:error, {:already_started, pid} | term} + def start_link(_opts) do + Agent.start_link(fn -> %__MODULE__{} end, name: __MODULE__) + end + + @spec put_model({Perhap.Event.UUIDv4.t | :single, module()}, Perhap.Event.UUIDv1.t, any()) :: :ok | {:error, term} + def put_model({entity_id, service}, version, model) do + end + + @spec get_model({Perhap.Event.UUIDv4.t | :single, module()}, Perhap.Event.UUIDv1.t | nil) :: {:ok, any()} | {:error, term} + def get_model({entity_id, service}, version \\ nil) do + end +end diff --git a/lib/perhap/domain.ex b/lib/perhap/domain.ex index d1489cd..0d1e093 100644 --- a/lib/perhap/domain.ex +++ b/lib/perhap/domain.ex @@ -17,7 +17,7 @@ defmodule Perhap.Domain do #@spec reduce(service_id: Perhap.Event.UUIDv4.t | module(), # event: list(Perhap.Event.t) | Perhap.Event.t) :: :ok - def reduce(service_id, event) when not is_list(event), do: reduce(service_id, List.wrap(event)) + def reduce(service_id, event) when not is_list(event), do: reduce(service_id, [event]) def reduce(service_id, events) do GenServer.cast({:via, :swarm, service_id}, {:reduce, events}) end @@ -113,8 +113,10 @@ defmodule Perhap.Domain do defmacro __before_compile__(_env) do quote do + @type state models: %{required(Perhap.Event.UUIDv1.t) => %__MODULE__{}}, ledger: list(Perhap.Event.t) + def start_link(name) do - GenServer.start_link(__MODULE__, %__MODULE__{}, [name]) + GenServer.start_link(__MODULE__, [models: [], ledger: []], [name]) end def stop(name) do @@ -124,8 +126,8 @@ defmodule Perhap.Domain do def config() do %{ ttl: @service_ttl, - expire: @events_expire_after, - buffer_size: @buffer_size + ledger_size: @ledger_size, + num_versions: @num_versions } end end diff --git a/test/perhap/adapters/eventstore/memory_test.exs b/test/perhap/adapters/eventstore/memory_test.exs index 3ae8a75..0713729 100644 --- a/test/perhap/adapters/eventstore/memory_test.exs +++ b/test/perhap/adapters/eventstore/memory_test.exs @@ -1,4 +1,4 @@ -defmodule PerhapTest.Adapters.Memory do +defmodule PerhapTest.Adapters.Eventstore.Memory do use PerhapTest.Helper, port: 4499 alias Perhap.Adapters.Eventstore.Memory diff --git a/test/perhap/adapters/modelstore/memory_test.exs b/test/perhap/adapters/modelstore/memory_test.exs new file mode 100644 index 0000000..1be6173 --- /dev/null +++ b/test/perhap/adapters/modelstore/memory_test.exs @@ -0,0 +1,8 @@ +defmodule PerhapTest.Adapters.Modelstore.Memory do + use PerhapTest.Helper, port: 4499 + alias Perhap.Adapters.Modelstore.Memory + + test "put_model" do + end + +end From 920d562af5a46eed1a707a48e4be662014a52d02 Mon Sep 17 00:00:00 2001 From: Alan Dyer Date: Fri, 3 Nov 2017 03:21:39 -0300 Subject: [PATCH 5/6] first stab at modelstore, untested WIP --- lib/perhap/adapters/modelstore/memory.ex | 27 ++++++++++++++++++++++-- 1 file changed, 25 insertions(+), 2 deletions(-) diff --git a/lib/perhap/adapters/modelstore/memory.ex b/lib/perhap/adapters/modelstore/memory.ex index d908ed1..ae61a7c 100644 --- a/lib/perhap/adapters/modelstore/memory.ex +++ b/lib/perhap/adapters/modelstore/memory.ex @@ -8,8 +8,8 @@ defmodule Perhap.Adapters.Modelstore.Memory do @type key :: { Perhap.Event.UUIDv4.t | :single, module() } @type value :: [ versions: list(model_instance), current_events: list(Perhap.Event.t) ] @type store :: %{ required(key) => value } - @type t :: [ modelstore: modelstore, - defstruct modelstore: [], events: [], config: %{} + @type t :: [ modelstore: store, events: list(Perhap.Event.t), config: %{} ] + defstruct modelstore: %{}, events: [], config: %{} @spec start_link(opts: any()) :: {:ok, pid} | :ignore | {:error, {:already_started, pid} | term} def start_link(_opts) do @@ -18,9 +18,32 @@ defmodule Perhap.Adapters.Modelstore.Memory do @spec put_model({Perhap.Event.UUIDv4.t | :single, module()}, Perhap.Event.UUIDv1.t, any()) :: :ok | {:error, term} def put_model({entity_id, service}, version, model) do + Agent.update(__MODULE__, + fn %__MODULE__{modelstore: store, events: events, config: config} -> + store_val = Map.get(store, {entity_id, service}, [versions: [], current_events: []]) + store_val = [versions: Enum.map(store_val[:versions], fn {^version, old_model, ledger} -> {^version, model, ledger}, {not_version, old_model, ledger} -> {not_version, old_model, ledger} end), current_events: store_val[:current_events]] + updated_store = Map.put(store, {entity_id, service}, store_val) + %__MODULE__{modelstore: updated_store, events: events, config: config} + end ) + :ok end @spec get_model({Perhap.Event.UUIDv4.t | :single, module()}, Perhap.Event.UUIDv1.t | nil) :: {:ok, any()} | {:error, term} def get_model({entity_id, service}, version \\ nil) do + Agent.get(__MODULE__, + fn %__MODULE__{modelstore: store, events: events, config: config} -> + case Map.get(store, {entity_id, service}) do + nil -> + {:error, "Model not found"} + [versions: versions, current_events: current_events] -> + case version do + nil -> + {:ok, versions} + version -> + {:ok, Enum.filter(versions, fn {ver_id, _model, _ledger} -> version == ver_id end)} + end + end + + end ) end end From 12d26f4cc768633f43890af8dbd4de8a69e4bf93 Mon Sep 17 00:00:00 2001 From: Alan Dyer Date: Fri, 3 Nov 2017 11:12:42 -0300 Subject: [PATCH 6/6] more WIP, currently erroring typespec violation --- lib/perhap/adapters/modelstore/memory.ex | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/lib/perhap/adapters/modelstore/memory.ex b/lib/perhap/adapters/modelstore/memory.ex index ae61a7c..9edaf09 100644 --- a/lib/perhap/adapters/modelstore/memory.ex +++ b/lib/perhap/adapters/modelstore/memory.ex @@ -20,8 +20,10 @@ defmodule Perhap.Adapters.Modelstore.Memory do def put_model({entity_id, service}, version, model) do Agent.update(__MODULE__, fn %__MODULE__{modelstore: store, events: events, config: config} -> - store_val = Map.get(store, {entity_id, service}, [versions: [], current_events: []]) - store_val = [versions: Enum.map(store_val[:versions], fn {^version, old_model, ledger} -> {^version, model, ledger}, {not_version, old_model, ledger} -> {not_version, old_model, ledger} end), current_events: store_val[:current_events]] + store_val = Map.get(store, {entity_id, service}, [versions: %{}, current_events: []]) + versions = store_val[:versions] + {_old_model, ledger} = Map.get(versions, version, {:model, []}) + store_val = [versions: Map.put(versions, version, {model, ledger}), current_events: store_val[:current_events]] updated_store = Map.put(store, {entity_id, service}, store_val) %__MODULE__{modelstore: updated_store, events: events, config: config} end )