From a63dbfcf5f89d35d9bfe5fee02e96d9b4ff5e58d Mon Sep 17 00:00:00 2001 From: Alan Dyer Date: Thu, 2 Nov 2017 17:31:29 -0300 Subject: [PATCH 01/10] tests working again --- lib/perhap/adapters/eventstore/dynamo.ex | 90 ++++++++++++++++++- lib/perhap_dynamo.ex | 15 ---- .../adapters/eventstore/dynamo_test.exs | 20 ++++- test/perhap/event_test.exs | 72 ++++++++++----- 4 files changed, 157 insertions(+), 40 deletions(-) diff --git a/lib/perhap/adapters/eventstore/dynamo.ex b/lib/perhap/adapters/eventstore/dynamo.ex index 80a97a2..33becab 100644 --- a/lib/perhap/adapters/eventstore/dynamo.ex +++ b/lib/perhap/adapters/eventstore/dynamo.ex @@ -13,10 +13,36 @@ defmodule Perhap.Adapters.Eventstore.Dynamo do @spec start_link(opts: any()) :: {:ok, pid} | :ignore | {:error, {:already_started, pid} | term} def start_link(args) do - {:ok, pid} = GenServer.start_link(__MODULE__, args) + GenServer.start_link(__MODULE__, [], name: :eventstore) + end + + def init(args) do + interval = 1 #miliseconds + Process.send_after(self(), {:batch_write, interval}, interval) + {:ok, []} end @spec put_event(event: Perhap.Event.t) :: :ok | {:error, term} + def put_event(event) do + GenServer.call(:eventstore, {:put_event, event}) + end + + def handle_call({:put_event, event}, _from, events) do + {:reply, :ok, [event | events]} + end + + def handle_info({:batch_write, interval}, []) do + Process.send_after(self(), {:batch_write, interval}, interval) + {:noreply, []} + end + + def handle_info({:batch_write, interval}, events) do + Task.start(__MODULE__, :put_to_dynamo, [Enum.reverse(events)]) + Process.send_after(self(), {:batch_write, interval}, interval) + {:noreply, []} + end + + @doc """ def put_event(event) do event = %Perhap.Event{event | event_id: event.event_id |> Perhap.Event.uuid_v1_to_time_order} ExAws.Dynamo.put_item(Application.get_env(:perhap_dynamo, :event_table_name, "Events"), %{Map.from_struct(event) | metadata: Map.from_struct(event.metadata)}) @@ -37,6 +63,8 @@ defmodule Perhap.Adapters.Eventstore.Dynamo do :ok end + """ + @spec get_event(event_id: Perhap.Event.UUIDv1) :: {:ok, Perhap.Event.t} | {:error, term} def get_event(event_id) do @@ -130,4 +158,64 @@ defmodule Perhap.Adapters.Eventstore.Dynamo do Enum.reduce(data, %{}, fn({key, value}, map) -> Map.put(map, String.to_atom(key), value) end) end + + def put_to_dynamo(events) do + events = events |> Enum.map(fn event -> %Perhap.Event{event | event_id: event.event_id |> Perhap.Event.uuid_v1_to_time_order, metadata: Map.from_struct(event.metadata)} |> Map.from_struct end) + + Enum.chunk_every(events, 25) |> batch_put() + end + + defp batch_put([]) do + :ok + end + + defp batch_put([chunk | rest]) do + index_keys = chunk + |> Enum.map(fn event -> %{context: event.metadata.context, entity_id: event.metadata.entity_id} end) + |> Enum.dedup + + index = ExAws.Dynamo.batch_get_item(%{Application.get_env(:perhap_dynamo, :event_index_table_name, "Index") => [keys: index_keys]}) + |> ExAws.request! + |> Map.get("Responses") + |> Map.get("Index") + |> Enum.map(fn index_item -> ExAws.Dynamo.Decoder.decode(index_item) end) + |> Enum.map(fn index_item -> %{index_item | "context" => String.to_atom(index_item["context"])} end) + |> Enum.reduce(%{}, fn (index, map) -> Map.put(map, {index["context"], index["entity_id"]}, index["events"]) end) + #unprocessed keys + + index_put_request = process_index(chunk, index) + + event_put_request = chunk |> Enum.map(fn event -> [put_request: [item: event]] end) + + case ExAws.Dynamo.batch_write_item(%{Application.get_env(:perhap_dynamo, :event_table_name, "Events") => event_put_request}) |> ExAws.request do + {:error, reason} -> + IO.puts "Error writing events to dynamo, reason: #{inspect reason}" + IO.inspect chunk + _ -> + case ExAws.Dynamo.batch_write_item(%{Application.get_env(:perhap_dynamo, :event_index_table_name, "Index") => index_put_request}) |> ExAws.request do + {:error, reason} -> + IO.puts "Error writing index to dynamo, reason: #{inspect reason}" + IO.inspect {:events, chunk} + IO.inspect {:index, index_put_request} + + Enum.each(chunk, fn event -> ExAws.Dynamo.delete_item(Application.get_env(:perhap_dynamo, :event_table_name, "Events"), %{event_id: event.event_id |> Perhap.Event.uuid_v1_to_time_order}) |> ExAws.request! end) + _ -> + :ok + end + end + batch_put(rest) + end + + defp process_index([], index) do + index + |> Map.keys + |> Enum.map(fn {context, entity_id} -> [put_request: [item: %{context: context, entity_id: entity_id, events: Map.get(index, {context, entity_id})}]] end) + end + + defp process_index([event | rest], index) do + index_key = {event.metadata.context, event.metadata.entity_id} + indexed_events = [event.event_id | Map.get(index, index_key, [])] + process_index(rest, Map.put(index, index_key, indexed_events)) + end + end diff --git a/lib/perhap_dynamo.ex b/lib/perhap_dynamo.ex index 29f5eea..29bf76a 100644 --- a/lib/perhap_dynamo.ex +++ b/lib/perhap_dynamo.ex @@ -1,18 +1,3 @@ defmodule PerhapDynamo do - @moduledoc """ - Documentation for PerhapDynamo. - """ - @doc """ - Hello world. - - ## Examples - - iex> PerhapDynamo.hello - :world - - """ - def hello do - :world - end end diff --git a/test/perhap/adapters/eventstore/dynamo_test.exs b/test/perhap/adapters/eventstore/dynamo_test.exs index 15f0ce4..11a411d 100644 --- a/test/perhap/adapters/eventstore/dynamo_test.exs +++ b/test/perhap/adapters/eventstore/dynamo_test.exs @@ -2,8 +2,12 @@ defmodule PerhapTest.Adapters.Dynamo do use PerhapTest.Helper, port: 4499 alias Perhap.Adapters.Eventstore.Dynamo + @pause_interval 500 + setup do Application.put_env(:perhap, :eventstore, Perhap.Adapters.Eventstore.Dynamo, []) + Perhap.Adapters.Eventstore.Dynamo.start_link([]) + :ok end @@ -11,26 +15,34 @@ defmodule PerhapTest.Adapters.Dynamo do random_context = Enum.random([:a, :b, :c, :d, :e]) random_event = make_random_event( %Perhap.Event.Metadata{context: random_context, entity_id: Perhap.Event.get_uuid_v4()} ) - assert ExAws.Dynamo.get_item("Events", %{event_id: random_event.event_id |> Perhap.Event.uuid_v1_to_time_order}) |> ExAws.request! == %{} + check1 = ExAws.Dynamo.get_item("Events", %{event_id: random_event.event_id |> Perhap.Event.uuid_v1_to_time_order}) |> ExAws.request! Dynamo.put_event(random_event) - refute ExAws.Dynamo.get_item("Events", %{event_id: random_event.event_id |> Perhap.Event.uuid_v1_to_time_order}) |> ExAws.request! == %{} + :timer.sleep(@pause_interval) + check2 = ExAws.Dynamo.get_item("Events", %{event_id: random_event.event_id |> Perhap.Event.uuid_v1_to_time_order}) |> ExAws.request! #cleanup ExAws.Dynamo.delete_item("Events", %{event_id: random_event.event_id |> Perhap.Event.uuid_v1_to_time_order}) |> ExAws.request! ExAws.Dynamo.delete_item("Index", %{context: random_context, entity_id: random_event.metadata.entity_id}) |> ExAws.request! + + assert check1 == %{} + refute check2 == %{} end test "get_event" do random_context = Enum.random([:a, :b, :c, :d, :e]) random_event = make_random_event( %Perhap.Event.Metadata{context: random_context, entity_id: Perhap.Event.get_uuid_v4()} ) - assert Dynamo.get_event(random_event.event_id) == {:error, "Event not found"} + check1 = Dynamo.get_event(random_event.event_id) Dynamo.put_event(random_event) - assert Dynamo.get_event(random_event.event_id) == {:ok, random_event} + :timer.sleep(@pause_interval) + check2 = Dynamo.get_event(random_event.event_id) #cleanup ExAws.Dynamo.delete_item("Events", %{event_id: random_event.event_id |> Perhap.Event.uuid_v1_to_time_order}) |> ExAws.request! ExAws.Dynamo.delete_item("Index", %{context: random_context, entity_id: random_event.metadata.entity_id}) |> ExAws.request! + + assert check1== {:error, "Event not found"} + assert check2 == {:ok, random_event} end test "get_events with entity_id" do diff --git a/test/perhap/event_test.exs b/test/perhap/event_test.exs index 7ef5284..6854d6d 100644 --- a/test/perhap/event_test.exs +++ b/test/perhap/event_test.exs @@ -1,7 +1,15 @@ defmodule Perhap.EventTestDynamo do - use ExUnit.Case, async: true + use ExUnit.Case, async: false import PerhapTest.Helper, only: :functions + @interval 1 + @pause_interval 500 + + setup do + Perhap.Adapters.Eventstore.Dynamo.start_link([]) + :ok + end + test "timestamp returns system time in microseconds" do assert_in_delta(Perhap.Event.timestamp(), :erlang.system_time(:microsecond), 10) end @@ -66,11 +74,16 @@ defmodule Perhap.EventTestDynamo do test "saves and retrieves an event" do random_event = PerhapTest.Helper.make_random_event() - assert Perhap.Event.save_event(random_event) == {:ok, random_event} + + response = Perhap.Event.save_event(random_event) + :timer.sleep(@pause_interval) #cleanup ExAws.Dynamo.delete_item("Events", %{event_id: random_event.event_id |> Perhap.Event.uuid_v1_to_time_order}) |> ExAws.request! ExAws.Dynamo.delete_item("Index", %{context: random_event.metadata.context, entity_id: random_event.metadata.entity_id}) |> ExAws.request! + #/cleanup + + assert response == {:ok, random_event} end test "doesn't retrieve an event that doesn't exist" do @@ -82,12 +95,15 @@ defmodule Perhap.EventTestDynamo do random_event = make_random_event( %Perhap.Event.Metadata{context: random_context} ) Perhap.Event.save_event(random_event) - {:ok, result} = Perhap.Event.retrieve_event(random_event.event_id) - assert result.event_id == random_event.event_id + :timer.sleep(@pause_interval) + result = Perhap.Event.retrieve_event(random_event.event_id) #cleanup ExAws.Dynamo.delete_item("Events", %{event_id: random_event.event_id |> Perhap.Event.uuid_v1_to_time_order}) |> ExAws.request! ExAws.Dynamo.delete_item("Index", %{context: random_event.metadata.context, entity_id: random_event.metadata.entity_id}) |> ExAws.request! + #/cleanup + + assert result == {:ok, random_event} end test "retrieves events by context and entity ID" do @@ -99,14 +115,18 @@ defmodule Perhap.EventTestDynamo do rando2 = make_random_event( %Perhap.Event.Metadata{context: random_context, entity_id: random_entity_id} ) Perhap.Event.save_event(rando2) - {:ok, results} = Perhap.Event.retrieve_events(random_context, entity_id: random_entity_id) - assert Enum.member?(results, rando1) - assert Enum.member?(results, rando2) + :timer.sleep(@pause_interval) + results = Perhap.Event.retrieve_events(random_context, entity_id: random_entity_id) #cleanup ExAws.Dynamo.delete_item("Events", %{event_id: rando1.event_id |> Perhap.Event.uuid_v1_to_time_order}) |> ExAws.request! ExAws.Dynamo.delete_item("Events", %{event_id: rando2.event_id |> Perhap.Event.uuid_v1_to_time_order}) |> ExAws.request! ExAws.Dynamo.delete_item("Index", %{context: random_context, entity_id: random_entity_id}) |> ExAws.request! + #/cleanup + + {:ok, events} = results + assert Enum.member?(events, rando1) + assert Enum.member?(events, rando2) end @@ -116,17 +136,22 @@ defmodule Perhap.EventTestDynamo do rando1 = make_random_event( %Perhap.Event.Metadata{context: random_context, entity_id: random_entity_id} ) Perhap.Event.save_event(rando1) + :timer.sleep(@pause_interval) rando2 = make_random_event( %Perhap.Event.Metadata{context: random_context, entity_id: random_entity_id} ) Perhap.Event.save_event(rando2) - {:ok, results} = Perhap.Event.retrieve_events(random_context) - assert Enum.member?(results, rando1) - assert Enum.member?(results, rando2) + :timer.sleep(@pause_interval) + results = Perhap.Event.retrieve_events(random_context) #cleanup ExAws.Dynamo.delete_item("Events", %{event_id: rando1.event_id |> Perhap.Event.uuid_v1_to_time_order}) |> ExAws.request! ExAws.Dynamo.delete_item("Events", %{event_id: rando2.event_id |> Perhap.Event.uuid_v1_to_time_order}) |> ExAws.request! ExAws.Dynamo.delete_item("Index", %{context: random_context, entity_id: random_entity_id}) |> ExAws.request! + #/cleanup + + {:ok, events} = results + assert Enum.member?(events, rando1) + assert Enum.member?(events, rando2) end test "returns an empty list if events don't exist" do @@ -135,15 +160,21 @@ defmodule Perhap.EventTestDynamo do rando = make_random_event( %Perhap.Event.Metadata{context: :z, entity_id: random_entity_id} ) Perhap.Event.save_event(rando) - assert Perhap.Event.retrieve_events(:z, entity_id: Perhap.Event.get_uuid_v4) == {:ok, []} - {:ok, [result1]} = Perhap.Event.retrieve_events(:z) - assert result1.event_id == rando.event_id - {:ok, [result2]} = Perhap.Event.retrieve_events(:z, entity_id: random_entity_id) - assert result2.event_id == rando.event_id + :timer.sleep(@pause_interval) + check1 = Perhap.Event.retrieve_events(:z, entity_id: Perhap.Event.get_uuid_v4) + result1 = Perhap.Event.retrieve_events(:z) + result2 = Perhap.Event.retrieve_events(:z, entity_id: random_entity_id) #cleanup ExAws.Dynamo.delete_item("Events", %{event_id: rando.event_id |> Perhap.Event.uuid_v1_to_time_order}) |> ExAws.request! ExAws.Dynamo.delete_item("Index", %{context: rando.metadata.context, entity_id: random_entity_id}) |> ExAws.request! + #/cleanup + {:ok, [event1]} = result1 + {:ok, [event2]} = result2 + + assert check1 == {:ok, []} + assert event1.event_id == rando.event_id + assert event2.event_id == rando.event_id end @@ -157,17 +188,18 @@ defmodule Perhap.EventTestDynamo do make_random_event(%Perhap.Event.Metadata{context: :aa, event_id: ev, entity_id: random_entity}) end) random_events |> Enum.each(fn(event) -> Perhap.Event.save_event(event) end) + :timer.sleep(@pause_interval) [ ev1 | [ ev2 | rest ] ] = random_events |> Enum.reverse - {:ok, results} = Perhap.Event.retrieve_events(:aa, entity_id: random_entity, after: ev2.event_id) + results = Perhap.Event.retrieve_events(:aa, entity_id: random_entity, after: ev2.event_id) #cleanup Enum.map(random_events, fn event -> ExAws.Dynamo.delete_item("Events", %{event_id: event.event_id |> Perhap.Event.uuid_v1_to_time_order}) |> ExAws.request! end) ExAws.Dynamo.delete_item("Index", %{context: :aa, entity_id: random_entity}) |> ExAws.request! #/cleanup - #IO.inspect random_events - #IO.inspect results - assert Enum.member?(results, ev1) - Enum.map(rest, fn event -> refute Enum.member?(results, event) end) + + {:ok, events} = results + assert Enum.member?(events, ev1) + Enum.map(rest, fn event -> refute Enum.member?(events, event) end) From b0e1637516d6114ee0623d37f838ca839f27e6bd Mon Sep 17 00:00:00 2001 From: Alan Dyer Date: Wed, 8 Nov 2017 14:56:48 -0400 Subject: [PATCH 02/10] WIP stuff --- lib/perhap/adapters/eventstore/dynamo.ex | 91 +++++++++++++++--------- 1 file changed, 57 insertions(+), 34 deletions(-) diff --git a/lib/perhap/adapters/eventstore/dynamo.ex b/lib/perhap/adapters/eventstore/dynamo.ex index 33becab..c98e12c 100644 --- a/lib/perhap/adapters/eventstore/dynamo.ex +++ b/lib/perhap/adapters/eventstore/dynamo.ex @@ -17,9 +17,9 @@ defmodule Perhap.Adapters.Eventstore.Dynamo do end def init(args) do - interval = 1 #miliseconds + interval = 100 #miliseconds Process.send_after(self(), {:batch_write, interval}, interval) - {:ok, []} + {:ok, %{pending: [], posting: []}} end @spec put_event(event: Perhap.Event.t) :: :ok | {:error, term} @@ -28,18 +28,25 @@ defmodule Perhap.Adapters.Eventstore.Dynamo do end def handle_call({:put_event, event}, _from, events) do - {:reply, :ok, [event | events]} + {:reply, :ok, %{events | pending: [event | events.pending]}} end - def handle_info({:batch_write, interval}, []) do + def handle_info({:batch_write, interval}, events = %{pending: []}) do Process.send_after(self(), {:batch_write, interval}, interval) - {:noreply, []} + {:noreply, events} end def handle_info({:batch_write, interval}, events) do - Task.start(__MODULE__, :put_to_dynamo, [Enum.reverse(events)]) + chunked = events.pending + |> Enum.chunk_every(25) + |> Enum.map(fn chunk -> with {:ok, pid} <- Task.start(__MODULE__, :put_to_dynamo, [chunk]) + do {pid, chunk} + else err -> raise err + end + end) + Process.send_after(self(), {:batch_write, interval}, interval) - {:noreply, []} + {:noreply, %{pending: [], posting: [chunked | events.posting]}} end @doc """ @@ -160,21 +167,52 @@ defmodule Perhap.Adapters.Eventstore.Dynamo do end def put_to_dynamo(events) do - events = events |> Enum.map(fn event -> %Perhap.Event{event | event_id: event.event_id |> Perhap.Event.uuid_v1_to_time_order, metadata: Map.from_struct(event.metadata)} |> Map.from_struct end) + events = events + |> Enum.map(fn event -> %Perhap.Event{event | event_id: event.event_id |> Perhap.Event.uuid_v1_to_time_order, + metadata: Map.from_struct(event.metadata)} + |> Map.from_struct end) - Enum.chunk_every(events, 25) |> batch_put() + batch_put(events) end defp batch_put([]) do :ok end - defp batch_put([chunk | rest]) do - index_keys = chunk - |> Enum.map(fn event -> %{context: event.metadata.context, entity_id: event.metadata.entity_id} end) - |> Enum.dedup + defp batch_put(events) do + index_keys = make_index_keys(events) + + index = retrieve_index(index_keys) + + index_put_request = process_index(events, index) + + event_put_request = events + |> Enum.map(fn event -> [put_request: [item: event]] end) + + do_write_events(events, event_put_request, index_put_request) + + end + + defp process_index([], index) do + index + |> Map.keys + |> Enum.map(fn {context, entity_id} -> [put_request: [item: %{context: context, entity_id: entity_id, events: Map.get(index, {context, entity_id})}]] end) + end + + defp process_index([event | rest], index) do + index_key = {event.metadata.context, event.metadata.entity_id} + indexed_events = [event.event_id | Map.get(index, index_key, [])] + process_index(rest, Map.put(index, index_key, indexed_events)) + end - index = ExAws.Dynamo.batch_get_item(%{Application.get_env(:perhap_dynamo, :event_index_table_name, "Index") => [keys: index_keys]}) + defp make_index_keys(events) do + events + |> Enum.map(fn event -> %{context: event.metadata.context, entity_id: event.metadata.entity_id} end) + |> Enum.dedup + end + + defp retrieve_index(index_keys) do + ExAws.Dynamo.batch_get_item(%{Application.get_env(:perhap_dynamo, :event_index_table_name, "Index") => [keys: index_keys]}) |> ExAws.request! |> Map.get("Responses") |> Map.get("Index") @@ -182,40 +220,25 @@ defmodule Perhap.Adapters.Eventstore.Dynamo do |> Enum.map(fn index_item -> %{index_item | "context" => String.to_atom(index_item["context"])} end) |> Enum.reduce(%{}, fn (index, map) -> Map.put(map, {index["context"], index["entity_id"]}, index["events"]) end) #unprocessed keys + end - index_put_request = process_index(chunk, index) - - event_put_request = chunk |> Enum.map(fn event -> [put_request: [item: event]] end) - + defp do_write_events(events, event_put_request, index_put_request) do case ExAws.Dynamo.batch_write_item(%{Application.get_env(:perhap_dynamo, :event_table_name, "Events") => event_put_request}) |> ExAws.request do {:error, reason} -> IO.puts "Error writing events to dynamo, reason: #{inspect reason}" - IO.inspect chunk + IO.inspect events _ -> case ExAws.Dynamo.batch_write_item(%{Application.get_env(:perhap_dynamo, :event_index_table_name, "Index") => index_put_request}) |> ExAws.request do {:error, reason} -> IO.puts "Error writing index to dynamo, reason: #{inspect reason}" - IO.inspect {:events, chunk} + IO.inspect {:events, events} IO.inspect {:index, index_put_request} - Enum.each(chunk, fn event -> ExAws.Dynamo.delete_item(Application.get_env(:perhap_dynamo, :event_table_name, "Events"), %{event_id: event.event_id |> Perhap.Event.uuid_v1_to_time_order}) |> ExAws.request! end) + Enum.each(events, fn event -> ExAws.Dynamo.delete_item(Application.get_env(:perhap_dynamo, :event_table_name, "Events"), %{event_id: event.event_id |> Perhap.Event.uuid_v1_to_time_order}) |> ExAws.request! end) _ -> :ok end end - batch_put(rest) - end - - defp process_index([], index) do - index - |> Map.keys - |> Enum.map(fn {context, entity_id} -> [put_request: [item: %{context: context, entity_id: entity_id, events: Map.get(index, {context, entity_id})}]] end) - end - - defp process_index([event | rest], index) do - index_key = {event.metadata.context, event.metadata.entity_id} - indexed_events = [event.event_id | Map.get(index, index_key, [])] - process_index(rest, Map.put(index, index_key, indexed_events)) end end From f446d4a60b70fc292aa148662ef7a5de584f3b97 Mon Sep 17 00:00:00 2001 From: Alan Dyer Date: Fri, 10 Nov 2017 13:03:42 -0400 Subject: [PATCH 03/10] add some process handoff logic --- lib/perhap/adapters/eventstore/dynamo.ex | 89 +++++++++++++++++------- test/perhap/event_test.exs | 13 ++++ 2 files changed, 78 insertions(+), 24 deletions(-) diff --git a/lib/perhap/adapters/eventstore/dynamo.ex b/lib/perhap/adapters/eventstore/dynamo.ex index c98e12c..cf80d60 100644 --- a/lib/perhap/adapters/eventstore/dynamo.ex +++ b/lib/perhap/adapters/eventstore/dynamo.ex @@ -6,6 +6,9 @@ defmodule Perhap.Adapters.Eventstore.Dynamo do @type events :: %{ required(Perhap.Event.UUIDv1.t) => Perhap.Event.t } @type indexes :: %{ required({atom(), Perhap.Event.UUIDv4.t}) => list(Perhap.Event.UUIDv1.t) } + @event_table Application.get_env(:perhap_dynamo, :event_table_name, "Events") + @index_table Application.get_env(:perhap_dynamo, :event_index_table_name, "Index") + #@derive [ExAws.Dynamo.Encodable] #defstruct [:event] @@ -19,7 +22,7 @@ defmodule Perhap.Adapters.Eventstore.Dynamo do def init(args) do interval = 100 #miliseconds Process.send_after(self(), {:batch_write, interval}, interval) - {:ok, %{pending: [], posting: []}} + {:ok, %{pending: [], posting: %{}}} end @spec put_event(event: Perhap.Event.t) :: :ok | {:error, term} @@ -31,6 +34,10 @@ defmodule Perhap.Adapters.Eventstore.Dynamo do {:reply, :ok, %{events | pending: [event | events.pending]}} end + def handle_call(:put_complete, {pid, _tag}, events = %{posting: posting}) do + {:reply, :received, %{events | posting: Map.delete(posting, pid)}} + end + def handle_info({:batch_write, interval}, events = %{pending: []}) do Process.send_after(self(), {:batch_write, interval}, interval) {:noreply, events} @@ -44,9 +51,10 @@ defmodule Perhap.Adapters.Eventstore.Dynamo do else err -> raise err end end) + posted = Enum.reduce(chunked, events.posting, fn {pid, chunk}, acc -> Map.put(acc, pid, chunk) end) Process.send_after(self(), {:batch_write, interval}, interval) - {:noreply, %{pending: [], posting: [chunked | events.posting]}} + {:noreply, %{pending: [], posting: posted}} end @doc """ @@ -75,20 +83,48 @@ defmodule Perhap.Adapters.Eventstore.Dynamo do @spec get_event(event_id: Perhap.Event.UUIDv1) :: {:ok, Perhap.Event.t} | {:error, term} def get_event(event_id) do - event_id_time_order = event_id |> Perhap.Event.uuid_v1_to_time_order - dynamo_object = ExAws.Dynamo.get_item(Application.get_env(:perhap_dynamo, :event_table_name, "Events"), %{event_id: event_id_time_order}) - |> ExAws.request! + GenServer.call(:eventstore, {:get_event, event_id}) + end - case dynamo_object do - %{"Item" => result} -> - metadata = ExAws.Dynamo.decode_item(Map.get(result, "metadata"), as: Perhap.Event.Metadata) - metadata = %Perhap.Event.Metadata{metadata | context: String.to_atom(metadata.context), type: String.to_atom(metadata.type)} + def handle_call({:get_event, event_id}, _from, events) do + result = case check_pending_events(event_id, events) do + {:ok, event} -> + {:ok, event} + {:error, reason} -> + event_id_time_order = event_id |> Perhap.Event.uuid_v1_to_time_order + dynamo_object = ExAws.Dynamo.get_item(@event_table, %{event_id: event_id_time_order}) + |> ExAws.request! - event = ExAws.Dynamo.decode_item(dynamo_object, as: Perhap.Event) + case dynamo_object do + %{"Item" => result} -> + metadata = ExAws.Dynamo.decode_item(Map.get(result, "metadata"), as: Perhap.Event.Metadata) + metadata = %Perhap.Event.Metadata{metadata | context: String.to_atom(metadata.context), type: String.to_atom(metadata.type)} - {:ok, %Perhap.Event{event | event_id: metadata.event_id, metadata: metadata}} - %{} -> + event = ExAws.Dynamo.decode_item(dynamo_object, as: Perhap.Event) + + {:ok, %Perhap.Event{event | event_id: metadata.event_id, metadata: metadata}} + %{} -> + {:error, "Event not found"} + end + end + {:reply, result, events} + end + + defp check_pending_events(event_id, %{pending: pending, posting: posting}) do + case Enum.find(pending, fn (event) -> event.event_id == event_id end) do + nil -> + check_posting_events(event_id, posting) + event -> + {:ok, event} + end + end + + defp check_posting_events(event_id, posting) do + case Map.values(posting) |> List.flatten |> Enum.find(fn (event) -> event.event_id == event_id end) do + nil -> {:error, "Event not found"} + event -> + {:ok, event} end end @@ -96,7 +132,7 @@ defmodule Perhap.Adapters.Eventstore.Dynamo do def get_events(context, opts \\ []) do event_ids = case Keyword.has_key?(opts, :entity_id) do true -> - dynamo_object = ExAws.Dynamo.get_item(Application.get_env(:perhap_dynamo, :event_index_table_name, "Index"), %{context: context, entity_id: opts[:entity_id]}) + dynamo_object = ExAws.Dynamo.get_item(@index_table, %{context: context, entity_id: opts[:entity_id]}) |> ExAws.request! case dynamo_object do @@ -107,7 +143,7 @@ defmodule Perhap.Adapters.Eventstore.Dynamo do [] end _ -> - dynamo_object = ExAws.Dynamo.query(Application.get_env(:perhap_dynamo, :event_index_table_name, "Index"), + dynamo_object = ExAws.Dynamo.query(@index_table, expression_attribute_values: [context: context], key_condition_expression: "context = :context") |> ExAws.request! @@ -141,7 +177,7 @@ defmodule Perhap.Adapters.Eventstore.Dynamo do end defp batch_get([chunk | rest], event_accumulator) do - events = ExAws.Dynamo.batch_get_item(%{Application.get_env(:perhap_dynamo, :event_table_name, "Events") => [keys: chunk]}) + events = ExAws.Dynamo.batch_get_item(%{@event_table => [keys: chunk]}) |> ExAws.request! |> Map.get("Responses") |> Map.get("Events") @@ -171,8 +207,8 @@ defmodule Perhap.Adapters.Eventstore.Dynamo do |> Enum.map(fn event -> %Perhap.Event{event | event_id: event.event_id |> Perhap.Event.uuid_v1_to_time_order, metadata: Map.from_struct(event.metadata)} |> Map.from_struct end) - - batch_put(events) + :ok = batch_put(events) + GenServer.call(:eventstore, :put_complete) end defp batch_put([]) do @@ -187,7 +223,8 @@ defmodule Perhap.Adapters.Eventstore.Dynamo do index_put_request = process_index(events, index) event_put_request = events - |> Enum.map(fn event -> [put_request: [item: event]] end) + |> Enum.map(fn event -> [put_request: + [item: event]] end) do_write_events(events, event_put_request, index_put_request) @@ -196,7 +233,10 @@ defmodule Perhap.Adapters.Eventstore.Dynamo do defp process_index([], index) do index |> Map.keys - |> Enum.map(fn {context, entity_id} -> [put_request: [item: %{context: context, entity_id: entity_id, events: Map.get(index, {context, entity_id})}]] end) + |> Enum.map(fn {context, entity_id} -> [put_request: + [item: %{context: context, + entity_id: entity_id, + events: Map.get(index, {context, entity_id})}]] end) end defp process_index([event | rest], index) do @@ -207,12 +247,13 @@ defmodule Perhap.Adapters.Eventstore.Dynamo do defp make_index_keys(events) do events - |> Enum.map(fn event -> %{context: event.metadata.context, entity_id: event.metadata.entity_id} end) + |> Enum.map(fn event -> %{context: event.metadata.context, + entity_id: event.metadata.entity_id} end) |> Enum.dedup end defp retrieve_index(index_keys) do - ExAws.Dynamo.batch_get_item(%{Application.get_env(:perhap_dynamo, :event_index_table_name, "Index") => [keys: index_keys]}) + ExAws.Dynamo.batch_get_item(%{@index_table => [keys: index_keys]}) |> ExAws.request! |> Map.get("Responses") |> Map.get("Index") @@ -223,18 +264,18 @@ defmodule Perhap.Adapters.Eventstore.Dynamo do end defp do_write_events(events, event_put_request, index_put_request) do - case ExAws.Dynamo.batch_write_item(%{Application.get_env(:perhap_dynamo, :event_table_name, "Events") => event_put_request}) |> ExAws.request do + case ExAws.Dynamo.batch_write_item(%{@event_table => event_put_request}) |> ExAws.request do {:error, reason} -> IO.puts "Error writing events to dynamo, reason: #{inspect reason}" IO.inspect events _ -> - case ExAws.Dynamo.batch_write_item(%{Application.get_env(:perhap_dynamo, :event_index_table_name, "Index") => index_put_request}) |> ExAws.request do + case ExAws.Dynamo.batch_write_item(%{@index_table => index_put_request}) |> ExAws.request do {:error, reason} -> IO.puts "Error writing index to dynamo, reason: #{inspect reason}" IO.inspect {:events, events} IO.inspect {:index, index_put_request} - Enum.each(events, fn event -> ExAws.Dynamo.delete_item(Application.get_env(:perhap_dynamo, :event_table_name, "Events"), %{event_id: event.event_id |> Perhap.Event.uuid_v1_to_time_order}) |> ExAws.request! end) + Enum.each(events, fn event -> ExAws.Dynamo.delete_item(@event_table, %{event_id: event.event_id |> Perhap.Event.uuid_v1_to_time_order}) |> ExAws.request! end) _ -> :ok end diff --git a/test/perhap/event_test.exs b/test/perhap/event_test.exs index 6854d6d..1e1c5a5 100644 --- a/test/perhap/event_test.exs +++ b/test/perhap/event_test.exs @@ -200,9 +200,22 @@ defmodule Perhap.EventTestDynamo do {:ok, events} = results assert Enum.member?(events, ev1) Enum.map(rest, fn event -> refute Enum.member?(events, event) end) + end + test "gets an event from the pending store" do + random_context = Enum.random([:k, :l, :m, :n, :o]) + random_event = make_random_event( + %Perhap.Event.Metadata{context: random_context} ) + Perhap.Event.save_event(random_event) + result = Perhap.Event.retrieve_event(random_event.event_id) + :timer.sleep(@pause_interval) + #cleanup + ExAws.Dynamo.delete_item("Events", %{event_id: random_event.event_id |> Perhap.Event.uuid_v1_to_time_order}) |> ExAws.request! + ExAws.Dynamo.delete_item("Index", %{context: random_event.metadata.context, entity_id: random_event.metadata.entity_id}) |> ExAws.request! + #/cleanup + assert result == {:ok, random_event} end @tag :pending From 5e12cae1a2a55ce1fe27fa9830a01bc3ea457f1c Mon Sep 17 00:00:00 2001 From: Alan Dyer Date: Sat, 11 Nov 2017 21:45:31 -0500 Subject: [PATCH 04/10] new test exposed async index overwrite issue, fixing on plane so can't test --- lib/perhap/adapters/eventstore/dynamo.ex | 50 ++++++++++++++---------- test/perhap/event_test.exs | 25 ++++++++++++ 2 files changed, 54 insertions(+), 21 deletions(-) diff --git a/lib/perhap/adapters/eventstore/dynamo.ex b/lib/perhap/adapters/eventstore/dynamo.ex index cf80d60..7d4a3c9 100644 --- a/lib/perhap/adapters/eventstore/dynamo.ex +++ b/lib/perhap/adapters/eventstore/dynamo.ex @@ -8,6 +8,7 @@ defmodule Perhap.Adapters.Eventstore.Dynamo do @event_table Application.get_env(:perhap_dynamo, :event_table_name, "Events") @index_table Application.get_env(:perhap_dynamo, :event_index_table_name, "Index") + @batch_write_interval Application.get_env(:perhap_dynamo, :batch_write_interval, 100) #@derive [ExAws.Dynamo.Encodable] #defstruct [:event] @@ -20,7 +21,7 @@ defmodule Perhap.Adapters.Eventstore.Dynamo do end def init(args) do - interval = 100 #miliseconds + interval = @batch_write_interval Process.send_after(self(), {:batch_write, interval}, interval) {:ok, %{pending: [], posting: %{}}} end @@ -35,6 +36,7 @@ defmodule Perhap.Adapters.Eventstore.Dynamo do end def handle_call(:put_complete, {pid, _tag}, events = %{posting: posting}) do + :ok = write_index(Map.get(posting, pid)) {:reply, :received, %{events | posting: Map.delete(posting, pid)}} end @@ -208,6 +210,7 @@ defmodule Perhap.Adapters.Eventstore.Dynamo do metadata: Map.from_struct(event.metadata)} |> Map.from_struct end) :ok = batch_put(events) + IO.puts "stored #{Enum.count(events)} events" GenServer.call(:eventstore, :put_complete) end @@ -216,18 +219,10 @@ defmodule Perhap.Adapters.Eventstore.Dynamo do end defp batch_put(events) do - index_keys = make_index_keys(events) - - index = retrieve_index(index_keys) - - index_put_request = process_index(events, index) - event_put_request = events |> Enum.map(fn event -> [put_request: [item: event]] end) - - do_write_events(events, event_put_request, index_put_request) - + do_write_events(events, event_put_request) end defp process_index([], index) do @@ -263,22 +258,35 @@ defmodule Perhap.Adapters.Eventstore.Dynamo do #unprocessed keys end - defp do_write_events(events, event_put_request, index_put_request) do + defp do_write_events(events, event_put_request) do case ExAws.Dynamo.batch_write_item(%{@event_table => event_put_request}) |> ExAws.request do {:error, reason} -> IO.puts "Error writing events to dynamo, reason: #{inspect reason}" IO.inspect events _ -> - case ExAws.Dynamo.batch_write_item(%{@index_table => index_put_request}) |> ExAws.request do - {:error, reason} -> - IO.puts "Error writing index to dynamo, reason: #{inspect reason}" - IO.inspect {:events, events} - IO.inspect {:index, index_put_request} - - Enum.each(events, fn event -> ExAws.Dynamo.delete_item(@event_table, %{event_id: event.event_id |> Perhap.Event.uuid_v1_to_time_order}) |> ExAws.request! end) - _ -> - :ok - end + :ok + end + end + + + defp write_index(events) do + index_keys = make_index_keys(events) + + index = retrieve_index(index_keys) + + index_put_request = process_index(events, index) + + case ExAws.Dynamo.batch_write_item(%{@index_table => index_put_request}) |> ExAws.request do + {:error, reason} -> + IO.puts "Error writing index to dynamo, reason: #{inspect reason}" + IO.inspect {:events, events} + IO.inspect {:index, index_put_request} + + Enum.each(events, fn event -> + ExAws.Dynamo.delete_item(@event_table, %{event_id: event.event_id |> Perhap.Event.uuid_v1_to_time_order}) + |> ExAws.request! end) + _ -> + :ok end end diff --git a/test/perhap/event_test.exs b/test/perhap/event_test.exs index 1e1c5a5..e16b255 100644 --- a/test/perhap/event_test.exs +++ b/test/perhap/event_test.exs @@ -218,6 +218,31 @@ defmodule Perhap.EventTestDynamo do assert result == {:ok, random_event} end + test "can read and write more than 100 events" do + context = :test_more_than_100_events + random_entity = Perhap.Event.get_uuid_v4 + events = Enum.map(1..125, fn _number -> make_random_event( + %Perhap.Event.Metadata{context: context, entity_id: random_entity}) end) + Enum.each(events, fn event -> Perhap.Event.save_event(event) end) + :timer.sleep(@pause_interval) + results = Perhap.Event.retrieve_events(context) + + #cleanup + #Enum.map(events, fn event -> ExAws.Dynamo.delete_item("Events", %{event_id: event.event_id |> Perhap.Event.uuid_v1_to_time_order}) |> ExAws.request! end) + #ExAws.Dynamo.delete_item("Index", %{context: context, entity_id: random_entity}) |> ExAws.request! + #/cleanup + + sorted_events = Enum.sort(events, fn event1, event2 -> event1.event_id >= event2.event_id end) + with {:ok, retrieved_events} <- results do + sorted_results = Enum.sort(retrieved_events, fn event1, event2 -> event1.event_id >= event2.event_id end) + + assert Enum.count(sorted_events) == Enum.count(sorted_results) + end + + + end + + @tag :pending test "retrieves events filtered by an event_type" do end From a8d7c49c401ef191d5b48e89540d843f48269ea4 Mon Sep 17 00:00:00 2001 From: Alan Dyer Date: Sun, 12 Nov 2017 18:31:46 -0800 Subject: [PATCH 05/10] update tests and clean up some code --- lib/perhap/adapters/eventstore/dynamo.ex | 66 ++++++++++-------------- test/perhap/event_test.exs | 10 ++-- 2 files changed, 31 insertions(+), 45 deletions(-) diff --git a/lib/perhap/adapters/eventstore/dynamo.ex b/lib/perhap/adapters/eventstore/dynamo.ex index 7d4a3c9..845f887 100644 --- a/lib/perhap/adapters/eventstore/dynamo.ex +++ b/lib/perhap/adapters/eventstore/dynamo.ex @@ -10,27 +10,35 @@ defmodule Perhap.Adapters.Eventstore.Dynamo do @index_table Application.get_env(:perhap_dynamo, :event_index_table_name, "Index") @batch_write_interval Application.get_env(:perhap_dynamo, :batch_write_interval, 100) - #@derive [ExAws.Dynamo.Encodable] - #defstruct [:event] - - #alias __MODULE__ + ### Interface @spec start_link(opts: any()) :: {:ok, pid} | :ignore | {:error, {:already_started, pid} | term} def start_link(args) do GenServer.start_link(__MODULE__, [], name: :eventstore) end + @spec put_event(event: Perhap.Event.t) :: :ok | {:error, term} + def put_event(event) do + GenServer.call(:eventstore, {:put_event, event}) + end + + @spec get_event(event_id: Perhap.Event.UUIDv1) :: {:ok, Perhap.Event.t} | {:error, term} + def get_event(event_id) do + GenServer.call(:eventstore, {:get_event, event_id}) + end + + #@spec get_events(atom(), [entity_id: Perhap.Event.UUIDv4.t, after: Perhap.Event.UUIDv1.t]) :: {:ok, list(Perhap.Event.t)} | {:error, term} + #def get_events(context, opts \\ []) do + # GenServer.call(:eventstore, {:get_events, context, opts}) + #end + + ### Server def init(args) do interval = @batch_write_interval Process.send_after(self(), {:batch_write, interval}, interval) {:ok, %{pending: [], posting: %{}}} end - @spec put_event(event: Perhap.Event.t) :: :ok | {:error, term} - def put_event(event) do - GenServer.call(:eventstore, {:put_event, event}) - end - def handle_call({:put_event, event}, _from, events) do {:reply, :ok, %{events | pending: [event | events.pending]}} end @@ -59,35 +67,8 @@ defmodule Perhap.Adapters.Eventstore.Dynamo do {:noreply, %{pending: [], posting: posted}} end - @doc """ - def put_event(event) do - event = %Perhap.Event{event | event_id: event.event_id |> Perhap.Event.uuid_v1_to_time_order} - ExAws.Dynamo.put_item(Application.get_env(:perhap_dynamo, :event_table_name, "Events"), %{Map.from_struct(event) | metadata: Map.from_struct(event.metadata)}) - |> ExAws.request! - - dynamo_object = ExAws.Dynamo.get_item(Application.get_env(:perhap_dynamo, :event_index_table_name, "Index"), %{context: event.metadata.context, entity_id: event.metadata.entity_id}) - |> ExAws.request! - - indexed_events = case dynamo_object do - %{"Item" => data} -> - Map.get(data, "events") |> ExAws.Dynamo.Decoder.decode - %{} -> - [] - end - - ExAws.Dynamo.put_item(Application.get_env(:perhap_dynamo, :event_index_table_name, "Index"), %{context: event.metadata.context, entity_id: event.metadata.entity_id, events: [event.event_id | indexed_events]}) - |> ExAws.request! - - :ok - end - """ - @spec get_event(event_id: Perhap.Event.UUIDv1) :: {:ok, Perhap.Event.t} | {:error, term} - def get_event(event_id) do - GenServer.call(:eventstore, {:get_event, event_id}) - end - def handle_call({:get_event, event_id}, _from, events) do result = case check_pending_events(event_id, events) do {:ok, event} -> @@ -185,9 +166,10 @@ defmodule Perhap.Adapters.Eventstore.Dynamo do |> Map.get("Events") |> Enum.map(fn event -> {event, ExAws.Dynamo.decode_item(event["metadata"], as: Perhap.Event.Metadata)} end) |> Enum.map(fn {event, metadata} -> - %Perhap.Event{ExAws.Dynamo.decode_item(event, as: Perhap.Event) | event_id: metadata.event_id, - metadata: %Perhap.Event.Metadata{metadata | context: String.to_atom(metadata.context), - type: String.to_atom(metadata.type)}} end) + %Perhap.Event{ExAws.Dynamo.decode_item(event, as: Perhap.Event) + | event_id: metadata.event_id, + metadata: %Perhap.Event.Metadata{metadata | context: String.to_atom(metadata.context), + type: String.to_atom(metadata.type)}} end) batch_get(rest, event_accumulator ++ events) end @@ -210,7 +192,6 @@ defmodule Perhap.Adapters.Eventstore.Dynamo do metadata: Map.from_struct(event.metadata)} |> Map.from_struct end) :ok = batch_put(events) - IO.puts "stored #{Enum.count(events)} events" GenServer.call(:eventstore, :put_complete) end @@ -270,6 +251,11 @@ defmodule Perhap.Adapters.Eventstore.Dynamo do defp write_index(events) do + events = events + |> Enum.map(fn event -> %Perhap.Event{event | event_id: event.event_id |> Perhap.Event.uuid_v1_to_time_order, + metadata: Map.from_struct(event.metadata)} + |> Map.from_struct end) + index_keys = make_index_keys(events) index = retrieve_index(index_keys) diff --git a/test/perhap/event_test.exs b/test/perhap/event_test.exs index e16b255..fa5e182 100644 --- a/test/perhap/event_test.exs +++ b/test/perhap/event_test.exs @@ -2,7 +2,6 @@ defmodule Perhap.EventTestDynamo do use ExUnit.Case, async: false import PerhapTest.Helper, only: :functions - @interval 1 @pause_interval 500 setup do @@ -224,19 +223,20 @@ defmodule Perhap.EventTestDynamo do events = Enum.map(1..125, fn _number -> make_random_event( %Perhap.Event.Metadata{context: context, entity_id: random_entity}) end) Enum.each(events, fn event -> Perhap.Event.save_event(event) end) - :timer.sleep(@pause_interval) + :timer.sleep(4000) + results = Perhap.Event.retrieve_events(context) #cleanup - #Enum.map(events, fn event -> ExAws.Dynamo.delete_item("Events", %{event_id: event.event_id |> Perhap.Event.uuid_v1_to_time_order}) |> ExAws.request! end) - #ExAws.Dynamo.delete_item("Index", %{context: context, entity_id: random_entity}) |> ExAws.request! + Enum.map(events, fn event -> ExAws.Dynamo.delete_item("Events", %{event_id: event.event_id |> Perhap.Event.uuid_v1_to_time_order}) |> ExAws.request! end) + ExAws.Dynamo.delete_item("Index", %{context: context, entity_id: random_entity}) |> ExAws.request! #/cleanup sorted_events = Enum.sort(events, fn event1, event2 -> event1.event_id >= event2.event_id end) with {:ok, retrieved_events} <- results do sorted_results = Enum.sort(retrieved_events, fn event1, event2 -> event1.event_id >= event2.event_id end) - assert Enum.count(sorted_events) == Enum.count(sorted_results) + assert sorted_events == sorted_results end From 2cd74484cb16cd21af1ae7def36e3452c9e5fa6d Mon Sep 17 00:00:00 2001 From: Alan Dyer Date: Sun, 12 Nov 2017 22:10:31 -0800 Subject: [PATCH 06/10] get_events now uses the eventstore process and checks for local events as well as dynamo events --- lib/perhap/adapters/eventstore/dynamo.ex | 114 +++++++++++++---------- 1 file changed, 63 insertions(+), 51 deletions(-) diff --git a/lib/perhap/adapters/eventstore/dynamo.ex b/lib/perhap/adapters/eventstore/dynamo.ex index 845f887..ca4870c 100644 --- a/lib/perhap/adapters/eventstore/dynamo.ex +++ b/lib/perhap/adapters/eventstore/dynamo.ex @@ -27,10 +27,10 @@ defmodule Perhap.Adapters.Eventstore.Dynamo do GenServer.call(:eventstore, {:get_event, event_id}) end - #@spec get_events(atom(), [entity_id: Perhap.Event.UUIDv4.t, after: Perhap.Event.UUIDv1.t]) :: {:ok, list(Perhap.Event.t)} | {:error, term} - #def get_events(context, opts \\ []) do - # GenServer.call(:eventstore, {:get_events, context, opts}) - #end + @spec get_events(atom(), [entity_id: Perhap.Event.UUIDv4.t, after: Perhap.Event.UUIDv1.t]) :: {:ok, list(Perhap.Event.t)} | {:error, term} + def get_events(context, opts \\ []) do + GenServer.call(:eventstore, {:get_events, context, opts}) + end ### Server def init(args) do @@ -48,27 +48,6 @@ defmodule Perhap.Adapters.Eventstore.Dynamo do {:reply, :received, %{events | posting: Map.delete(posting, pid)}} end - def handle_info({:batch_write, interval}, events = %{pending: []}) do - Process.send_after(self(), {:batch_write, interval}, interval) - {:noreply, events} - end - - def handle_info({:batch_write, interval}, events) do - chunked = events.pending - |> Enum.chunk_every(25) - |> Enum.map(fn chunk -> with {:ok, pid} <- Task.start(__MODULE__, :put_to_dynamo, [chunk]) - do {pid, chunk} - else err -> raise err - end - end) - posted = Enum.reduce(chunked, events.posting, fn {pid, chunk}, acc -> Map.put(acc, pid, chunk) end) - - Process.send_after(self(), {:batch_write, interval}, interval) - {:noreply, %{pending: [], posting: posted}} - end - - - def handle_call({:get_event, event_id}, _from, events) do result = case check_pending_events(event_id, events) do {:ok, event} -> @@ -93,42 +72,24 @@ defmodule Perhap.Adapters.Eventstore.Dynamo do {:reply, result, events} end - defp check_pending_events(event_id, %{pending: pending, posting: posting}) do - case Enum.find(pending, fn (event) -> event.event_id == event_id end) do - nil -> - check_posting_events(event_id, posting) - event -> - {:ok, event} - end - end - - defp check_posting_events(event_id, posting) do - case Map.values(posting) |> List.flatten |> Enum.find(fn (event) -> event.event_id == event_id end) do - nil -> - {:error, "Event not found"} - event -> - {:ok, event} - end - end - - @spec get_events(atom(), [entity_id: Perhap.Event.UUIDv4.t, after: Perhap.Event.UUIDv1.t]) :: {:ok, list(Perhap.Event.t)} | {:error, term} - def get_events(context, opts \\ []) do + def handle_call({:get_events, context, opts}, from, event_state) do event_ids = case Keyword.has_key?(opts, :entity_id) do true -> dynamo_object = ExAws.Dynamo.get_item(@index_table, %{context: context, entity_id: opts[:entity_id]}) |> ExAws.request! - case dynamo_object do + from_dynamo = case dynamo_object do %{"Item" => data} -> ExAws.Dynamo.Decoder.decode(data) |> Map.get("events", []) %{} -> [] end + from_dynamo ++ get_by_context(event_state, context, opts[:entity_id]) _ -> - dynamo_object = ExAws.Dynamo.query(@index_table, - expression_attribute_values: [context: context], - key_condition_expression: "context = :context") + get_by_context(event_state, context, nil) ++ ExAws.Dynamo.query(@index_table, + expression_attribute_values: [context: context], + key_condition_expression: "context = :context") |> ExAws.request! |> Map.get("Items") |> Enum.map(fn x -> ExAws.Dynamo.Decoder.decode(x) end) @@ -138,7 +99,7 @@ defmodule Perhap.Adapters.Eventstore.Dynamo do end if event_ids == [] do - {:ok, []} + {:reply, {:ok, []}, event_state} else event_ids2 = case Keyword.has_key?(opts, :after) do true -> @@ -151,10 +112,61 @@ defmodule Perhap.Adapters.Eventstore.Dynamo do events = Enum.chunk_every(event_ids3, 100) |> batch_get([]) - {:ok, events} + {:reply, {:ok, events}, event_state} end end + def handle_info({:batch_write, interval}, events = %{pending: []}) do + Process.send_after(self(), {:batch_write, interval}, interval) + {:noreply, events} + end + + def handle_info({:batch_write, interval}, events) do + chunked = events.pending + |> Enum.chunk_every(25) + |> Enum.map(fn chunk -> with {:ok, pid} <- Task.start(__MODULE__, :put_to_dynamo, [chunk]) + do {pid, chunk} + else err -> raise err + end + end) + posted = Enum.reduce(chunked, events.posting, fn {pid, chunk}, acc -> Map.put(acc, pid, chunk) end) + + Process.send_after(self(), {:batch_write, interval}, interval) + {:noreply, %{pending: [], posting: posted}} + end + + ### Helpers + + defp check_pending_events(event_id, %{pending: pending, posting: posting}) do + case Enum.find(pending, fn (event) -> event.event_id == event_id end) do + nil -> + check_posting_events(event_id, posting) + event -> + {:ok, event} + end + end + + defp check_posting_events(event_id, posting) do + case Map.values(posting) |> List.flatten |> Enum.find(fn (event) -> event.event_id == event_id end) do + nil -> + {:error, "Event not found"} + event -> + {:ok, event} + end + end + + defp get_by_context(%{pending: pending, posting: posting}, context, nil) do + (Enum.filter(pending, fn event -> event.metadata.context == context end) |> Enum.map(fn event -> event.event_id end)) + ++ (Map.values(posting) |> List.flatten |> Enum.filter(fn event -> event.metadata.context == context end) |> Enum.map(fn event -> event.event_id end)) + end + + defp get_by_context(%{pending: pending, posting: posting}, context, entity_id) do + (Enum.filter(pending, fn event -> event.metadata.context == context and event.metadata.entity_id == entity_id end) |> Enum.map(fn event -> event.event_id end)) + ++ (Map.values(posting) |> List.flatten |> Enum.filter(fn event -> event.metadata.context == context and event.metadata.entity_id == entity_id end) |> Enum.map(fn event -> event.event_id end)) + end + + + defp batch_get([], events) do events end From 0b8de8e0ab8d80127d17d5fae05f1b283d2c06f3 Mon Sep 17 00:00:00 2001 From: Alan Dyer Date: Sun, 12 Nov 2017 22:14:21 -0800 Subject: [PATCH 07/10] clean up some warnings --- lib/perhap/adapters/eventstore/dynamo.ex | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/lib/perhap/adapters/eventstore/dynamo.ex b/lib/perhap/adapters/eventstore/dynamo.ex index ca4870c..668a4d3 100644 --- a/lib/perhap/adapters/eventstore/dynamo.ex +++ b/lib/perhap/adapters/eventstore/dynamo.ex @@ -14,7 +14,7 @@ defmodule Perhap.Adapters.Eventstore.Dynamo do ### Interface @spec start_link(opts: any()) :: {:ok, pid} | :ignore | {:error, {:already_started, pid} | term} def start_link(args) do - GenServer.start_link(__MODULE__, [], name: :eventstore) + GenServer.start_link(__MODULE__, args, name: :eventstore) end @spec put_event(event: Perhap.Event.t) :: :ok | {:error, term} @@ -33,7 +33,7 @@ defmodule Perhap.Adapters.Eventstore.Dynamo do end ### Server - def init(args) do + def init(_args) do interval = @batch_write_interval Process.send_after(self(), {:batch_write, interval}, interval) {:ok, %{pending: [], posting: %{}}} @@ -52,7 +52,7 @@ defmodule Perhap.Adapters.Eventstore.Dynamo do result = case check_pending_events(event_id, events) do {:ok, event} -> {:ok, event} - {:error, reason} -> + {:error, _reason} -> event_id_time_order = event_id |> Perhap.Event.uuid_v1_to_time_order dynamo_object = ExAws.Dynamo.get_item(@event_table, %{event_id: event_id_time_order}) |> ExAws.request! @@ -72,7 +72,7 @@ defmodule Perhap.Adapters.Eventstore.Dynamo do {:reply, result, events} end - def handle_call({:get_events, context, opts}, from, event_state) do + def handle_call({:get_events, context, opts}, _from, event_state) do event_ids = case Keyword.has_key?(opts, :entity_id) do true -> dynamo_object = ExAws.Dynamo.get_item(@index_table, %{context: context, entity_id: opts[:entity_id]}) @@ -193,10 +193,10 @@ defmodule Perhap.Adapters.Eventstore.Dynamo do end end - defp decode_data(data) do - Enum.reduce(data, %{}, fn({key, value}, map) -> - Map.put(map, String.to_atom(key), value) end) - end + #defp decode_data(data) do + # Enum.reduce(data, %{}, fn({key, value}, map) -> + # Map.put(map, String.to_atom(key), value) end) + #end def put_to_dynamo(events) do events = events From ef134ec4b136315605318c3d17367b7c86ba95ff Mon Sep 17 00:00:00 2001 From: Alan Dyer Date: Tue, 14 Nov 2017 18:06:01 -0800 Subject: [PATCH 08/10] add some docs --- lib/perhap/adapters/eventstore/dynamo.ex | 38 ++++++++++++++++++++++++ 1 file changed, 38 insertions(+) diff --git a/lib/perhap/adapters/eventstore/dynamo.ex b/lib/perhap/adapters/eventstore/dynamo.ex index 668a4d3..bf8d892 100644 --- a/lib/perhap/adapters/eventstore/dynamo.ex +++ b/lib/perhap/adapters/eventstore/dynamo.ex @@ -1,4 +1,25 @@ defmodule Perhap.Adapters.Eventstore.Dynamo do + @moduledoc """ + Dynamo Adapter for perhap. + + This is a singleton instance of the eventstore. It receives events from calling + processes through the put_event/1 interface. It stores the event locally and + returns control to the calling process immediately. + + Approximately every 100ms (configurable) it takes all the events that have been + saved up by put_event and creates new processes to submit those events to Dynamo. + + The Dynamo instance is configured in the application config (usually config.exs) + where the event table, index table, batching interval parts of this adapter and + the aws credentials and region can be specified for the supporting library ExAws + + There are two ways of getting events, get_event/1 which takes an event_id and + returns only 1 event matching that id, and get_event/2 which takes a context atom + and two options :entity_id and :after for narrowing the results. + + Both get_event and get_events check the local events before contacting Dynamo + for additional results. + """ use Perhap.Adapters.Eventstore use GenServer @@ -17,11 +38,28 @@ defmodule Perhap.Adapters.Eventstore.Dynamo do GenServer.start_link(__MODULE__, args, name: :eventstore) end + @doc """ + Put event saves an event within the local process and returns control to the + calling process. This adapter treats events collected locally as saved to Dynamo. + + ## Example: + + >Perhap.Adapters.Eventstore.Dynamo.put_event(%Perhap.Event{}) + :ok + + """ @spec put_event(event: Perhap.Event.t) :: :ok | {:error, term} def put_event(event) do GenServer.call(:eventstore, {:put_event, event}) end + @doc """ + Retrieves an event using its event_id. Checks the locally stored events before + going to Dynamo for the event. + + ## Example: + Perhap.Adapters.Eventstore.Dynamo.get_event(event_id) + """ @spec get_event(event_id: Perhap.Event.UUIDv1) :: {:ok, Perhap.Event.t} | {:error, term} def get_event(event_id) do GenServer.call(:eventstore, {:get_event, event_id}) From 23e2fcfe2af7006e3dbebb3bdf989eb3e190a929 Mon Sep 17 00:00:00 2001 From: Alan Dyer Date: Wed, 15 Nov 2017 10:09:18 -0800 Subject: [PATCH 09/10] more docs --- lib/perhap/adapters/eventstore/dynamo.ex | 21 ++++++++++++++++++++- 1 file changed, 20 insertions(+), 1 deletion(-) diff --git a/lib/perhap/adapters/eventstore/dynamo.ex b/lib/perhap/adapters/eventstore/dynamo.ex index bf8d892..e3dd65b 100644 --- a/lib/perhap/adapters/eventstore/dynamo.ex +++ b/lib/perhap/adapters/eventstore/dynamo.ex @@ -58,13 +58,32 @@ defmodule Perhap.Adapters.Eventstore.Dynamo do going to Dynamo for the event. ## Example: - Perhap.Adapters.Eventstore.Dynamo.get_event(event_id) + {:ok, %Perhap.Event{}} = Perhap.Adapters.Eventstore.Dynamo.get_event(event_id) """ @spec get_event(event_id: Perhap.Event.UUIDv1) :: {:ok, Perhap.Event.t} | {:error, term} def get_event(event_id) do GenServer.call(:eventstore, {:get_event, event_id}) end + @doc """ + Retrieves events based on the context associated with those events. The context + can be found in the event metadata (Perhap.Event.Metadata). The results can be + narrowed by supplying an entity_id as an option or an after option which will only + retrieve events created after the supplied event_id. + + Returns a list of events inside an :ok tuple response ({:ok, events}) or empty + {:ok, []} if no events were found. Can return an :error tuple (:error, reason) + if an unknown error occurred. + + ## Example: + + {:ok, events} = get_events(event.metadata.context) #get all events with the same context as this event + {:ok []} = get_Events(:no_events_with_this_context) + {:ok, events} = get_events(event.metadata.context, [entity_id: event.metadata.entity_id]) + {:ok, events} = get_events(event.metadata.context, [after: event.event_id]) + + """ + @spec get_events(atom(), [entity_id: Perhap.Event.UUIDv4.t, after: Perhap.Event.UUIDv1.t]) :: {:ok, list(Perhap.Event.t)} | {:error, term} def get_events(context, opts \\ []) do GenServer.call(:eventstore, {:get_events, context, opts}) From 45ee3490438ae432b3fc95f76b8d6de9e7638320 Mon Sep 17 00:00:00 2001 From: Alan Dyer Date: Thu, 16 Nov 2017 15:28:11 -0800 Subject: [PATCH 10/10] crash handling for eventstore --- lib/perhap/adapters/eventstore/dynamo.ex | 37 +++++++++++++++++++----- 1 file changed, 30 insertions(+), 7 deletions(-) diff --git a/lib/perhap/adapters/eventstore/dynamo.ex b/lib/perhap/adapters/eventstore/dynamo.ex index e3dd65b..f4bd179 100644 --- a/lib/perhap/adapters/eventstore/dynamo.ex +++ b/lib/perhap/adapters/eventstore/dynamo.ex @@ -31,7 +31,6 @@ defmodule Perhap.Adapters.Eventstore.Dynamo do @index_table Application.get_env(:perhap_dynamo, :event_index_table_name, "Index") @batch_write_interval Application.get_env(:perhap_dynamo, :batch_write_interval, 100) - ### Interface @spec start_link(opts: any()) :: {:ok, pid} | :ignore | {:error, {:already_started, pid} | term} def start_link(args) do @@ -83,15 +82,18 @@ defmodule Perhap.Adapters.Eventstore.Dynamo do {:ok, events} = get_events(event.metadata.context, [after: event.event_id]) """ - + @spec get_events(atom(), [entity_id: Perhap.Event.UUIDv4.t, after: Perhap.Event.UUIDv1.t]) :: {:ok, list(Perhap.Event.t)} | {:error, term} def get_events(context, opts \\ []) do GenServer.call(:eventstore, {:get_events, context, opts}) end + + ### Server def init(_args) do interval = @batch_write_interval + Process.flag(:trap_exit, true) Process.send_after(self(), {:batch_write, interval}, interval) {:ok, %{pending: [], posting: %{}}} end @@ -192,6 +194,10 @@ defmodule Perhap.Adapters.Eventstore.Dynamo do {:noreply, %{pending: [], posting: posted}} end + def terminate(_reason, events) do + dump_events_to_disk(events) + end + ### Helpers defp check_pending_events(event_id, %{pending: pending, posting: posting}) do @@ -212,6 +218,17 @@ defmodule Perhap.Adapters.Eventstore.Dynamo do end end + defp dump_events_to_disk(events) do + try do + {:ok, file} = File.open("Perhap_dynamo_process_dump_" <> inspect(:os.system_time(:millisecond)), [:write, :utf8]) + IO.inspect(file, events, []) + File.close(file) + rescue + error -> IO.puts(:stderr, "Error writing dynamo crash dump to file: #{inspect error}") + IO.inspect(:stderr, events) + end + end + defp get_by_context(%{pending: pending, posting: posting}, context, nil) do (Enum.filter(pending, fn event -> event.metadata.context == context end) |> Enum.map(fn event -> event.event_id end)) ++ (Map.values(posting) |> List.flatten |> Enum.filter(fn event -> event.metadata.context == context end) |> Enum.map(fn event -> event.event_id end)) @@ -298,14 +315,20 @@ defmodule Perhap.Adapters.Eventstore.Dynamo do end defp retrieve_index(index_keys) do - ExAws.Dynamo.batch_get_item(%{@index_table => [keys: index_keys]}) + results = ExAws.Dynamo.batch_get_item(%{@index_table => [keys: index_keys]}) |> ExAws.request! |> Map.get("Responses") |> Map.get("Index") - |> Enum.map(fn index_item -> ExAws.Dynamo.Decoder.decode(index_item) end) - |> Enum.map(fn index_item -> %{index_item | "context" => String.to_atom(index_item["context"])} end) - |> Enum.reduce(%{}, fn (index, map) -> Map.put(map, {index["context"], index["entity_id"]}, index["events"]) end) - #unprocessed keys + case results do + nil -> + %{} + index -> + index + |> Enum.map(fn index_item -> ExAws.Dynamo.Decoder.decode(index_item) end) + |> Enum.map(fn index_item -> %{index_item | "context" => String.to_atom(index_item["context"])} end) + |> Enum.reduce(%{}, fn (index, map) -> Map.put(map, {index["context"], index["entity_id"]}, index["events"]) end) + end + end defp do_write_events(events, event_put_request) do