From 599fc94c1b6b8d281791e991df1d62a04bf60edd Mon Sep 17 00:00:00 2001 From: David Leach Date: Thu, 19 Nov 2015 21:23:49 +1300 Subject: [PATCH 1/7] Add ability to remove overflow workers after a delay When workers are expensive to start and transactions are quick killing all workers that are checked in when in overflow is very expensive. This change allows delaying the termination of overflow workers when there is peak load and alleviates worker churn. --- README.md | 39 ++++++++++++++--- src/poolboy.erl | 97 +++++++++++++++++++++++++++++++++++++----- test/poolboy_tests.erl | 37 ++++++++++++++++ 3 files changed, 155 insertions(+), 18 deletions(-) diff --git a/README.md b/README.md index e603768..563c288 100644 --- a/README.md +++ b/README.md @@ -8,7 +8,8 @@ Poolboy is a **lightweight**, **generic** pooling library for Erlang with a focus on **simplicity**, **performance**, and **rock-solid** disaster recovery. ## Usage - +The most basic use case is to check out a worker, make a call and manually +return it to the pool when done ```erl-sh 1> Worker = poolboy:checkout(PoolName). <0.9001.0> @@ -17,7 +18,16 @@ ok 3> poolboy:checkin(PoolName, Worker). ok ``` - +Alternatively you can use a transaction which will return the worker to the +pool when the call is finished, the caller is gone, the call has exited or +the timeout set has been reached +```erl-sh +poolboy:transaction( + PoolName, + fun(Worker) -> gen_server:call(Worker, Request) end, + TransactionTimeout +) +``` ## Example This is an example application showcasing database connection pools using @@ -149,14 +159,29 @@ code_change(_OldVsn, State, _Extra) -> {ok, State}. ``` -## Options +## Pool Options -- `name`: the pool name -- `worker_module`: the module that represents the workers -- `size`: maximum pool size -- `max_overflow`: maximum number of workers created if pool is empty +- `name`: the pool name - optional +- `worker_module`: the module that represents the workers - mandatory +- `size`: maximum pool size - optional +- `max_overflow`: maximum number of workers created if pool is empty - optional - `strategy`: `lifo` or `fifo`, determines whether checked in workers should be placed first or last in the line of available workers. Default is `lifo`. +- `overflow_ttl`: time in milliseconds you want to wait before removing overflow + workers. Useful when it's expensive to start workers. Default is 0. + +## Pool Status +Returns : {Status, Workers, Overflow, InUse} +- `Status`: ready | full | overflow + The ready atom indicates there are workers that are not checked out + ready. The full atom indicates all workers including overflow are + checked out. The overflow atom is used to describe the condition + when all permanent workers are in use but there is overflow capacity + available. +- `Workers`: Number of workers ready for use. +- `Overflow`: Number of overflow workers started, should never exceed number + specified by MaxOverflow when starting pool +- `InUse`: Number of workers currently busy/checked out ## Authors diff --git a/src/poolboy.erl b/src/poolboy.erl index db4973b..fd7d982 100644 --- a/src/poolboy.erl +++ b/src/poolboy.erl @@ -32,11 +32,13 @@ supervisor :: pid(), workers :: [pid()], waiting :: pid_queue(), + workers_to_reap :: ets:tid(), monitors :: ets:tid(), size = 5 :: non_neg_integer(), overflow = 0 :: non_neg_integer(), max_overflow = 10 :: non_neg_integer(), - strategy = lifo :: lifo | fifo + strategy = lifo :: lifo | fifo, + overflow_ttl = 0 :: non_neg_integer() }). -spec checkout(Pool :: pool()) -> pid(). @@ -126,7 +128,9 @@ init({PoolArgs, WorkerArgs}) -> process_flag(trap_exit, true), Waiting = queue:new(), Monitors = ets:new(monitors, [private]), - init(PoolArgs, WorkerArgs, #state{waiting = Waiting, monitors = Monitors}). + WorkersToReap = ets:new(workers_to_reap, [private]), + init(PoolArgs, WorkerArgs, #state{waiting = Waiting, monitors = Monitors, + workers_to_reap = WorkersToReap}). init([{worker_module, Mod} | Rest], WorkerArgs, State) when is_atom(Mod) -> {ok, Sup} = poolboy_sup:start_link(Mod, WorkerArgs), @@ -139,6 +143,8 @@ init([{strategy, lifo} | Rest], WorkerArgs, State) -> init(Rest, WorkerArgs, State#state{strategy = lifo}); init([{strategy, fifo} | Rest], WorkerArgs, State) -> init(Rest, WorkerArgs, State#state{strategy = fifo}); +init([{overflow_ttl, Milliseconds} | Rest], WorkerArgs, State) when is_integer(Milliseconds) -> + init(Rest, WorkerArgs, State#state{overflow_ttl = Milliseconds}); init([_ | Rest], WorkerArgs, State) -> init(Rest, WorkerArgs, State); init([], _WorkerArgs, #state{size = Size, supervisor = Sup} = State) -> @@ -182,8 +188,21 @@ handle_call({checkout, CRef, Block}, {FromPid, _} = From, State) -> workers = Workers, monitors = Monitors, overflow = Overflow, - max_overflow = MaxOverflow} = State, + max_overflow = MaxOverflow, + workers_to_reap = WorkersToReap} = State, case Workers of + [Pid | Left] when State#state.overflow_ttl > 0 -> + MRef = erlang:monitor(process, FromPid), + true = ets:insert(Monitors, {Pid, CRef, MRef}), + ok = case ets:lookup(WorkersToReap, Pid) of + [{Pid, Tref}] -> + {ok, cancel} = timer:cancel(Tref), + true = ets:delete(State#state.workers_to_reap, Pid), + ok; + [] -> + ok + end, + {reply, Pid, State#state{workers = Left}}; [Pid | Left] -> MRef = erlang:monitor(process, FromPid), true = ets:insert(Monitors, {Pid, CRef, MRef}), @@ -204,8 +223,9 @@ handle_call(status, _From, State) -> #state{workers = Workers, monitors = Monitors, overflow = Overflow} = State, + CheckedOutWorkers = ets:info(Monitors, size), StateName = state_name(State), - {reply, {StateName, length(Workers), Overflow, ets:info(Monitors, size)}, State}; + {reply, {StateName, length(Workers), Overflow, CheckedOutWorkers}, State}; handle_call(get_avail_workers, _From, State) -> Workers = State#state.workers, {reply, Workers, State}; @@ -235,7 +255,10 @@ handle_info({'DOWN', MRef, _, _, _}, State) -> end; handle_info({'EXIT', Pid, _Reason}, State) -> #state{supervisor = Sup, - monitors = Monitors} = State, + monitors = Monitors, + workers_to_reap = WorkersToReap, + workers = Workers} = State, + true = ets:delete(WorkersToReap, Pid), case ets:lookup(Monitors, Pid) of [{Pid, _, MRef}] -> true = erlang:demonitor(MRef), @@ -243,15 +266,28 @@ handle_info({'EXIT', Pid, _Reason}, State) -> NewState = handle_worker_exit(Pid, State), {noreply, NewState}; [] -> - case lists:member(Pid, State#state.workers) of + case lists:member(Pid, Workers) of true -> - W = lists:filter(fun (P) -> P =/= Pid end, State#state.workers), + W = lists:filter(fun (P) -> P =/= Pid end, Workers), {noreply, State#state{workers = [new_worker(Sup) | W]}}; false -> {noreply, State} end end; - +handle_info({reap_worker, Pid}, State)-> + #state{monitors = Monitors, + workers_to_reap = WorkersToReap} = State, + true = ets:delete(WorkersToReap, Pid), + case ets:lookup(Monitors, Pid) of + [{Pid, _, MRef}] -> + true = erlang:demonitor(MRef), + true = ets:delete(Monitors, Pid), + NewState = purge_worker(Pid, State), + {noreply, NewState}; + [] -> + NewState = purge_worker(Pid, State), + {noreply, NewState} + end; handle_info(_Info, State) -> {noreply, State}. @@ -285,6 +321,19 @@ dismiss_worker(Sup, Pid) -> true = unlink(Pid), supervisor:terminate_child(Sup, Pid). +purge_worker(Pid, State) -> + #state{supervisor = Sup, + workers = Workers, + overflow = Overflow} = State, + case Overflow > 0 of + true -> + W = lists:filter(fun (P) -> P =/= Pid end, Workers), + ok = dismiss_worker(Sup, Pid), + State#state{workers = W, overflow = State#state.overflow -1}; + false -> + State + end. + prepopulate(N, _Sup) when N < 1 -> []; prepopulate(N, Sup) -> @@ -300,12 +349,32 @@ handle_checkin(Pid, State) -> waiting = Waiting, monitors = Monitors, overflow = Overflow, - strategy = Strategy} = State, + strategy = Strategy, + overflow_ttl = OverflowTtl, + workers_to_reap = WorkersToReap} = State, case queue:out(Waiting) of {{value, {From, CRef, MRef}}, Left} -> true = ets:insert(Monitors, {Pid, CRef, MRef}), gen_server:reply(From, Pid), State#state{waiting = Left}; + {empty, Empty} when Overflow > 0, OverflowTtl > 0 -> + case ets:lookup(WorkersToReap, Pid) of + [] -> + {ok, Tref} = + timer:send_after(OverflowTtl, self(), {reap_worker, Pid}), + NewOverflow = Overflow; + [{Pid, Tref}] -> + {ok, cancel} = timer:cancel(Tref), + NewOverflow = Overflow -1, + {ok, Tref} = + timer:send_after(OverflowTtl, self(), {reap_worker, Pid}) + end, + true = ets:insert(WorkersToReap, {Pid, Tref}), + Workers = case Strategy of + lifo -> [Pid | State#state.workers]; + fifo -> State#state.workers ++ [Pid] + end, + State#state{workers = Workers, waiting = Empty, overflow = NewOverflow}; {empty, Empty} when Overflow > 0 -> ok = dismiss_worker(Sup, Pid), State#state{waiting = Empty, overflow = Overflow - 1}; @@ -343,7 +412,13 @@ state_name(State = #state{overflow = Overflow}) when Overflow < 1 -> true -> overflow; false -> ready end; -state_name(#state{overflow = MaxOverflow, max_overflow = MaxOverflow}) -> - full; +state_name(State = #state{overflow = Overflow}) when Overflow > 0 -> + #state{max_overflow = MaxOverflow, workers = Workers, overflow = Overflow} = State, + NumberOfWorkers = length(Workers), + case MaxOverflow == Overflow of + true when NumberOfWorkers > 0 -> ready; + true -> full; + false -> overflow + end; state_name(_State) -> overflow. diff --git a/test/poolboy_tests.erl b/test/poolboy_tests.erl index b0f3b39..b41c6f8 100644 --- a/test/poolboy_tests.erl +++ b/test/poolboy_tests.erl @@ -42,6 +42,9 @@ pool_test_() -> {<<"Non-blocking pool behaves when full">>, fun pool_full_nonblocking/0 }, + {<<"Pool with overflow_ttl behaves as expected">>, + fun pool_overflow_ttl_workers/0 + }, {<<"Pool behaves on owner death">>, fun owner_death/0 }, @@ -386,6 +389,34 @@ pool_full_nonblocking() -> ?assertEqual(10, length(pool_call(Pid, get_all_monitors))), ok = pool_call(Pid, stop). +pool_overflow_ttl_workers() -> + {ok, Pid} = new_pool_with_overflow_ttl(1, 1, 1000), + Worker = poolboy:checkout(Pid), + Worker1 = poolboy:checkout(Pid), + % Test pool behaves normally when full + ?assertEqual({full, 0, 1, 2}, poolboy:status(Pid)), + ?assertEqual(full, poolboy:checkout(Pid, false)), + poolboy:checkin(Pid, Worker), + timer:sleep(500), + % Test overflow worker is returned to list of available workers + ?assertEqual({ready, 1, 1, 1}, poolboy:status(Pid)), + Worker2 = poolboy:checkout(Pid), + % Ensure checked in worker is in fact being reused + ?assertEqual(Worker, Worker2), + poolboy:checkin(Pid, Worker1), + timer:sleep(500), + Worker3 = poolboy:checkout(Pid), + ?assertEqual(Worker1, Worker3), + poolboy:checkin(Pid, Worker2), + timer:sleep(500), + % Test overflow worker is returned to list of available workers + ?assertEqual({ready, 1, 1, 1}, poolboy:status(Pid)), + timer:sleep(550), + % Test overflow worker is reaped in the correct time period + ?assertEqual({overflow, 0, 0, 1}, poolboy:status(Pid)), + ok = pool_call(Pid, stop). + + owner_death() -> %% Check that a dead owner (a process that dies with a worker checked out) %% causes the pool to dismiss the worker and prune the state space. @@ -546,5 +577,11 @@ new_pool(Size, MaxOverflow, Strategy) -> {size, Size}, {max_overflow, MaxOverflow}, {strategy, Strategy}]). +new_pool_with_overflow_ttl(Size, MaxOverflow, OverflowTtl) -> + poolboy:start_link([{name, {local, poolboy_test}}, + {worker_module, poolboy_test_worker}, + {size, Size}, {max_overflow, MaxOverflow}, + {overflow_ttl, OverflowTtl}]). + pool_call(ServerRef, Request) -> gen_server:call(ServerRef, Request). From 7190178be986250bd1230dae1adbc7af55ec859c Mon Sep 17 00:00:00 2001 From: David Leach Date: Thu, 19 Nov 2015 21:23:49 +1300 Subject: [PATCH 2/7] Add ability to remove overflow workers after a delay When workers are expensive to start and transactions are quick killing all workers that are checked in when in overflow is very expensive. This change allows delaying the termination of overflow workers when there is peak load and alleviates worker churn. --- README.md | 38 ++++++++++++++--- src/poolboy.erl | 97 +++++++++++++++++++++++++++++++++++++----- test/poolboy_tests.erl | 37 ++++++++++++++++ 3 files changed, 154 insertions(+), 18 deletions(-) diff --git a/README.md b/README.md index e603768..2e5388e 100644 --- a/README.md +++ b/README.md @@ -8,7 +8,8 @@ Poolboy is a **lightweight**, **generic** pooling library for Erlang with a focus on **simplicity**, **performance**, and **rock-solid** disaster recovery. ## Usage - +The most basic use case is to check out a worker, make a call and manually +return it to the pool when done ```erl-sh 1> Worker = poolboy:checkout(PoolName). <0.9001.0> @@ -17,7 +18,15 @@ ok 3> poolboy:checkin(PoolName, Worker). ok ``` - +Alternatively you can use a transaction which will return the worker to the +pool when the call is finished. +```erl-sh +poolboy:transaction( + PoolName, + fun(Worker) -> gen_server:call(Worker, Request) end, + TransactionTimeout +) +``` ## Example This is an example application showcasing database connection pools using @@ -149,14 +158,29 @@ code_change(_OldVsn, State, _Extra) -> {ok, State}. ``` -## Options +## Pool Options -- `name`: the pool name -- `worker_module`: the module that represents the workers -- `size`: maximum pool size -- `max_overflow`: maximum number of workers created if pool is empty +- `name`: the pool name - optional +- `worker_module`: the module that represents the workers - mandatory +- `size`: maximum pool size - optional +- `max_overflow`: maximum number of workers created if pool is empty - optional - `strategy`: `lifo` or `fifo`, determines whether checked in workers should be placed first or last in the line of available workers. Default is `lifo`. +- `overflow_ttl`: time in milliseconds you want to wait before removing overflow + workers. Useful when it's expensive to start workers. Default is 0. + +## Pool Status +Returns : {Status, Workers, Overflow, InUse} +- `Status`: ready | full | overflow + The ready atom indicates there are workers that are not checked out + ready. The full atom indicates all workers including overflow are + checked out. The overflow atom is used to describe the condition + when all permanent workers are in use but there is overflow capacity + available. +- `Workers`: Number of workers ready for use. +- `Overflow`: Number of overflow workers started, should never exceed number + specified by MaxOverflow when starting pool +- `InUse`: Number of workers currently busy/checked out ## Authors diff --git a/src/poolboy.erl b/src/poolboy.erl index db4973b..fd7d982 100644 --- a/src/poolboy.erl +++ b/src/poolboy.erl @@ -32,11 +32,13 @@ supervisor :: pid(), workers :: [pid()], waiting :: pid_queue(), + workers_to_reap :: ets:tid(), monitors :: ets:tid(), size = 5 :: non_neg_integer(), overflow = 0 :: non_neg_integer(), max_overflow = 10 :: non_neg_integer(), - strategy = lifo :: lifo | fifo + strategy = lifo :: lifo | fifo, + overflow_ttl = 0 :: non_neg_integer() }). -spec checkout(Pool :: pool()) -> pid(). @@ -126,7 +128,9 @@ init({PoolArgs, WorkerArgs}) -> process_flag(trap_exit, true), Waiting = queue:new(), Monitors = ets:new(monitors, [private]), - init(PoolArgs, WorkerArgs, #state{waiting = Waiting, monitors = Monitors}). + WorkersToReap = ets:new(workers_to_reap, [private]), + init(PoolArgs, WorkerArgs, #state{waiting = Waiting, monitors = Monitors, + workers_to_reap = WorkersToReap}). init([{worker_module, Mod} | Rest], WorkerArgs, State) when is_atom(Mod) -> {ok, Sup} = poolboy_sup:start_link(Mod, WorkerArgs), @@ -139,6 +143,8 @@ init([{strategy, lifo} | Rest], WorkerArgs, State) -> init(Rest, WorkerArgs, State#state{strategy = lifo}); init([{strategy, fifo} | Rest], WorkerArgs, State) -> init(Rest, WorkerArgs, State#state{strategy = fifo}); +init([{overflow_ttl, Milliseconds} | Rest], WorkerArgs, State) when is_integer(Milliseconds) -> + init(Rest, WorkerArgs, State#state{overflow_ttl = Milliseconds}); init([_ | Rest], WorkerArgs, State) -> init(Rest, WorkerArgs, State); init([], _WorkerArgs, #state{size = Size, supervisor = Sup} = State) -> @@ -182,8 +188,21 @@ handle_call({checkout, CRef, Block}, {FromPid, _} = From, State) -> workers = Workers, monitors = Monitors, overflow = Overflow, - max_overflow = MaxOverflow} = State, + max_overflow = MaxOverflow, + workers_to_reap = WorkersToReap} = State, case Workers of + [Pid | Left] when State#state.overflow_ttl > 0 -> + MRef = erlang:monitor(process, FromPid), + true = ets:insert(Monitors, {Pid, CRef, MRef}), + ok = case ets:lookup(WorkersToReap, Pid) of + [{Pid, Tref}] -> + {ok, cancel} = timer:cancel(Tref), + true = ets:delete(State#state.workers_to_reap, Pid), + ok; + [] -> + ok + end, + {reply, Pid, State#state{workers = Left}}; [Pid | Left] -> MRef = erlang:monitor(process, FromPid), true = ets:insert(Monitors, {Pid, CRef, MRef}), @@ -204,8 +223,9 @@ handle_call(status, _From, State) -> #state{workers = Workers, monitors = Monitors, overflow = Overflow} = State, + CheckedOutWorkers = ets:info(Monitors, size), StateName = state_name(State), - {reply, {StateName, length(Workers), Overflow, ets:info(Monitors, size)}, State}; + {reply, {StateName, length(Workers), Overflow, CheckedOutWorkers}, State}; handle_call(get_avail_workers, _From, State) -> Workers = State#state.workers, {reply, Workers, State}; @@ -235,7 +255,10 @@ handle_info({'DOWN', MRef, _, _, _}, State) -> end; handle_info({'EXIT', Pid, _Reason}, State) -> #state{supervisor = Sup, - monitors = Monitors} = State, + monitors = Monitors, + workers_to_reap = WorkersToReap, + workers = Workers} = State, + true = ets:delete(WorkersToReap, Pid), case ets:lookup(Monitors, Pid) of [{Pid, _, MRef}] -> true = erlang:demonitor(MRef), @@ -243,15 +266,28 @@ handle_info({'EXIT', Pid, _Reason}, State) -> NewState = handle_worker_exit(Pid, State), {noreply, NewState}; [] -> - case lists:member(Pid, State#state.workers) of + case lists:member(Pid, Workers) of true -> - W = lists:filter(fun (P) -> P =/= Pid end, State#state.workers), + W = lists:filter(fun (P) -> P =/= Pid end, Workers), {noreply, State#state{workers = [new_worker(Sup) | W]}}; false -> {noreply, State} end end; - +handle_info({reap_worker, Pid}, State)-> + #state{monitors = Monitors, + workers_to_reap = WorkersToReap} = State, + true = ets:delete(WorkersToReap, Pid), + case ets:lookup(Monitors, Pid) of + [{Pid, _, MRef}] -> + true = erlang:demonitor(MRef), + true = ets:delete(Monitors, Pid), + NewState = purge_worker(Pid, State), + {noreply, NewState}; + [] -> + NewState = purge_worker(Pid, State), + {noreply, NewState} + end; handle_info(_Info, State) -> {noreply, State}. @@ -285,6 +321,19 @@ dismiss_worker(Sup, Pid) -> true = unlink(Pid), supervisor:terminate_child(Sup, Pid). +purge_worker(Pid, State) -> + #state{supervisor = Sup, + workers = Workers, + overflow = Overflow} = State, + case Overflow > 0 of + true -> + W = lists:filter(fun (P) -> P =/= Pid end, Workers), + ok = dismiss_worker(Sup, Pid), + State#state{workers = W, overflow = State#state.overflow -1}; + false -> + State + end. + prepopulate(N, _Sup) when N < 1 -> []; prepopulate(N, Sup) -> @@ -300,12 +349,32 @@ handle_checkin(Pid, State) -> waiting = Waiting, monitors = Monitors, overflow = Overflow, - strategy = Strategy} = State, + strategy = Strategy, + overflow_ttl = OverflowTtl, + workers_to_reap = WorkersToReap} = State, case queue:out(Waiting) of {{value, {From, CRef, MRef}}, Left} -> true = ets:insert(Monitors, {Pid, CRef, MRef}), gen_server:reply(From, Pid), State#state{waiting = Left}; + {empty, Empty} when Overflow > 0, OverflowTtl > 0 -> + case ets:lookup(WorkersToReap, Pid) of + [] -> + {ok, Tref} = + timer:send_after(OverflowTtl, self(), {reap_worker, Pid}), + NewOverflow = Overflow; + [{Pid, Tref}] -> + {ok, cancel} = timer:cancel(Tref), + NewOverflow = Overflow -1, + {ok, Tref} = + timer:send_after(OverflowTtl, self(), {reap_worker, Pid}) + end, + true = ets:insert(WorkersToReap, {Pid, Tref}), + Workers = case Strategy of + lifo -> [Pid | State#state.workers]; + fifo -> State#state.workers ++ [Pid] + end, + State#state{workers = Workers, waiting = Empty, overflow = NewOverflow}; {empty, Empty} when Overflow > 0 -> ok = dismiss_worker(Sup, Pid), State#state{waiting = Empty, overflow = Overflow - 1}; @@ -343,7 +412,13 @@ state_name(State = #state{overflow = Overflow}) when Overflow < 1 -> true -> overflow; false -> ready end; -state_name(#state{overflow = MaxOverflow, max_overflow = MaxOverflow}) -> - full; +state_name(State = #state{overflow = Overflow}) when Overflow > 0 -> + #state{max_overflow = MaxOverflow, workers = Workers, overflow = Overflow} = State, + NumberOfWorkers = length(Workers), + case MaxOverflow == Overflow of + true when NumberOfWorkers > 0 -> ready; + true -> full; + false -> overflow + end; state_name(_State) -> overflow. diff --git a/test/poolboy_tests.erl b/test/poolboy_tests.erl index b0f3b39..b41c6f8 100644 --- a/test/poolboy_tests.erl +++ b/test/poolboy_tests.erl @@ -42,6 +42,9 @@ pool_test_() -> {<<"Non-blocking pool behaves when full">>, fun pool_full_nonblocking/0 }, + {<<"Pool with overflow_ttl behaves as expected">>, + fun pool_overflow_ttl_workers/0 + }, {<<"Pool behaves on owner death">>, fun owner_death/0 }, @@ -386,6 +389,34 @@ pool_full_nonblocking() -> ?assertEqual(10, length(pool_call(Pid, get_all_monitors))), ok = pool_call(Pid, stop). +pool_overflow_ttl_workers() -> + {ok, Pid} = new_pool_with_overflow_ttl(1, 1, 1000), + Worker = poolboy:checkout(Pid), + Worker1 = poolboy:checkout(Pid), + % Test pool behaves normally when full + ?assertEqual({full, 0, 1, 2}, poolboy:status(Pid)), + ?assertEqual(full, poolboy:checkout(Pid, false)), + poolboy:checkin(Pid, Worker), + timer:sleep(500), + % Test overflow worker is returned to list of available workers + ?assertEqual({ready, 1, 1, 1}, poolboy:status(Pid)), + Worker2 = poolboy:checkout(Pid), + % Ensure checked in worker is in fact being reused + ?assertEqual(Worker, Worker2), + poolboy:checkin(Pid, Worker1), + timer:sleep(500), + Worker3 = poolboy:checkout(Pid), + ?assertEqual(Worker1, Worker3), + poolboy:checkin(Pid, Worker2), + timer:sleep(500), + % Test overflow worker is returned to list of available workers + ?assertEqual({ready, 1, 1, 1}, poolboy:status(Pid)), + timer:sleep(550), + % Test overflow worker is reaped in the correct time period + ?assertEqual({overflow, 0, 0, 1}, poolboy:status(Pid)), + ok = pool_call(Pid, stop). + + owner_death() -> %% Check that a dead owner (a process that dies with a worker checked out) %% causes the pool to dismiss the worker and prune the state space. @@ -546,5 +577,11 @@ new_pool(Size, MaxOverflow, Strategy) -> {size, Size}, {max_overflow, MaxOverflow}, {strategy, Strategy}]). +new_pool_with_overflow_ttl(Size, MaxOverflow, OverflowTtl) -> + poolboy:start_link([{name, {local, poolboy_test}}, + {worker_module, poolboy_test_worker}, + {size, Size}, {max_overflow, MaxOverflow}, + {overflow_ttl, OverflowTtl}]). + pool_call(ServerRef, Request) -> gen_server:call(ServerRef, Request). From eb35c27e1262b1d5615c2a7d74916478dfdaee34 Mon Sep 17 00:00:00 2001 From: David Leach Date: Mon, 18 Jan 2016 22:27:15 +1300 Subject: [PATCH 3/7] Removing debug config --- src/poolboy.erl | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/src/poolboy.erl b/src/poolboy.erl index fe6f427..39749c7 100644 --- a/src/poolboy.erl +++ b/src/poolboy.erl @@ -1,5 +1,4 @@ %% Poolboy - A hunky Erlang worker pool factory --include_lib("eunit/include/eunit.hrl"). -module(poolboy). -behaviour(gen_server). @@ -248,10 +247,8 @@ handle_info({'DOWN', MRef, _, _, _}, State) -> handle_info({'EXIT', Pid, _Reason}, State) -> #state{supervisor = Sup, monitors = Monitors, - workers_to_reap = WorkersToReap, workers = Workers} = State, - - true = ets:delete(WorkersToReap, Pid), + ok = cancel_worker_reap(State, Pid), case ets:lookup(Monitors, Pid) of [{Pid, _, MRef}] -> true = erlang:demonitor(MRef), From a99befebfacc4bf3ad13503779d5fbe141d3a25d Mon Sep 17 00:00:00 2001 From: walrusVision Date: Fri, 15 Sep 2017 16:17:00 +1200 Subject: [PATCH 4/7] Added function to return richer pool status info --- README.md | 18 ++++++++++++++++++ src/poolboy.erl | 29 ++++++++++++++++++++++++++++- 2 files changed, 46 insertions(+), 1 deletion(-) diff --git a/README.md b/README.md index 2e5388e..09fa1f4 100644 --- a/README.md +++ b/README.md @@ -182,6 +182,24 @@ Returns : {Status, Workers, Overflow, InUse} specified by MaxOverflow when starting pool - `InUse`: Number of workers currently busy/checked out +## Full Pool Status +Returns a propslist of counters relating to a specified pool. Useful +for graphing the state of your pools +- `size`: The defined size of the permanent worker pool +- `max_overflow`: The maximum number of overflow workers allowed +- `total_worker_count`: The total supervised workers. This includes + any workers waiting to be culled and not available to the + general pool +- `ready_worker_count`: The count of workers available workers to be + used including overflow workers. Workers in this count may or may + not be checked out. +- `checked_out_worker_count`: The count of workers that are currently + checked out +- `overflow_worker_count`: The count of active overflow workers +- `waiting_request_count`: The backlog of requests waiting to checkout + a worker + + ## Authors - Devin Torres (devinus) diff --git a/src/poolboy.erl b/src/poolboy.erl index 39749c7..ea76ab4 100644 --- a/src/poolboy.erl +++ b/src/poolboy.erl @@ -4,7 +4,7 @@ -export([checkout/1, checkout/2, checkout/3, checkin/2, transaction/2, transaction/3, child_spec/2, child_spec/3, start/1, start/2, - start_link/1, start_link/2, stop/1, status/1]). + start_link/1, start_link/2, stop/1, status/1, full_status/1]). -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]). -export_type([pool/0]). @@ -123,6 +123,10 @@ stop(Pool) -> status(Pool) -> gen_server:call(Pool, status). +-spec full_status(Pool :: pool()) -> proplists:proplist(). +full_status(Pool) -> + gen_server:call(Pool, full_status). + init({PoolArgs, WorkerArgs}) -> process_flag(trap_exit, true), Waiting = queue:new(), @@ -217,6 +221,29 @@ handle_call(status, _From, State) -> CheckedOutWorkers = ets:info(Monitors, size), StateName = state_name(State), {reply, {StateName, length(Workers), Overflow, CheckedOutWorkers}, State}; +handle_call(full_status, _From, State) -> + #state{workers = Workers, + size = Size, + monitors = Monitors, + overflow = Overflow, + max_overflow = MaxOverflow, + supervisor = Sup, + waiting = Waiting } = State, + CheckedOutWorkers = ets:info(Monitors, size), + {reply, + { + {size, Size}, % The permanent worker size + {max_overflow, MaxOverflow}, % The overflow size + % The maximum amount of worker is size + overflow_size + + {total_worker_count, length(supervisor:which_children(Sup))}, % The total of all workers + {ready_worker_count, length(Workers)}, % Number of workers ready to use + {overflow_worker_count, Overflow}, % Number of overflow workers + {checked_out_worker_count, CheckedOutWorkers}, % Number of workers currently checked out + {waiting_request_count, queue:len(Waiting)} % Number of waiting requests + }, + State + }; handle_call(get_avail_workers, _From, State) -> Workers = State#state.workers, {reply, Workers, State}; From 0b5a81000da4503b2652f689a3474dc62eea50c9 Mon Sep 17 00:00:00 2001 From: walrusVision Date: Mon, 18 Sep 2017 09:43:40 +1200 Subject: [PATCH 5/7] Added tests for full_status --- test/poolboy_tests.erl | 72 ++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 72 insertions(+) diff --git a/test/poolboy_tests.erl b/test/poolboy_tests.erl index 6314472..0cea437 100644 --- a/test/poolboy_tests.erl +++ b/test/poolboy_tests.erl @@ -54,6 +54,9 @@ pool_test_() -> {<<"Pool returns status">>, fun pool_returns_status/0 }, + {<<"Pool returns full status">>, + fun pool_returns_full_status/0 + }, {<<"Pool demonitors previously waiting processes">>, fun demonitors_previously_waiting_processes/0 }, @@ -503,6 +506,63 @@ pool_returns_status() -> ?assertEqual({full, 0, 0, 0}, poolboy:status(Pool4)), ok = pool_call(Pool4, stop). +pool_returns_full_status() -> + {ok, Pool} = new_pool(2, 0), + ?assertEqual(full_status(2,0,2,2,0,0,0), poolboy:full_status(Pool)), + poolboy:checkout(Pool), + ?assertEqual(full_status(2,0,2,1,0,1,0), poolboy:full_status(Pool)), + poolboy:checkout(Pool), + ?assertEqual(full_status(2,0,2,0,0,2,0), poolboy:full_status(Pool)), + ok = pool_call(Pool, stop), + + {ok, Pool2} = new_pool(1, 1), + ?assertEqual(full_status(1,1,1,1,0,0,0), poolboy:full_status(Pool2)), + poolboy:checkout(Pool2), + ?assertEqual(full_status(1,1,1,0,0,1,0), poolboy:full_status(Pool2)), + poolboy:checkout(Pool2), + ?assertEqual(full_status(1,1,2,0,1,2,0), poolboy:full_status(Pool2)), + ok = pool_call(Pool2, stop), + + {ok, Pool3} = new_pool(0, 2), + ?assertEqual(full_status(0,2,0,0,0,0,0), poolboy:full_status(Pool3)), + poolboy:checkout(Pool3), + ?assertEqual(full_status(0,2,1,0,1,1,0), poolboy:full_status(Pool3)), + poolboy:checkout(Pool3), + ?assertEqual(full_status(0,2,2,0,2,2,0), poolboy:full_status(Pool3)), + ok = pool_call(Pool3, stop), + + {ok, Pool4} = new_pool(0, 0), + ?assertEqual(full_status(0,0,0,0,0,0,0), poolboy:full_status(Pool4)), + ok = pool_call(Pool4, stop), + + % Check that the wait queue is showing correct amount + {ok, Pool5} = new_pool(1, 0), + Checkout1 = poolboy:checkout(Pool5), + Self = self(), + spawn(fun() -> + Worker = poolboy:checkout(Pool5), + Self ! got_worker, + checkin_worker(Pool5, Worker) + end), + + %% Spawned process should block waiting for worker to be available. + receive + got_worker -> ?assert(false) + after + 500 -> ?assert(true) + end, + ?assertEqual(full_status(1,0,1,0,0,1,1), poolboy:full_status(Pool5)), + checkin_worker(Pool5, Checkout1), + + %% Spawned process should have been able to obtain a worker. + receive + got_worker -> ?assert(true) + after + 500 -> ?assert(false) + end, + ?assertEqual(full_status(1,0,1,1,0,0,0), poolboy:full_status(Pool5)), + ok = pool_call(Pool5, stop). + demonitors_previously_waiting_processes() -> {ok, Pool} = new_pool(1,0), Self = self(), @@ -611,3 +671,15 @@ new_pool_with_overflow_ttl(Size, MaxOverflow, OverflowTtl) -> pool_call(ServerRef, Request) -> gen_server:call(ServerRef, Request). + +full_status(Size, MaxOverFlow, TotalWorker, ReadyWorker, OverflowWorker, + CheckedOutWorker, Waiting) -> + % Helper function to populate the results tuple + {{size, Size}, + {max_overflow, MaxOverFlow}, + {total_worker_count,TotalWorker}, + {ready_worker_count, ReadyWorker}, + {overflow_worker_count, OverflowWorker}, + {checked_out_worker_count, CheckedOutWorker}, + {waiting_request_count, Waiting} + }. From cb3f9cf4b310c20811f87fcfc6f6492fe465ed24 Mon Sep 17 00:00:00 2001 From: walrusVision Date: Wed, 20 Sep 2017 14:05:40 +1200 Subject: [PATCH 6/7] Actually return proplists for status --- src/poolboy.erl | 4 ++-- test/poolboy_tests.erl | 4 ++-- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/src/poolboy.erl b/src/poolboy.erl index ea76ab4..030fca6 100644 --- a/src/poolboy.erl +++ b/src/poolboy.erl @@ -231,7 +231,7 @@ handle_call(full_status, _From, State) -> waiting = Waiting } = State, CheckedOutWorkers = ets:info(Monitors, size), {reply, - { + [ {size, Size}, % The permanent worker size {max_overflow, MaxOverflow}, % The overflow size % The maximum amount of worker is size + overflow_size @@ -241,7 +241,7 @@ handle_call(full_status, _From, State) -> {overflow_worker_count, Overflow}, % Number of overflow workers {checked_out_worker_count, CheckedOutWorkers}, % Number of workers currently checked out {waiting_request_count, queue:len(Waiting)} % Number of waiting requests - }, + ], State }; handle_call(get_avail_workers, _From, State) -> diff --git a/test/poolboy_tests.erl b/test/poolboy_tests.erl index 0cea437..ee1d17a 100644 --- a/test/poolboy_tests.erl +++ b/test/poolboy_tests.erl @@ -675,11 +675,11 @@ pool_call(ServerRef, Request) -> full_status(Size, MaxOverFlow, TotalWorker, ReadyWorker, OverflowWorker, CheckedOutWorker, Waiting) -> % Helper function to populate the results tuple - {{size, Size}, + [{size, Size}, {max_overflow, MaxOverFlow}, {total_worker_count,TotalWorker}, {ready_worker_count, ReadyWorker}, {overflow_worker_count, OverflowWorker}, {checked_out_worker_count, CheckedOutWorker}, {waiting_request_count, Waiting} - }. + ]. From 2334498bcc3f5c55f687885d764f8ade328cbbe1 Mon Sep 17 00:00:00 2001 From: Glen Walker Date: Fri, 1 Dec 2017 14:14:32 +1300 Subject: [PATCH 7/7] Don't reap workers that are checked out again * Ensure reap message already in mailbox is flushed when cancelling reap * Reap shouldn't be touching monitors, they are just for owner of checked out workers --- src/poolboy.erl | 21 +++++++++------------ 1 file changed, 9 insertions(+), 12 deletions(-) diff --git a/src/poolboy.erl b/src/poolboy.erl index 030fca6..4cf0b9c 100644 --- a/src/poolboy.erl +++ b/src/poolboy.erl @@ -292,19 +292,10 @@ handle_info({'EXIT', Pid, _Reason}, State) -> end end; handle_info({reap_worker, Pid}, State)-> - #state{monitors = Monitors, - workers_to_reap = WorkersToReap} = State, + #state{workers_to_reap = WorkersToReap} = State, true = ets:delete(WorkersToReap, Pid), - case ets:lookup(Monitors, Pid) of - [{Pid, _, MRef}] -> - true = erlang:demonitor(MRef), - true = ets:delete(Monitors, Pid), - NewState = purge_worker(Pid, State), - {noreply, NewState}; - [] -> - NewState = purge_worker(Pid, State), - {noreply, NewState} - end; + NewState = purge_worker(Pid, State), + {noreply, NewState}; handle_info(_Info, State) -> {noreply, State}. @@ -346,6 +337,12 @@ cancel_worker_reap(State, Pid) -> ok; [] -> ok + end, + receive + {reap_worker, Pid} -> + ok + after 0 -> + ok end. purge_worker(Pid, State) ->