Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion lib/perhap/adapters/eventstore.ex
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ defmodule Perhap.Adapters.Eventstore do
@callback start_link(opts: any()) :: {:ok, pid} | :ignore | {:error, {:already_started, pid} | term}
@callback put_event(event: Perhap.Event.t) :: :ok | {:error, term}
@callback get_event(event_id: Perhap.Event.UUIDv1) :: {:ok, Perhap.Event.t} | {:error, term}
@callback get_events(atom(), [entity_id: Perhap.Event.UUIDv4.t, after: Perhap.Event.UUIDv1.t]) ::
@callback get_events(atom(), [entity_id: Perhap.Event.UUIDv4.t, after: Perhap.Event.UUIDv1.t, type: atom()]) ::
{:ok, list(Perhap.Event.t)} | {:error, term}

defmacro __using__(_opts) do
Expand Down
50 changes: 32 additions & 18 deletions lib/perhap/adapters/eventstore/memory.ex
Original file line number Diff line number Diff line change
Expand Up @@ -41,38 +41,52 @@ defmodule Perhap.Adapters.Eventstore.Memory do
end )
end

@spec get_events(atom(), [entity_id: Perhap.Event.UUIDv4.t, after: Perhap.Event.UUIDv1.t]) ::
@spec get_events(atom(), [entity_id: Perhap.Event.UUIDv4.t, after: Perhap.Event.UUIDv1.t, type: atom()]) ::
{:ok, list(Perhap.Event.t)} | {:error, term}
def get_events(context, opts \\ []) do
Agent.get( __MODULE__,
fn %__MODULE__{events: events, index: index} ->
event_ids = case Keyword.has_key?(opts, :entity_id) do
true ->
Map.get(index, {context, opts[:entity_id]}, [])
_ ->
event_ids =
index
|> Enum.filter(fn {{c, _}, _} -> c == context end)
|> Enum.map(fn {_, events} -> events end)
|> List.flatten
end
event_ids2 = case Keyword.has_key?(opts, :after) do
true ->
after_event = time_order(opts[:after])
event_ids |> Enum.filter(fn {ev} -> ev > after_event end)
_ -> event_ids
end
{:ok, Map.take(events, event_ids2) |> Map.values}
filtered_index = index
|> filter_index_by_entity_id(context, Keyword.get(opts, :entity_id))
|> filter_event_ids_after_a_given_event(Keyword.get(opts, :after))
filtered_events = events
|> Map.take(filtered_index)
|> Map.values
|> filter_events_by_type(Keyword.get(opts,:type))
{:ok, filtered_events}
end )
end

defp filter_index_by_entity_id(index, context, nil) do
index
|> Enum.filter(fn {{c, _}, _} -> c == context end)
|> Enum.map(fn {_, events} -> events end)
|> List.flatten
end
defp filter_index_by_entity_id(index, context, entity_id) do
index
|> Map.get({context, entity_id}, [])
end

defp filter_event_ids_after_a_given_event(event_ids, nil), do: event_ids
defp filter_event_ids_after_a_given_event(event_ids, after_event) do
after_event = time_order(after_event)
event_ids |> Enum.filter(fn ev -> ev > after_event end)
end

defp time_order(maybe_uuidv1) do
case Perhap.Event.is_time_order?(maybe_uuidv1) do
true -> maybe_uuidv1
_ -> maybe_uuidv1 |> Perhap.Event.uuid_v1_to_time_order
end
end

def filter_events_by_type(events, nil), do: events
def filter_events_by_type(events, filter_type) do
events
|> Enum.filter(fn %Perhap.Event{metadata: %Perhap.Event.Metadata{type: type}} -> type == filter_type end)
end

def handle_call({:swarm, :begin_handoff}, _from, state) do
{:reply, {:resume, state}, state}
end
Expand Down
26 changes: 26 additions & 0 deletions lib/perhap/adapters/modelstore.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
defmodule Perhap.Adapters.Modelstore do
@moduledoc false

@callback start_link(opts: any()) :: {:ok, pid} | :ignore | {:error, {:already_started, pid} | term}
@callback put_model({Perhap.Event.UUIDv4.t | :single, module()}, Perhap.Event.UUIDv1.t, any()) ::
:ok | {:error, term}
@callback get_model({Perhap.Event.UUIDv4.t | :single, module()}, Perhap.Event.UUIDv1.t | nil) ::
{:ok, any()} | {:error, term}

