Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 4 additions & 3 deletions lib/buffer.ex
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,7 @@ defmodule Buffer do
end

@doc """
Inserts a batch of items into the given `Buffer`.
Inserts a batch of items into the given `Buffer` and returns the number of items inserted.

## Options

Expand All @@ -170,10 +170,11 @@ defmodule Buffer do
regardless of flush conditions being met. Afterwards, if a limit has been exceeded,
the buffer will be flushed async.
"""
@spec insert_batch(GenServer.server(), Enumerable.t(), keyword()) :: :ok | {:error, atom()}
@spec insert_batch(GenServer.server(), Enumerable.t(), keyword()) ::
{:ok | non_neg_integer()} | {:error, atom()}
def insert_batch(buffer, items, opts \\ []) do
with {:ok, {partitioner, _}} <- fetch_buffer(buffer) do
do_insert_batch(buffer, partitioner, items, opts)
{:ok, do_insert_batch(buffer, partitioner, items, opts)}
end
end

Expand Down
29 changes: 15 additions & 14 deletions lib/buffer/server.ex
Original file line number Diff line number Diff line change
Expand Up @@ -98,13 +98,14 @@ defmodule Buffer.Server do
end

def handle_call({:safe_insert_batch, items}, _from, buffer) do
{:reply, :ok, do_safe_insert_batch(buffer, items)}
{buffer, count} = do_safe_insert_batch(buffer, items)
{:reply, count, buffer}
end

def handle_call({:unsafe_insert_batch, items}, _from, buffer) do
case do_unsafe_insert_batch(buffer, items) do
{:flush, buffer} -> {:reply, :ok, buffer, {:continue, :flush}}
{:cont, buffer} -> {:reply, :ok, buffer}
{{:flush, buffer}, count} -> {:reply, count, buffer, {:continue, :flush}}
{{:cont, buffer}, count} -> {:reply, count, buffer}
end
end

Expand Down Expand Up @@ -163,22 +164,22 @@ defmodule Buffer.Server do
end

defp do_safe_insert_batch(buffer, items) do
Enum.reduce(items, buffer, fn item, acc ->
case State.insert(acc, item) do
{:flush, acc} ->
do_flush(acc)
refresh(acc)

{:cont, acc} ->
acc
Enum.reduce(items, {buffer, 0}, fn item, {buffer, count} ->
case State.insert(buffer, item) do
{:flush, buffer} ->
do_flush(buffer)
{refresh(buffer), count + 1}

{:cont, buffer} ->
{buffer, count + 1}
end
end)
end

defp do_unsafe_insert_batch(buffer, items) do
Enum.reduce(items, {:cont, buffer}, fn item, acc ->
{_, buffer} = acc
State.insert(buffer, item)
Enum.reduce(items, {{:cont, buffer}, 0}, fn item, acc ->
{{_, buffer}, count} = acc
{State.insert(buffer, item), count + 1}
end)
end

Expand Down
8 changes: 4 additions & 4 deletions test/buffer_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -379,7 +379,7 @@ defmodule BufferTest do
items = ["foo", "bar", "baz"]

assert {:ok, buffer} = start_ex_buffer()
assert Buffer.insert_batch(buffer, items) == :ok
assert Buffer.insert_batch(buffer, items) == {:ok, 3}
assert Buffer.dump(buffer) == {:ok, ["foo", "bar", "baz"]}
end

Expand All @@ -388,7 +388,7 @@ defmodule BufferTest do
items = ["foo", "bar", "baz"]

assert {:ok, buffer} = start_ex_buffer(opts)
assert Buffer.insert_batch(buffer, items) == :ok
assert Buffer.insert_batch(buffer, items) == {:ok, 3}
assert Buffer.dump(buffer, partition: 0) == {:ok, ["foo", "bar", "baz"]}
end

Expand All @@ -397,7 +397,7 @@ defmodule BufferTest do
items = ["foo", "bar", "baz"]

assert {:ok, buffer} = start_ex_buffer(opts)
assert Buffer.insert_batch(buffer, items) == :ok
assert Buffer.insert_batch(buffer, items) == {:ok, 3}
assert_receive {^buffer, ["foo", "bar"], _}
assert Buffer.dump(buffer) == {:ok, ["baz"]}
end
Expand All @@ -407,7 +407,7 @@ defmodule BufferTest do
items = ["foo", "bar", "baz"]

assert {:ok, buffer} = start_ex_buffer(opts)
assert Buffer.insert_batch(buffer, items, safe_flush: false) == :ok
assert Buffer.insert_batch(buffer, items, safe_flush: false) == {:ok, 3}
assert_receive {^buffer, ["foo", "bar", "baz"], _}
end
end
Expand Down