diff --git a/lib/perhap/adapters/eventstore/dynamo.ex b/lib/perhap/adapters/eventstore/dynamo.ex index 80a97a2..f4bd179 100644 --- a/lib/perhap/adapters/eventstore/dynamo.ex +++ b/lib/perhap/adapters/eventstore/dynamo.ex @@ -1,4 +1,25 @@ defmodule Perhap.Adapters.Eventstore.Dynamo do + @moduledoc """ + Dynamo Adapter for perhap. + + This is a singleton instance of the eventstore. It receives events from calling + processes through the put_event/1 interface. It stores the event locally and + returns control to the calling process immediately. + + Approximately every 100ms (configurable) it takes all the events that have been + saved up by put_event and creates new processes to submit those events to Dynamo. + + The Dynamo instance is configured in the application config (usually config.exs) + where the event table, index table, batching interval parts of this adapter and + the aws credentials and region can be specified for the supporting library ExAws + + There are two ways of getting events, get_event/1 which takes an event_id and + returns only 1 event matching that id, and get_event/2 which takes a context atom + and two options :entity_id and :after for narrowing the results. + + Both get_event and get_events check the local events before contacting Dynamo + for additional results. + """ use Perhap.Adapters.Eventstore use GenServer @@ -6,75 +27,128 @@ defmodule Perhap.Adapters.Eventstore.Dynamo do @type events :: %{ required(Perhap.Event.UUIDv1.t) => Perhap.Event.t } @type indexes :: %{ required({atom(), Perhap.Event.UUIDv4.t}) => list(Perhap.Event.UUIDv1.t) } - #@derive [ExAws.Dynamo.Encodable] - #defstruct [:event] - - #alias __MODULE__ + @event_table Application.get_env(:perhap_dynamo, :event_table_name, "Events") + @index_table Application.get_env(:perhap_dynamo, :event_index_table_name, "Index") + @batch_write_interval Application.get_env(:perhap_dynamo, :batch_write_interval, 100) + ### Interface @spec start_link(opts: any()) :: {:ok, pid} | :ignore | {:error, {:already_started, pid} | term} def start_link(args) do - {:ok, pid} = GenServer.start_link(__MODULE__, args) + GenServer.start_link(__MODULE__, args, name: :eventstore) end - @spec put_event(event: Perhap.Event.t) :: :ok | {:error, term} - def put_event(event) do - event = %Perhap.Event{event | event_id: event.event_id |> Perhap.Event.uuid_v1_to_time_order} - ExAws.Dynamo.put_item(Application.get_env(:perhap_dynamo, :event_table_name, "Events"), %{Map.from_struct(event) | metadata: Map.from_struct(event.metadata)}) - |> ExAws.request! - - dynamo_object = ExAws.Dynamo.get_item(Application.get_env(:perhap_dynamo, :event_index_table_name, "Index"), %{context: event.metadata.context, entity_id: event.metadata.entity_id}) - |> ExAws.request! - - indexed_events = case dynamo_object do - %{"Item" => data} -> - Map.get(data, "events") |> ExAws.Dynamo.Decoder.decode - %{} -> - [] - end + @doc """ + Put event saves an event within the local process and returns control to the + calling process. This adapter treats events collected locally as saved to Dynamo. - ExAws.Dynamo.put_item(Application.get_env(:perhap_dynamo, :event_index_table_name, "Index"), %{context: event.metadata.context, entity_id: event.metadata.entity_id, events: [event.event_id | indexed_events]}) - |> ExAws.request! + ## Example: - :ok + >Perhap.Adapters.Eventstore.Dynamo.put_event(%Perhap.Event{}) + :ok + + """ + @spec put_event(event: Perhap.Event.t) :: :ok | {:error, term} + def put_event(event) do + GenServer.call(:eventstore, {:put_event, event}) end + @doc """ + Retrieves an event using its event_id. Checks the locally stored events before + going to Dynamo for the event. + + ## Example: + {:ok, %Perhap.Event{}} = Perhap.Adapters.Eventstore.Dynamo.get_event(event_id) + """ @spec get_event(event_id: Perhap.Event.UUIDv1) :: {:ok, Perhap.Event.t} | {:error, term} def get_event(event_id) do - event_id_time_order = event_id |> Perhap.Event.uuid_v1_to_time_order - dynamo_object = ExAws.Dynamo.get_item(Application.get_env(:perhap_dynamo, :event_table_name, "Events"), %{event_id: event_id_time_order}) - |> ExAws.request! + GenServer.call(:eventstore, {:get_event, event_id}) + end - case dynamo_object do - %{"Item" => result} -> - metadata = ExAws.Dynamo.decode_item(Map.get(result, "metadata"), as: Perhap.Event.Metadata) - metadata = %Perhap.Event.Metadata{metadata | context: String.to_atom(metadata.context), type: String.to_atom(metadata.type)} + @doc """ + Retrieves events based on the context associated with those events. The context + can be found in the event metadata (Perhap.Event.Metadata). The results can be + narrowed by supplying an entity_id as an option or an after option which will only + retrieve events created after the supplied event_id. - event = ExAws.Dynamo.decode_item(dynamo_object, as: Perhap.Event) + Returns a list of events inside an :ok tuple response ({:ok, events}) or empty + {:ok, []} if no events were found. Can return an :error tuple (:error, reason) + if an unknown error occurred. - {:ok, %Perhap.Event{event | event_id: metadata.event_id, metadata: metadata}} - %{} -> - {:error, "Event not found"} - end - end + ## Example: + + {:ok, events} = get_events(event.metadata.context) #get all events with the same context as this event + {:ok []} = get_Events(:no_events_with_this_context) + {:ok, events} = get_events(event.metadata.context, [entity_id: event.metadata.entity_id]) + {:ok, events} = get_events(event.metadata.context, [after: event.event_id]) + + """ @spec get_events(atom(), [entity_id: Perhap.Event.UUIDv4.t, after: Perhap.Event.UUIDv1.t]) :: {:ok, list(Perhap.Event.t)} | {:error, term} def get_events(context, opts \\ []) do + GenServer.call(:eventstore, {:get_events, context, opts}) + end + + + + ### Server + def init(_args) do + interval = @batch_write_interval + Process.flag(:trap_exit, true) + Process.send_after(self(), {:batch_write, interval}, interval) + {:ok, %{pending: [], posting: %{}}} + end + + def handle_call({:put_event, event}, _from, events) do + {:reply, :ok, %{events | pending: [event | events.pending]}} + end + + def handle_call(:put_complete, {pid, _tag}, events = %{posting: posting}) do + :ok = write_index(Map.get(posting, pid)) + {:reply, :received, %{events | posting: Map.delete(posting, pid)}} + end + + def handle_call({:get_event, event_id}, _from, events) do + result = case check_pending_events(event_id, events) do + {:ok, event} -> + {:ok, event} + {:error, _reason} -> + event_id_time_order = event_id |> Perhap.Event.uuid_v1_to_time_order + dynamo_object = ExAws.Dynamo.get_item(@event_table, %{event_id: event_id_time_order}) + |> ExAws.request! + + case dynamo_object do + %{"Item" => result} -> + metadata = ExAws.Dynamo.decode_item(Map.get(result, "metadata"), as: Perhap.Event.Metadata) + metadata = %Perhap.Event.Metadata{metadata | context: String.to_atom(metadata.context), type: String.to_atom(metadata.type)} + + event = ExAws.Dynamo.decode_item(dynamo_object, as: Perhap.Event) + + {:ok, %Perhap.Event{event | event_id: metadata.event_id, metadata: metadata}} + %{} -> + {:error, "Event not found"} + end + end + {:reply, result, events} + end + + def handle_call({:get_events, context, opts}, _from, event_state) do event_ids = case Keyword.has_key?(opts, :entity_id) do true -> - dynamo_object = ExAws.Dynamo.get_item(Application.get_env(:perhap_dynamo, :event_index_table_name, "Index"), %{context: context, entity_id: opts[:entity_id]}) + dynamo_object = ExAws.Dynamo.get_item(@index_table, %{context: context, entity_id: opts[:entity_id]}) |> ExAws.request! - case dynamo_object do + from_dynamo = case dynamo_object do %{"Item" => data} -> ExAws.Dynamo.Decoder.decode(data) |> Map.get("events", []) %{} -> [] end + from_dynamo ++ get_by_context(event_state, context, opts[:entity_id]) _ -> - dynamo_object = ExAws.Dynamo.query(Application.get_env(:perhap_dynamo, :event_index_table_name, "Index"), - expression_attribute_values: [context: context], - key_condition_expression: "context = :context") + get_by_context(event_state, context, nil) ++ ExAws.Dynamo.query(@index_table, + expression_attribute_values: [context: context], + key_condition_expression: "context = :context") |> ExAws.request! |> Map.get("Items") |> Enum.map(fn x -> ExAws.Dynamo.Decoder.decode(x) end) @@ -84,7 +158,7 @@ defmodule Perhap.Adapters.Eventstore.Dynamo do end if event_ids == [] do - {:ok, []} + {:reply, {:ok, []}, event_state} else event_ids2 = case Keyword.has_key?(opts, :after) do true -> @@ -97,24 +171,91 @@ defmodule Perhap.Adapters.Eventstore.Dynamo do events = Enum.chunk_every(event_ids3, 100) |> batch_get([]) - {:ok, events} + {:reply, {:ok, events}, event_state} end end + def handle_info({:batch_write, interval}, events = %{pending: []}) do + Process.send_after(self(), {:batch_write, interval}, interval) + {:noreply, events} + end + + def handle_info({:batch_write, interval}, events) do + chunked = events.pending + |> Enum.chunk_every(25) + |> Enum.map(fn chunk -> with {:ok, pid} <- Task.start(__MODULE__, :put_to_dynamo, [chunk]) + do {pid, chunk} + else err -> raise err + end + end) + posted = Enum.reduce(chunked, events.posting, fn {pid, chunk}, acc -> Map.put(acc, pid, chunk) end) + + Process.send_after(self(), {:batch_write, interval}, interval) + {:noreply, %{pending: [], posting: posted}} + end + + def terminate(_reason, events) do + dump_events_to_disk(events) + end + + ### Helpers + + defp check_pending_events(event_id, %{pending: pending, posting: posting}) do + case Enum.find(pending, fn (event) -> event.event_id == event_id end) do + nil -> + check_posting_events(event_id, posting) + event -> + {:ok, event} + end + end + + defp check_posting_events(event_id, posting) do + case Map.values(posting) |> List.flatten |> Enum.find(fn (event) -> event.event_id == event_id end) do + nil -> + {:error, "Event not found"} + event -> + {:ok, event} + end + end + + defp dump_events_to_disk(events) do + try do + {:ok, file} = File.open("Perhap_dynamo_process_dump_" <> inspect(:os.system_time(:millisecond)), [:write, :utf8]) + IO.inspect(file, events, []) + File.close(file) + rescue + error -> IO.puts(:stderr, "Error writing dynamo crash dump to file: #{inspect error}") + IO.inspect(:stderr, events) + end + end + + defp get_by_context(%{pending: pending, posting: posting}, context, nil) do + (Enum.filter(pending, fn event -> event.metadata.context == context end) |> Enum.map(fn event -> event.event_id end)) + ++ (Map.values(posting) |> List.flatten |> Enum.filter(fn event -> event.metadata.context == context end) |> Enum.map(fn event -> event.event_id end)) + end + + defp get_by_context(%{pending: pending, posting: posting}, context, entity_id) do + (Enum.filter(pending, fn event -> event.metadata.context == context and event.metadata.entity_id == entity_id end) |> Enum.map(fn event -> event.event_id end)) + ++ (Map.values(posting) |> List.flatten |> Enum.filter(fn event -> event.metadata.context == context and event.metadata.entity_id == entity_id end) |> Enum.map(fn event -> event.event_id end)) + end + + + defp batch_get([], events) do events end defp batch_get([chunk | rest], event_accumulator) do - events = ExAws.Dynamo.batch_get_item(%{Application.get_env(:perhap_dynamo, :event_table_name, "Events") => [keys: chunk]}) + events = ExAws.Dynamo.batch_get_item(%{@event_table => [keys: chunk]}) |> ExAws.request! |> Map.get("Responses") |> Map.get("Events") |> Enum.map(fn event -> {event, ExAws.Dynamo.decode_item(event["metadata"], as: Perhap.Event.Metadata)} end) |> Enum.map(fn {event, metadata} -> - %Perhap.Event{ExAws.Dynamo.decode_item(event, as: Perhap.Event) | event_id: metadata.event_id, - metadata: %Perhap.Event.Metadata{metadata | context: String.to_atom(metadata.context), - type: String.to_atom(metadata.type)}} end) + %Perhap.Event{ExAws.Dynamo.decode_item(event, as: Perhap.Event) + | event_id: metadata.event_id, + metadata: %Perhap.Event.Metadata{metadata | context: String.to_atom(metadata.context), + type: String.to_atom(metadata.type)}} end) batch_get(rest, event_accumulator ++ events) end @@ -126,8 +267,105 @@ defmodule Perhap.Adapters.Eventstore.Dynamo do end end - defp decode_data(data) do - Enum.reduce(data, %{}, fn({key, value}, map) -> - Map.put(map, String.to_atom(key), value) end) + #defp decode_data(data) do + # Enum.reduce(data, %{}, fn({key, value}, map) -> + # Map.put(map, String.to_atom(key), value) end) + #end + + def put_to_dynamo(events) do + events = events + |> Enum.map(fn event -> %Perhap.Event{event | event_id: event.event_id |> Perhap.Event.uuid_v1_to_time_order, + metadata: Map.from_struct(event.metadata)} + |> Map.from_struct end) + :ok = batch_put(events) + GenServer.call(:eventstore, :put_complete) + end + + defp batch_put([]) do + :ok + end + + defp batch_put(events) do + event_put_request = events + |> Enum.map(fn event -> [put_request: + [item: event]] end) + do_write_events(events, event_put_request) + end + + defp process_index([], index) do + index + |> Map.keys + |> Enum.map(fn {context, entity_id} -> [put_request: + [item: %{context: context, + entity_id: entity_id, + events: Map.get(index, {context, entity_id})}]] end) + end + + defp process_index([event | rest], index) do + index_key = {event.metadata.context, event.metadata.entity_id} + indexed_events = [event.event_id | Map.get(index, index_key, [])] + process_index(rest, Map.put(index, index_key, indexed_events)) + end + + defp make_index_keys(events) do + events + |> Enum.map(fn event -> %{context: event.metadata.context, + entity_id: event.metadata.entity_id} end) + |> Enum.dedup + end + + defp retrieve_index(index_keys) do + results = ExAws.Dynamo.batch_get_item(%{@index_table => [keys: index_keys]}) + |> ExAws.request! + |> Map.get("Responses") + |> Map.get("Index") + case results do + nil -> + %{} + index -> + index + |> Enum.map(fn index_item -> ExAws.Dynamo.Decoder.decode(index_item) end) + |> Enum.map(fn index_item -> %{index_item | "context" => String.to_atom(index_item["context"])} end) + |> Enum.reduce(%{}, fn (index, map) -> Map.put(map, {index["context"], index["entity_id"]}, index["events"]) end) + end + + end + + defp do_write_events(events, event_put_request) do + case ExAws.Dynamo.batch_write_item(%{@event_table => event_put_request}) |> ExAws.request do + {:error, reason} -> + IO.puts "Error writing events to dynamo, reason: #{inspect reason}" + IO.inspect events + _ -> + :ok + end + end + + + defp write_index(events) do + events = events + |> Enum.map(fn event -> %Perhap.Event{event | event_id: event.event_id |> Perhap.Event.uuid_v1_to_time_order, + metadata: Map.from_struct(event.metadata)} + |> Map.from_struct end) + + index_keys = make_index_keys(events) + + index = retrieve_index(index_keys) + + index_put_request = process_index(events, index) + + case ExAws.Dynamo.batch_write_item(%{@index_table => index_put_request}) |> ExAws.request do + {:error, reason} -> + IO.puts "Error writing index to dynamo, reason: #{inspect reason}" + IO.inspect {:events, events} + IO.inspect {:index, index_put_request} + + Enum.each(events, fn event -> + ExAws.Dynamo.delete_item(@event_table, %{event_id: event.event_id |> Perhap.Event.uuid_v1_to_time_order}) + |> ExAws.request! end) + _ -> + :ok + end end + end diff --git a/lib/perhap_dynamo.ex b/lib/perhap_dynamo.ex index 29f5eea..29bf76a 100644 --- a/lib/perhap_dynamo.ex +++ b/lib/perhap_dynamo.ex @@ -1,18 +1,3 @@ defmodule PerhapDynamo do - @moduledoc """ - Documentation for PerhapDynamo. - """ - @doc """ - Hello world. - - ## Examples - - iex> PerhapDynamo.hello - :world - - """ - def hello do - :world - end end diff --git a/test/perhap/adapters/eventstore/dynamo_test.exs b/test/perhap/adapters/eventstore/dynamo_test.exs index 15f0ce4..11a411d 100644 --- a/test/perhap/adapters/eventstore/dynamo_test.exs +++ b/test/perhap/adapters/eventstore/dynamo_test.exs @@ -2,8 +2,12 @@ defmodule PerhapTest.Adapters.Dynamo do use PerhapTest.Helper, port: 4499 alias Perhap.Adapters.Eventstore.Dynamo + @pause_interval 500 + setup do Application.put_env(:perhap, :eventstore, Perhap.Adapters.Eventstore.Dynamo, []) + Perhap.Adapters.Eventstore.Dynamo.start_link([]) + :ok end @@ -11,26 +15,34 @@ defmodule PerhapTest.Adapters.Dynamo do random_context = Enum.random([:a, :b, :c, :d, :e]) random_event = make_random_event( %Perhap.Event.Metadata{context: random_context, entity_id: Perhap.Event.get_uuid_v4()} ) - assert ExAws.Dynamo.get_item("Events", %{event_id: random_event.event_id |> Perhap.Event.uuid_v1_to_time_order}) |> ExAws.request! == %{} + check1 = ExAws.Dynamo.get_item("Events", %{event_id: random_event.event_id |> Perhap.Event.uuid_v1_to_time_order}) |> ExAws.request! Dynamo.put_event(random_event) - refute ExAws.Dynamo.get_item("Events", %{event_id: random_event.event_id |> Perhap.Event.uuid_v1_to_time_order}) |> ExAws.request! == %{} + :timer.sleep(@pause_interval) + check2 = ExAws.Dynamo.get_item("Events", %{event_id: random_event.event_id |> Perhap.Event.uuid_v1_to_time_order}) |> ExAws.request! #cleanup ExAws.Dynamo.delete_item("Events", %{event_id: random_event.event_id |> Perhap.Event.uuid_v1_to_time_order}) |> ExAws.request! ExAws.Dynamo.delete_item("Index", %{context: random_context, entity_id: random_event.metadata.entity_id}) |> ExAws.request! + + assert check1 == %{} + refute check2 == %{} end test "get_event" do random_context = Enum.random([:a, :b, :c, :d, :e]) random_event = make_random_event( %Perhap.Event.Metadata{context: random_context, entity_id: Perhap.Event.get_uuid_v4()} ) - assert Dynamo.get_event(random_event.event_id) == {:error, "Event not found"} + check1 = Dynamo.get_event(random_event.event_id) Dynamo.put_event(random_event) - assert Dynamo.get_event(random_event.event_id) == {:ok, random_event} + :timer.sleep(@pause_interval) + check2 = Dynamo.get_event(random_event.event_id) #cleanup ExAws.Dynamo.delete_item("Events", %{event_id: random_event.event_id |> Perhap.Event.uuid_v1_to_time_order}) |> ExAws.request! ExAws.Dynamo.delete_item("Index", %{context: random_context, entity_id: random_event.metadata.entity_id}) |> ExAws.request! + + assert check1== {:error, "Event not found"} + assert check2 == {:ok, random_event} end test "get_events with entity_id" do diff --git a/test/perhap/event_test.exs b/test/perhap/event_test.exs index 7ef5284..fa5e182 100644 --- a/test/perhap/event_test.exs +++ b/test/perhap/event_test.exs @@ -1,7 +1,14 @@ defmodule Perhap.EventTestDynamo do - use ExUnit.Case, async: true + use ExUnit.Case, async: false import PerhapTest.Helper, only: :functions + @pause_interval 500 + + setup do + Perhap.Adapters.Eventstore.Dynamo.start_link([]) + :ok + end + test "timestamp returns system time in microseconds" do assert_in_delta(Perhap.Event.timestamp(), :erlang.system_time(:microsecond), 10) end @@ -66,11 +73,16 @@ defmodule Perhap.EventTestDynamo do test "saves and retrieves an event" do random_event = PerhapTest.Helper.make_random_event() - assert Perhap.Event.save_event(random_event) == {:ok, random_event} + + response = Perhap.Event.save_event(random_event) + :timer.sleep(@pause_interval) #cleanup ExAws.Dynamo.delete_item("Events", %{event_id: random_event.event_id |> Perhap.Event.uuid_v1_to_time_order}) |> ExAws.request! ExAws.Dynamo.delete_item("Index", %{context: random_event.metadata.context, entity_id: random_event.metadata.entity_id}) |> ExAws.request! + #/cleanup + + assert response == {:ok, random_event} end test "doesn't retrieve an event that doesn't exist" do @@ -82,12 +94,15 @@ defmodule Perhap.EventTestDynamo do random_event = make_random_event( %Perhap.Event.Metadata{context: random_context} ) Perhap.Event.save_event(random_event) - {:ok, result} = Perhap.Event.retrieve_event(random_event.event_id) - assert result.event_id == random_event.event_id + :timer.sleep(@pause_interval) + result = Perhap.Event.retrieve_event(random_event.event_id) #cleanup ExAws.Dynamo.delete_item("Events", %{event_id: random_event.event_id |> Perhap.Event.uuid_v1_to_time_order}) |> ExAws.request! ExAws.Dynamo.delete_item("Index", %{context: random_event.metadata.context, entity_id: random_event.metadata.entity_id}) |> ExAws.request! + #/cleanup + + assert result == {:ok, random_event} end test "retrieves events by context and entity ID" do @@ -99,14 +114,18 @@ defmodule Perhap.EventTestDynamo do rando2 = make_random_event( %Perhap.Event.Metadata{context: random_context, entity_id: random_entity_id} ) Perhap.Event.save_event(rando2) - {:ok, results} = Perhap.Event.retrieve_events(random_context, entity_id: random_entity_id) - assert Enum.member?(results, rando1) - assert Enum.member?(results, rando2) + :timer.sleep(@pause_interval) + results = Perhap.Event.retrieve_events(random_context, entity_id: random_entity_id) #cleanup ExAws.Dynamo.delete_item("Events", %{event_id: rando1.event_id |> Perhap.Event.uuid_v1_to_time_order}) |> ExAws.request! ExAws.Dynamo.delete_item("Events", %{event_id: rando2.event_id |> Perhap.Event.uuid_v1_to_time_order}) |> ExAws.request! ExAws.Dynamo.delete_item("Index", %{context: random_context, entity_id: random_entity_id}) |> ExAws.request! + #/cleanup + + {:ok, events} = results + assert Enum.member?(events, rando1) + assert Enum.member?(events, rando2) end @@ -116,17 +135,22 @@ defmodule Perhap.EventTestDynamo do rando1 = make_random_event( %Perhap.Event.Metadata{context: random_context, entity_id: random_entity_id} ) Perhap.Event.save_event(rando1) + :timer.sleep(@pause_interval) rando2 = make_random_event( %Perhap.Event.Metadata{context: random_context, entity_id: random_entity_id} ) Perhap.Event.save_event(rando2) - {:ok, results} = Perhap.Event.retrieve_events(random_context) - assert Enum.member?(results, rando1) - assert Enum.member?(results, rando2) + :timer.sleep(@pause_interval) + results = Perhap.Event.retrieve_events(random_context) #cleanup ExAws.Dynamo.delete_item("Events", %{event_id: rando1.event_id |> Perhap.Event.uuid_v1_to_time_order}) |> ExAws.request! ExAws.Dynamo.delete_item("Events", %{event_id: rando2.event_id |> Perhap.Event.uuid_v1_to_time_order}) |> ExAws.request! ExAws.Dynamo.delete_item("Index", %{context: random_context, entity_id: random_entity_id}) |> ExAws.request! + #/cleanup + + {:ok, events} = results + assert Enum.member?(events, rando1) + assert Enum.member?(events, rando2) end test "returns an empty list if events don't exist" do @@ -135,15 +159,21 @@ defmodule Perhap.EventTestDynamo do rando = make_random_event( %Perhap.Event.Metadata{context: :z, entity_id: random_entity_id} ) Perhap.Event.save_event(rando) - assert Perhap.Event.retrieve_events(:z, entity_id: Perhap.Event.get_uuid_v4) == {:ok, []} - {:ok, [result1]} = Perhap.Event.retrieve_events(:z) - assert result1.event_id == rando.event_id - {:ok, [result2]} = Perhap.Event.retrieve_events(:z, entity_id: random_entity_id) - assert result2.event_id == rando.event_id + :timer.sleep(@pause_interval) + check1 = Perhap.Event.retrieve_events(:z, entity_id: Perhap.Event.get_uuid_v4) + result1 = Perhap.Event.retrieve_events(:z) + result2 = Perhap.Event.retrieve_events(:z, entity_id: random_entity_id) #cleanup ExAws.Dynamo.delete_item("Events", %{event_id: rando.event_id |> Perhap.Event.uuid_v1_to_time_order}) |> ExAws.request! ExAws.Dynamo.delete_item("Index", %{context: rando.metadata.context, entity_id: random_entity_id}) |> ExAws.request! + #/cleanup + {:ok, [event1]} = result1 + {:ok, [event2]} = result2 + + assert check1 == {:ok, []} + assert event1.event_id == rando.event_id + assert event2.event_id == rando.event_id end @@ -157,22 +187,62 @@ defmodule Perhap.EventTestDynamo do make_random_event(%Perhap.Event.Metadata{context: :aa, event_id: ev, entity_id: random_entity}) end) random_events |> Enum.each(fn(event) -> Perhap.Event.save_event(event) end) + :timer.sleep(@pause_interval) [ ev1 | [ ev2 | rest ] ] = random_events |> Enum.reverse - {:ok, results} = Perhap.Event.retrieve_events(:aa, entity_id: random_entity, after: ev2.event_id) + results = Perhap.Event.retrieve_events(:aa, entity_id: random_entity, after: ev2.event_id) #cleanup Enum.map(random_events, fn event -> ExAws.Dynamo.delete_item("Events", %{event_id: event.event_id |> Perhap.Event.uuid_v1_to_time_order}) |> ExAws.request! end) ExAws.Dynamo.delete_item("Index", %{context: :aa, entity_id: random_entity}) |> ExAws.request! #/cleanup - #IO.inspect random_events - #IO.inspect results - assert Enum.member?(results, ev1) - Enum.map(rest, fn event -> refute Enum.member?(results, event) end) + {:ok, events} = results + assert Enum.member?(events, ev1) + Enum.map(rest, fn event -> refute Enum.member?(events, event) end) + end + + test "gets an event from the pending store" do + random_context = Enum.random([:k, :l, :m, :n, :o]) + random_event = make_random_event( + %Perhap.Event.Metadata{context: random_context} ) + Perhap.Event.save_event(random_event) + result = Perhap.Event.retrieve_event(random_event.event_id) + + :timer.sleep(@pause_interval) + #cleanup + ExAws.Dynamo.delete_item("Events", %{event_id: random_event.event_id |> Perhap.Event.uuid_v1_to_time_order}) |> ExAws.request! + ExAws.Dynamo.delete_item("Index", %{context: random_event.metadata.context, entity_id: random_event.metadata.entity_id}) |> ExAws.request! + #/cleanup + + assert result == {:ok, random_event} + end + + test "can read and write more than 100 events" do + context = :test_more_than_100_events + random_entity = Perhap.Event.get_uuid_v4 + events = Enum.map(1..125, fn _number -> make_random_event( + %Perhap.Event.Metadata{context: context, entity_id: random_entity}) end) + Enum.each(events, fn event -> Perhap.Event.save_event(event) end) + :timer.sleep(4000) + + results = Perhap.Event.retrieve_events(context) + + #cleanup + Enum.map(events, fn event -> ExAws.Dynamo.delete_item("Events", %{event_id: event.event_id |> Perhap.Event.uuid_v1_to_time_order}) |> ExAws.request! end) + ExAws.Dynamo.delete_item("Index", %{context: context, entity_id: random_entity}) |> ExAws.request! + #/cleanup + + sorted_events = Enum.sort(events, fn event1, event2 -> event1.event_id >= event2.event_id end) + with {:ok, retrieved_events} <- results do + sorted_results = Enum.sort(retrieved_events, fn event1, event2 -> event1.event_id >= event2.event_id end) + + assert sorted_events == sorted_results + end end + @tag :pending test "retrieves events filtered by an event_type" do end