defmacro __using__(_opts) do
quote location: :keep do
@behaviour Perhap.Adapters.Modelstore

@spec start_service(term) :: {:ok, pid}
def start_service(name) do
{:ok, pid} = Swarm.register_name({__MODULE__, name}, Supervisor, :start_child, [{:via, :swarm, :perhap}, child_spec(name)])
Swarm.join(:perhap, pid)
{:ok, pid}
end
def start_service() do
start_service(__MODULE__)
end

end
end
end
51 changes: 51 additions & 0 deletions lib/perhap/adapters/modelstore/memory.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
defmodule Perhap.Adapters.Modelstore.Memory do
use Perhap.Adapters.Modelstore
use Agent

@type model :: any()
@type ledger :: list(Perhap.Event.t)
@type model_instance :: { Perhap.Event.UUIDv1.t, model, ledger }
@type key :: { Perhap.Event.UUIDv4.t | :single, module() }
@type value :: [ versions: list(model_instance), current_events: list(Perhap.Event.t) ]
@type store :: %{ required(key) => value }
@type t :: [ modelstore: store, events: list(Perhap.Event.t), config: %{} ]
defstruct modelstore: %{}, events: [], config: %{}

@spec start_link(opts: any()) :: {:ok, pid} | :ignore | {:error, {:already_started, pid} | term}
def start_link(_opts) do
Agent.start_link(fn -> %__MODULE__{} end, name: __MODULE__)
end

@spec put_model({Perhap.Event.UUIDv4.t | :single, module()}, Perhap.Event.UUIDv1.t, any()) :: :ok | {:error, term}
def put_model({entity_id, service}, version, model) do
Agent.update(__MODULE__,
fn %__MODULE__{modelstore: store, events: events, config: config} ->
store_val = Map.get(store, {entity_id, service}, [versions: %{}, current_events: []])
versions = store_val[:versions]
{_old_model, ledger} = Map.get(versions, version, {:model, []})
store_val = [versions: Map.put(versions, version, {model, ledger}), current_events: store_val[:current_events]]
updated_store = Map.put(store, {entity_id, service}, store_val)
%__MODULE__{modelstore: updated_store, events: events, config: config}
end )
:ok
end

@spec get_model({Perhap.Event.UUIDv4.t | :single, module()}, Perhap.Event.UUIDv1.t | nil) :: {:ok, any()} | {:error, term}
def get_model({entity_id, service}, version \\ nil) do
Agent.get(__MODULE__,
fn %__MODULE__{modelstore: store, events: events, config: config} ->
case Map.get(store, {entity_id, service}) do
nil ->
{:error, "Model not found"}
[versions: versions, current_events: current_events] ->
case version do
nil ->
{:ok, versions}
version ->
{:ok, Enum.filter(versions, fn {ver_id, _model, _ledger} -> version == ver_id end)}
end
end

end )
end
end
10 changes: 6 additions & 4 deletions lib/perhap/domain.ex
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ defmodule Perhap.Domain do

#@spec reduce(service_id: Perhap.Event.UUIDv4.t | module(),
# event: list(Perhap.Event.t) | Perhap.Event.t) :: :ok
def reduce(service_id, event) when not is_list(event), do: reduce(service_id, List.wrap(event))
def reduce(service_id, event) when not is_list(event), do: reduce(service_id, [event])
def reduce(service_id, events) do
GenServer.cast({:via, :swarm, service_id}, {:reduce, events})
end
Expand Down Expand Up @@ -113,8 +113,10 @@ defmodule Perhap.Domain do

defmacro __before_compile__(_env) do
quote do
@type state models: %{required(Perhap.Event.UUIDv1.t) => %__MODULE__{}}, ledger: list(Perhap.Event.t)

def start_link(name) do
GenServer.start_link(__MODULE__, %__MODULE__{}, [name])
GenServer.start_link(__MODULE__, [models: [], ledger: []], [name])
end

def stop(name) do
Expand All @@ -124,8 +126,8 @@ defmodule Perhap.Domain do
def config() do
%{
ttl: @service_ttl,
expire: @events_expire_after,
buffer_size: @buffer_size
ledger_size: @ledger_size,
num_versions: @num_versions
}
end
end
Expand Down
2 changes: 1 addition & 1 deletion lib/perhap/event.ex
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ defmodule Perhap.Event do
# Timestamps and unique integers
@spec timestamp() :: integer
def timestamp(), do: System.system_time(:microseconds)

