diff --git a/lib/perhap/adapters/eventstore.ex b/lib/perhap/adapters/eventstore.ex index e8e8801..6d814d2 100644 --- a/lib/perhap/adapters/eventstore.ex +++ b/lib/perhap/adapters/eventstore.ex @@ -4,7 +4,7 @@ defmodule Perhap.Adapters.Eventstore do @callback start_link(opts: any()) :: {:ok, pid} | :ignore | {:error, {:already_started, pid} | term} @callback put_event(event: Perhap.Event.t) :: :ok | {:error, term} @callback get_event(event_id: Perhap.Event.UUIDv1) :: {:ok, Perhap.Event.t} | {:error, term} - @callback get_events(atom(), [entity_id: Perhap.Event.UUIDv4.t, after: Perhap.Event.UUIDv1.t]) :: + @callback get_events(atom(), [entity_id: Perhap.Event.UUIDv4.t, after: Perhap.Event.UUIDv1.t, type: atom()]) :: {:ok, list(Perhap.Event.t)} | {:error, term} defmacro __using__(_opts) do diff --git a/lib/perhap/adapters/eventstore/memory.ex b/lib/perhap/adapters/eventstore/memory.ex index e7fc76f..8344da4 100644 --- a/lib/perhap/adapters/eventstore/memory.ex +++ b/lib/perhap/adapters/eventstore/memory.ex @@ -41,31 +41,39 @@ defmodule Perhap.Adapters.Eventstore.Memory do end ) end - @spec get_events(atom(), [entity_id: Perhap.Event.UUIDv4.t, after: Perhap.Event.UUIDv1.t]) :: + @spec get_events(atom(), [entity_id: Perhap.Event.UUIDv4.t, after: Perhap.Event.UUIDv1.t, type: atom()]) :: {:ok, list(Perhap.Event.t)} | {:error, term} def get_events(context, opts \\ []) do Agent.get( __MODULE__, fn %__MODULE__{events: events, index: index} -> - event_ids = case Keyword.has_key?(opts, :entity_id) do - true -> - Map.get(index, {context, opts[:entity_id]}, []) - _ -> - event_ids = - index - |> Enum.filter(fn {{c, _}, _} -> c == context end) - |> Enum.map(fn {_, events} -> events end) - |> List.flatten - end - event_ids2 = case Keyword.has_key?(opts, :after) do - true -> - after_event = time_order(opts[:after]) - event_ids |> Enum.filter(fn {ev} -> ev > after_event end) - _ -> event_ids - end - {:ok, Map.take(events, event_ids2) |> Map.values} + filtered_index = index + |> filter_index_by_entity_id(context, Keyword.get(opts, :entity_id)) + |> filter_event_ids_after_a_given_event(Keyword.get(opts, :after)) + filtered_events = events + |> Map.take(filtered_index) + |> Map.values + |> filter_events_by_type(Keyword.get(opts,:type)) + {:ok, filtered_events} end ) end + defp filter_index_by_entity_id(index, context, nil) do + index + |> Enum.filter(fn {{c, _}, _} -> c == context end) + |> Enum.map(fn {_, events} -> events end) + |> List.flatten + end + defp filter_index_by_entity_id(index, context, entity_id) do + index + |> Map.get({context, entity_id}, []) + end + + defp filter_event_ids_after_a_given_event(event_ids, nil), do: event_ids + defp filter_event_ids_after_a_given_event(event_ids, after_event) do + after_event = time_order(after_event) + event_ids |> Enum.filter(fn ev -> ev > after_event end) + end + defp time_order(maybe_uuidv1) do case Perhap.Event.is_time_order?(maybe_uuidv1) do true -> maybe_uuidv1 @@ -73,6 +81,12 @@ defmodule Perhap.Adapters.Eventstore.Memory do end end + def filter_events_by_type(events, nil), do: events + def filter_events_by_type(events, filter_type) do + events + |> Enum.filter(fn %Perhap.Event{metadata: %Perhap.Event.Metadata{type: type}} -> type == filter_type end) + end + def handle_call({:swarm, :begin_handoff}, _from, state) do {:reply, {:resume, state}, state} end diff --git a/lib/perhap/adapters/modelstore.ex b/lib/perhap/adapters/modelstore.ex new file mode 100644 index 0000000..5fb2cdf --- /dev/null +++ b/lib/perhap/adapters/modelstore.ex @@ -0,0 +1,26 @@ +defmodule Perhap.Adapters.Modelstore do + @moduledoc false + + @callback start_link(opts: any()) :: {:ok, pid} | :ignore | {:error, {:already_started, pid} | term} + @callback put_model({Perhap.Event.UUIDv4.t | :single, module()}, Perhap.Event.UUIDv1.t, any()) :: + :ok | {:error, term} + @callback get_model({Perhap.Event.UUIDv4.t | :single, module()}, Perhap.Event.UUIDv1.t | nil) :: + {:ok, any()} | {:error, term} + + defmacro __using__(_opts) do + quote location: :keep do + @behaviour Perhap.Adapters.Modelstore + + @spec start_service(term) :: {:ok, pid} + def start_service(name) do + {:ok, pid} = Swarm.register_name({__MODULE__, name}, Supervisor, :start_child, [{:via, :swarm, :perhap}, child_spec(name)]) + Swarm.join(:perhap, pid) + {:ok, pid} + end + def start_service() do + start_service(__MODULE__) + end + + end + end +end diff --git a/lib/perhap/adapters/modelstore/memory.ex b/lib/perhap/adapters/modelstore/memory.ex new file mode 100644 index 0000000..9edaf09 --- /dev/null +++ b/lib/perhap/adapters/modelstore/memory.ex @@ -0,0 +1,51 @@ +defmodule Perhap.Adapters.Modelstore.Memory do + use Perhap.Adapters.Modelstore + use Agent + + @type model :: any() + @type ledger :: list(Perhap.Event.t) + @type model_instance :: { Perhap.Event.UUIDv1.t, model, ledger } + @type key :: { Perhap.Event.UUIDv4.t | :single, module() } + @type value :: [ versions: list(model_instance), current_events: list(Perhap.Event.t) ] + @type store :: %{ required(key) => value } + @type t :: [ modelstore: store, events: list(Perhap.Event.t), config: %{} ] + defstruct modelstore: %{}, events: [], config: %{} + + @spec start_link(opts: any()) :: {:ok, pid} | :ignore | {:error, {:already_started, pid} | term} + def start_link(_opts) do + Agent.start_link(fn -> %__MODULE__{} end, name: __MODULE__) + end + + @spec put_model({Perhap.Event.UUIDv4.t | :single, module()}, Perhap.Event.UUIDv1.t, any()) :: :ok | {:error, term} + def put_model({entity_id, service}, version, model) do + Agent.update(__MODULE__, + fn %__MODULE__{modelstore: store, events: events, config: config} -> + store_val = Map.get(store, {entity_id, service}, [versions: %{}, current_events: []]) + versions = store_val[:versions] + {_old_model, ledger} = Map.get(versions, version, {:model, []}) + store_val = [versions: Map.put(versions, version, {model, ledger}), current_events: store_val[:current_events]] + updated_store = Map.put(store, {entity_id, service}, store_val) + %__MODULE__{modelstore: updated_store, events: events, config: config} + end ) + :ok + end + + @spec get_model({Perhap.Event.UUIDv4.t | :single, module()}, Perhap.Event.UUIDv1.t | nil) :: {:ok, any()} | {:error, term} + def get_model({entity_id, service}, version \\ nil) do + Agent.get(__MODULE__, + fn %__MODULE__{modelstore: store, events: events, config: config} -> + case Map.get(store, {entity_id, service}) do + nil -> + {:error, "Model not found"} + [versions: versions, current_events: current_events] -> + case version do + nil -> + {:ok, versions} + version -> + {:ok, Enum.filter(versions, fn {ver_id, _model, _ledger} -> version == ver_id end)} + end + end + + end ) + end +end diff --git a/lib/perhap/domain.ex b/lib/perhap/domain.ex index d1489cd..0d1e093 100644 --- a/lib/perhap/domain.ex +++ b/lib/perhap/domain.ex @@ -17,7 +17,7 @@ defmodule Perhap.Domain do #@spec reduce(service_id: Perhap.Event.UUIDv4.t | module(), # event: list(Perhap.Event.t) | Perhap.Event.t) :: :ok - def reduce(service_id, event) when not is_list(event), do: reduce(service_id, List.wrap(event)) + def reduce(service_id, event) when not is_list(event), do: reduce(service_id, [event]) def reduce(service_id, events) do GenServer.cast({:via, :swarm, service_id}, {:reduce, events}) end @@ -113,8 +113,10 @@ defmodule Perhap.Domain do defmacro __before_compile__(_env) do quote do + @type state models: %{required(Perhap.Event.UUIDv1.t) => %__MODULE__{}}, ledger: list(Perhap.Event.t) + def start_link(name) do - GenServer.start_link(__MODULE__, %__MODULE__{}, [name]) + GenServer.start_link(__MODULE__, [models: [], ledger: []], [name]) end def stop(name) do @@ -124,8 +126,8 @@ defmodule Perhap.Domain do def config() do %{ ttl: @service_ttl, - expire: @events_expire_after, - buffer_size: @buffer_size + ledger_size: @ledger_size, + num_versions: @num_versions } end end diff --git a/lib/perhap/event.ex b/lib/perhap/event.ex index 5ef5f41..2671c9c 100644 --- a/lib/perhap/event.ex +++ b/lib/perhap/event.ex @@ -96,7 +96,7 @@ defmodule Perhap.Event do # Timestamps and unique integers @spec timestamp() :: integer def timestamp(), do: System.system_time(:microseconds) - + @spec unique_integer() :: integer def unique_integer(), do: System.unique_integer([:monotonic]) diff --git a/test/perhap/adapters/eventstore/memory_test.exs b/test/perhap/adapters/eventstore/memory_test.exs index 666311e..0713729 100644 --- a/test/perhap/adapters/eventstore/memory_test.exs +++ b/test/perhap/adapters/eventstore/memory_test.exs @@ -1,4 +1,4 @@ -defmodule PerhapTest.Adapters.Memory do +defmodule PerhapTest.Adapters.Eventstore.Memory do use PerhapTest.Helper, port: 4499 alias Perhap.Adapters.Eventstore.Memory @@ -9,7 +9,7 @@ defmodule PerhapTest.Adapters.Memory do end test "get_event" do - random_context = Enum.random([:a, :b, :c, :d, :e]) + random_context = Enum.random([:a, :b]) rando = make_random_event( %Perhap.Event.Metadata{context: random_context, entity_id: Perhap.Event.get_uuid_v4()} ) Memory.put_event(rando) @@ -17,7 +17,7 @@ defmodule PerhapTest.Adapters.Memory do end test "get_events with entity_id" do - random_context = Enum.random([:a, :b, :c, :d, :e]) + random_context = Enum.random([:c, :d, :e]) random_entity_id = Perhap.Event.get_uuid_v4() rando1 = make_random_event( %Perhap.Event.Metadata{context: random_context, entity_id: random_entity_id} ) diff --git a/test/perhap/adapters/modelstore/memory_test.exs b/test/perhap/adapters/modelstore/memory_test.exs new file mode 100644 index 0000000..1be6173 --- /dev/null +++ b/test/perhap/adapters/modelstore/memory_test.exs @@ -0,0 +1,8 @@ +defmodule PerhapTest.Adapters.Modelstore.Memory do + use PerhapTest.Helper, port: 4499 + alias Perhap.Adapters.Modelstore.Memory + + test "put_model" do + end + +end diff --git a/test/perhap/event_test.exs b/test/perhap/event_test.exs index 1ae874f..1fb5005 100644 --- a/test/perhap/event_test.exs +++ b/test/perhap/event_test.exs @@ -121,18 +121,37 @@ defmodule Perhap.EventTest do test "retrieves events following an event for an entity" do assert Perhap.Event.retrieve_events(:aa) == {:ok, []} - random_entity = Perhap.Event.get_uuid_v4 + random_entity_id = Perhap.Event.get_uuid_v4 random_event_ids = for _ <- 1..5, do: Perhap.Event.get_uuid_v1 random_events = random_event_ids |> Enum.map(fn(ev) -> - make_random_event(%Perhap.Event.Metadata{context: :aa, event_id: ev, entity_id: random_entity}) + make_random_event(%Perhap.Event.Metadata{context: :aa, event_id: ev, entity_id: random_entity_id}) end) random_events |> Enum.each(fn(event) -> Perhap.Event.save_event(event) end) - [ ev1 | [ ev2 | _ ] ] = random_event_ids |> Enum.reverse - #first_event = Perhap.Event.retrieve_events(:aa, entity_id: random_entity, after: ev2) + [ _ | [ ev2 | _ ] ] = random_event_ids |> Enum.reverse + {:ok, [left]} = Perhap.Event.retrieve_events(:aa, entity_id: random_entity_id, after: ev2) + [ right | _ ] = random_events |> Enum.reverse + assert left == right end test "retrieves events filtered by an event_type" do + assert Perhap.Event.retrieve_events(:bb) == {:ok, []} + random_entity_id = Perhap.Event.get_uuid_v4 + rando1 = make_random_event( + %Perhap.Event.Metadata{context: :bb, type: :bb_type_1, entity_id: random_entity_id} ) + Perhap.Event.save_event(rando1) + rando2 = make_random_event( + %Perhap.Event.Metadata{context: :bb, type: :bb_type_2, entity_id: random_entity_id} ) + Perhap.Event.save_event(rando2) + rando3 = make_random_event( + %Perhap.Event.Metadata{context: :bb, type: :bb_type_2, entity_id: random_entity_id} ) + Perhap.Event.save_event(rando3) + {:ok, [result1]} = Perhap.Event.retrieve_events(:bb, entity_id: random_entity_id, type: :bb_type_1) + {:ok, [result2 | _]} = Perhap.Event.retrieve_events(:bb, type: :bb_type_2) + {:ok, [result3]} = Perhap.Event.retrieve_events(:bb, type: :bb_type_2, after: result2.event_id) + assert result1 == rando1 + assert result2 == rando2 + assert result3 == rando3 end defp make_v1() do