From f06eb3f86046b9ccf22648a3e1e51f8ff33dfe6e Mon Sep 17 00:00:00 2001 From: Gordon Woolbert Date: Fri, 17 Nov 2023 13:08:18 -0500 Subject: [PATCH 1/2] Update batch insert to return count --- lib/buffer.ex | 5 +++-- lib/buffer/server.ex | 29 +++++++++++++++-------------- test/buffer_test.exs | 8 ++++---- 3 files changed, 22 insertions(+), 20 deletions(-) diff --git a/lib/buffer.ex b/lib/buffer.ex index a08c5c0..a7d5cc6 100644 --- a/lib/buffer.ex +++ b/lib/buffer.ex @@ -170,10 +170,11 @@ defmodule Buffer do regardless of flush conditions being met. Afterwards, if a limit has been exceeded, the buffer will be flushed async. """ - @spec insert_batch(GenServer.server(), Enumerable.t(), keyword()) :: :ok | {:error, atom()} + @spec insert_batch(GenServer.server(), Enumerable.t(), keyword()) :: + {:ok | non_neg_integer()} | {:error, atom()} def insert_batch(buffer, items, opts \\ []) do with {:ok, {partitioner, _}} <- fetch_buffer(buffer) do - do_insert_batch(buffer, partitioner, items, opts) + {:ok, do_insert_batch(buffer, partitioner, items, opts)} end end diff --git a/lib/buffer/server.ex b/lib/buffer/server.ex index bf2aff8..222e10f 100644 --- a/lib/buffer/server.ex +++ b/lib/buffer/server.ex @@ -98,13 +98,14 @@ defmodule Buffer.Server do end def handle_call({:safe_insert_batch, items}, _from, buffer) do - {:reply, :ok, do_safe_insert_batch(buffer, items)} + {buffer, count} = do_safe_insert_batch(buffer, items) + {:reply, count, buffer} end def handle_call({:unsafe_insert_batch, items}, _from, buffer) do case do_unsafe_insert_batch(buffer, items) do - {:flush, buffer} -> {:reply, :ok, buffer, {:continue, :flush}} - {:cont, buffer} -> {:reply, :ok, buffer} + {{:flush, buffer}, count} -> {:reply, count, buffer, {:continue, :flush}} + {{:cont, buffer}, count} -> {:reply, count, buffer} end end @@ -163,22 +164,22 @@ defmodule Buffer.Server do end defp do_safe_insert_batch(buffer, items) do - Enum.reduce(items, buffer, fn item, acc -> - case State.insert(acc, item) do - {:flush, acc} -> - do_flush(acc) - refresh(acc) - - {:cont, acc} -> - acc + Enum.reduce(items, {buffer, 0}, fn item, {buffer, count} -> + case State.insert(buffer, item) do + {:flush, buffer} -> + do_flush(buffer) + {refresh(buffer), count + 1} + + {:cont, buffer} -> + {buffer, count + 1} end end) end defp do_unsafe_insert_batch(buffer, items) do - Enum.reduce(items, {:cont, buffer}, fn item, acc -> - {_, buffer} = acc - State.insert(buffer, item) + Enum.reduce(items, {{:cont, buffer}, 0}, fn item, acc -> + {{_, buffer}, count} = acc + {State.insert(buffer, item), count + 1} end) end diff --git a/test/buffer_test.exs b/test/buffer_test.exs index 28ccd03..162f197 100644 --- a/test/buffer_test.exs +++ b/test/buffer_test.exs @@ -379,7 +379,7 @@ defmodule BufferTest do items = ["foo", "bar", "baz"] assert {:ok, buffer} = start_ex_buffer() - assert Buffer.insert_batch(buffer, items) == :ok + assert Buffer.insert_batch(buffer, items) == {:ok, 3} assert Buffer.dump(buffer) == {:ok, ["foo", "bar", "baz"]} end @@ -388,7 +388,7 @@ defmodule BufferTest do items = ["foo", "bar", "baz"] assert {:ok, buffer} = start_ex_buffer(opts) - assert Buffer.insert_batch(buffer, items) == :ok + assert Buffer.insert_batch(buffer, items) == {:ok, 3} assert Buffer.dump(buffer, partition: 0) == {:ok, ["foo", "bar", "baz"]} end @@ -397,7 +397,7 @@ defmodule BufferTest do items = ["foo", "bar", "baz"] assert {:ok, buffer} = start_ex_buffer(opts) - assert Buffer.insert_batch(buffer, items) == :ok + assert Buffer.insert_batch(buffer, items) == {:ok, 3} assert_receive {^buffer, ["foo", "bar"], _} assert Buffer.dump(buffer) == {:ok, ["baz"]} end @@ -407,7 +407,7 @@ defmodule BufferTest do items = ["foo", "bar", "baz"] assert {:ok, buffer} = start_ex_buffer(opts) - assert Buffer.insert_batch(buffer, items, safe_flush: false) == :ok + assert Buffer.insert_batch(buffer, items, safe_flush: false) == {:ok, 3} assert_receive {^buffer, ["foo", "bar", "baz"], _} end end From b32f103b49547c8852cf945d117bbe6c8ab9a933 Mon Sep 17 00:00:00 2001 From: Gordon Woolbert Date: Fri, 17 Nov 2023 13:11:46 -0500 Subject: [PATCH 2/2] update doc --- lib/buffer.ex | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib/buffer.ex b/lib/buffer.ex index a7d5cc6..c54d48b 100644 --- a/lib/buffer.ex +++ b/lib/buffer.ex @@ -161,7 +161,7 @@ defmodule Buffer do end @doc """ - Inserts a batch of items into the given `Buffer`. + Inserts a batch of items into the given `Buffer` and returns the number of items inserted. ## Options