@spec unique_integer() :: integer
def unique_integer(), do: System.unique_integer([:monotonic])

Expand Down
6 changes: 3 additions & 3 deletions test/perhap/adapters/eventstore/memory_test.exs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
defmodule PerhapTest.Adapters.Memory do
defmodule PerhapTest.Adapters.Eventstore.Memory do
use PerhapTest.Helper, port: 4499
alias Perhap.Adapters.Eventstore.Memory

Expand All @@ -9,15 +9,15 @@ defmodule PerhapTest.Adapters.Memory do
end

test "get_event" do
random_context = Enum.random([:a, :b, :c, :d, :e])
random_context = Enum.random([:a, :b])
rando = make_random_event(
%Perhap.Event.Metadata{context: random_context, entity_id: Perhap.Event.get_uuid_v4()} )
Memory.put_event(rando)
assert Memory.get_event(rando.event_id) == {:ok, rando}
end

test "get_events with entity_id" do
random_context = Enum.random([:a, :b, :c, :d, :e])
random_context = Enum.random([:c, :d, :e])
random_entity_id = Perhap.Event.get_uuid_v4()
rando1 = make_random_event(
%Perhap.Event.Metadata{context: random_context, entity_id: random_entity_id} )
Expand Down
8 changes: 8 additions & 0 deletions test/perhap/adapters/modelstore/memory_test.exs
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
defmodule PerhapTest.Adapters.Modelstore.Memory do
use PerhapTest.Helper, port: 4499
alias Perhap.Adapters.Modelstore.Memory

test "put_model" do
end

end
27 changes: 23 additions & 4 deletions test/perhap/event_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -121,18 +121,37 @@ defmodule Perhap.EventTest do

test "retrieves events following an event for an entity" do
assert Perhap.Event.retrieve_events(:aa) == {:ok, []}
random_entity = Perhap.Event.get_uuid_v4
random_entity_id = Perhap.Event.get_uuid_v4
random_event_ids = for _ <- 1..5, do: Perhap.Event.get_uuid_v1
random_events = random_event_ids
|> Enum.map(fn(ev) ->
make_random_event(%Perhap.Event.Metadata{context: :aa, event_id: ev, entity_id: random_entity})
make_random_event(%Perhap.Event.Metadata{context: :aa, event_id: ev, entity_id: random_entity_id})
end)
random_events |> Enum.each(fn(event) -> Perhap.Event.save_event(event) end)
[ ev1 | [ ev2 | _ ] ] = random_event_ids |> Enum.reverse
#first_event = Perhap.Event.retrieve_events(:aa, entity_id: random_entity, after: ev2)
[ _ | [ ev2 | _ ] ] = random_event_ids |> Enum.reverse
{:ok, [left]} = Perhap.Event.retrieve_events(:aa, entity_id: random_entity_id, after: ev2)
[ right | _ ] = random_events |> Enum.reverse
assert left == right
end

test "retrieves events filtered by an event_type" do
assert Perhap.Event.retrieve_events(:bb) == {:ok, []}
random_entity_id = Perhap.Event.get_uuid_v4
rando1 = make_random_event(
%Perhap.Event.Metadata{context: :bb, type: :bb_type_1, entity_id: random_entity_id} )
Perhap.Event.save_event(rando1)
rando2 = make_random_event(
%Perhap.Event.Metadata{context: :bb, type: :bb_type_2, entity_id: random_entity_id} )
Perhap.Event.save_event(rando2)
rando3 = make_random_event(
%Perhap.Event.Metadata{context: :bb, type: :bb_type_2, entity_id: random_entity_id} )
Perhap.Event.save_event(rando3)
{:ok, [result1]} = Perhap.Event.retrieve_events(:bb, entity_id: random_entity_id, type: :bb_type_1)
{:ok, [result2 | _]} = Perhap.Event.retrieve_events(:bb, type: :bb_type_2)
{:ok, [result3]} = Perhap.Event.retrieve_events(:bb, type: :bb_type_2, after: result2.event_id)
assert result1 == rando1
assert result2 == rando2
assert result3 == rando3
end

defp make_v1() do
Expand Down