diff --git a/README.md b/README.md index e603768..09fa1f4 100644 --- a/README.md +++ b/README.md @@ -8,7 +8,8 @@ Poolboy is a **lightweight**, **generic** pooling library for Erlang with a focus on **simplicity**, **performance**, and **rock-solid** disaster recovery. ## Usage - +The most basic use case is to check out a worker, make a call, and manually +return it to the pool when done: ```erl-sh 1> Worker = poolboy:checkout(PoolName). <0.9001.0> 2> gen_server:call(Worker, Request). ok 3> poolboy:checkin(PoolName, Worker). ok ``` - +Alternatively, you can use a transaction, which will return the worker to the +pool when the call is finished: +```erl-sh +poolboy:transaction( + PoolName, + fun(Worker) -> gen_server:call(Worker, Request) end, + TransactionTimeout +) +``` ## Example This is an example application showcasing database connection pools using @@ -149,14 +158,47 @@ code_change(_OldVsn, State, _Extra) -> {ok, State}. ``` -## Options +## Pool Options -- `name`: the pool name -- `worker_module`: the module that represents the workers -- `size`: maximum pool size -- `max_overflow`: maximum number of workers created if pool is empty +- `name`: the pool name - optional +- `worker_module`: the module that represents the workers - mandatory +- `size`: maximum pool size - optional +- `max_overflow`: maximum number of workers created if pool is empty - optional - `strategy`: `lifo` or `fifo`, determines whether checked in workers should be placed first or last in the line of available workers. Default is `lifo`. +- `overflow_ttl`: time in milliseconds to wait before removing idle overflow + workers. Useful when workers are expensive to start. Default is 0. + +## Pool Status +Returns: {Status, Workers, Overflow, InUse} +- `Status`: ready | full | overflow + The ready atom indicates there are workers available that are not + checked out. The full atom indicates all workers, including overflow, are + checked out. The overflow atom describes the condition + where all permanent workers are in use but there is overflow capacity + available. +- `Workers`: Number of workers ready for use. +- `Overflow`: Number of overflow workers started; this should never exceed the + number specified by MaxOverflow when starting the pool. +- `InUse`: Number of workers currently busy/checked out. + +## Full Pool Status +Returns a proplist of counters relating to a specified pool. Useful +for graphing the state of your pools. +- `size`: The defined size of the permanent worker pool. +- `max_overflow`: The maximum number of overflow workers allowed. +- `total_worker_count`: The total number of supervised workers, including + checked-out workers and any overflow workers that are waiting to be + culled. +- `ready_worker_count`: The count of workers available to be + used, including overflow workers. Workers in this count are not + currently checked out. +- `checked_out_worker_count`: The count of workers that are currently + checked out. +- `overflow_worker_count`: The count of active overflow workers. +- `waiting_request_count`: The backlog of requests waiting to check out + a worker. + ## Authors diff --git a/src/poolboy.erl b/src/poolboy.erl index db4973b..4cf0b9c 100644 --- a/src/poolboy.erl +++ b/src/poolboy.erl @@ -1,11 +1,10 @@ %% Poolboy - A hunky Erlang worker pool factory - -module(poolboy). -behaviour(gen_server). -export([checkout/1, checkout/2, checkout/3, checkin/2, transaction/2, transaction/3, child_spec/2, child_spec/3, start/1, start/2, - start_link/1, start_link/2, stop/1, status/1]).
+ start_link/1, start_link/2, stop/1, status/1, full_status/1]). -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]). -export_type([pool/0]). @@ -32,11 +31,13 @@ supervisor :: pid(), workers :: [pid()], waiting :: pid_queue(), + workers_to_reap :: ets:tid(), monitors :: ets:tid(), size = 5 :: non_neg_integer(), overflow = 0 :: non_neg_integer(), max_overflow = 10 :: non_neg_integer(), - strategy = lifo :: lifo | fifo + strategy = lifo :: lifo | fifo, + overflow_ttl = 0 :: non_neg_integer() }). -spec checkout(Pool :: pool()) -> pid(). @@ -122,11 +123,17 @@ stop(Pool) -> status(Pool) -> gen_server:call(Pool, status). +-spec full_status(Pool :: pool()) -> proplists:proplist(). +full_status(Pool) -> + gen_server:call(Pool, full_status). + init({PoolArgs, WorkerArgs}) -> process_flag(trap_exit, true), Waiting = queue:new(), Monitors = ets:new(monitors, [private]), - init(PoolArgs, WorkerArgs, #state{waiting = Waiting, monitors = Monitors}). + WorkersToReap = ets:new(workers_to_reap, [private]), + init(PoolArgs, WorkerArgs, #state{waiting = Waiting, monitors = Monitors, + workers_to_reap = WorkersToReap}). init([{worker_module, Mod} | Rest], WorkerArgs, State) when is_atom(Mod) -> {ok, Sup} = poolboy_sup:start_link(Mod, WorkerArgs), @@ -139,6 +146,8 @@ init([{strategy, lifo} | Rest], WorkerArgs, State) -> init(Rest, WorkerArgs, State#state{strategy = lifo}); init([{strategy, fifo} | Rest], WorkerArgs, State) -> init(Rest, WorkerArgs, State#state{strategy = fifo}); +init([{overflow_ttl, OverflowTtl} | Rest], WorkerArgs, State) when is_integer(OverflowTtl) -> + init(Rest, WorkerArgs, State#state{overflow_ttl = OverflowTtl}); init([_ | Rest], WorkerArgs, State) -> init(Rest, WorkerArgs, State); init([], _WorkerArgs, #state{size = Size, supervisor = Sup} = State) -> @@ -184,6 +193,11 @@ handle_call({checkout, CRef, Block}, {FromPid, _} = From, State) -> overflow = Overflow, max_overflow = MaxOverflow} = State, case Workers of + [Pid | Left] when State#state.overflow_ttl > 0 -> + MRef = erlang:monitor(process, FromPid), + true = ets:insert(Monitors, {Pid, CRef, MRef}), + ok = cancel_worker_reap(State, Pid), + {reply, Pid, State#state{workers = Left}}; [Pid | Left] -> MRef = erlang:monitor(process, FromPid), true = ets:insert(Monitors, {Pid, CRef, MRef}), @@ -204,8 +218,32 @@ handle_call(status, _From, State) -> #state{workers = Workers, monitors = Monitors, overflow = Overflow} = State, + CheckedOutWorkers = ets:info(Monitors, size), StateName = state_name(State), - {reply, {StateName, length(Workers), Overflow, ets:info(Monitors, size)}, State}; + {reply, {StateName, length(Workers), Overflow, CheckedOutWorkers}, State}; +handle_call(full_status, _From, State) -> + #state{workers = Workers, + size = Size, + monitors = Monitors, + overflow = Overflow, + max_overflow = MaxOverflow, + supervisor = Sup, + waiting = Waiting} = State, + CheckedOutWorkers = ets:info(Monitors, size), + {reply, + [ + {size, Size}, % The permanent worker size + {max_overflow, MaxOverflow}, % The overflow size + % The maximum number of workers is size + max_overflow + + {total_worker_count, length(supervisor:which_children(Sup))}, % The total of all workers + {ready_worker_count, length(Workers)}, % Number of workers ready to use + {overflow_worker_count, Overflow}, % Number of overflow workers + {checked_out_worker_count, CheckedOutWorkers}, % Number of workers currently checked out + {waiting_request_count, queue:len(Waiting)} % Number of waiting requests + ], + State + };
handle_call(get_avail_workers, _From, State) -> Workers = State#state.workers, {reply, Workers, State}; @@ -235,7 +273,9 @@ handle_info({'DOWN', MRef, _, _, _}, State) -> end; handle_info({'EXIT', Pid, _Reason}, State) -> #state{supervisor = Sup, - monitors = Monitors} = State, + monitors = Monitors, + workers = Workers} = State, + ok = cancel_worker_reap(State, Pid), case ets:lookup(Monitors, Pid) of [{Pid, _, MRef}] -> true = erlang:demonitor(MRef), @@ -243,15 +283,19 @@ handle_info({'EXIT', Pid, _Reason}, State) -> NewState = handle_worker_exit(Pid, State), {noreply, NewState}; [] -> - case lists:member(Pid, State#state.workers) of + case lists:member(Pid, Workers) of true -> - W = lists:filter(fun (P) -> P =/= Pid end, State#state.workers), + W = lists:filter(fun (P) -> P =/= Pid end, Workers), {noreply, State#state{workers = [new_worker(Sup) | W]}}; false -> {noreply, State} end end; - +handle_info({reap_worker, Pid}, State)-> + #state{workers_to_reap = WorkersToReap} = State, + true = ets:delete(WorkersToReap, Pid), + NewState = purge_worker(Pid, State), + {noreply, NewState}; handle_info(_Info, State) -> {noreply, State}. @@ -285,6 +329,35 @@ dismiss_worker(Sup, Pid) -> true = unlink(Pid), supervisor:terminate_child(Sup, Pid). +cancel_worker_reap(State, Pid) -> + case ets:lookup(State#state.workers_to_reap, Pid) of + [{Pid, Tref}] -> + erlang:cancel_timer(Tref), + true = ets:delete(State#state.workers_to_reap, Pid), + ok; + [] -> + ok + end, + receive + {reap_worker, Pid} -> + ok + after 0 -> + ok + end. + +purge_worker(Pid, State) -> + #state{supervisor = Sup, + workers = Workers, + overflow = Overflow} = State, + case Overflow > 0 of + true -> + W = lists:filter(fun (P) -> P =/= Pid end, Workers), + ok = dismiss_worker(Sup, Pid), + State#state{workers = W, overflow = Overflow -1}; + false -> + State + end. + prepopulate(N, _Sup) when N < 1 -> []; prepopulate(N, Sup) -> @@ -300,12 +373,23 @@ handle_checkin(Pid, State) -> waiting = Waiting, monitors = Monitors, overflow = Overflow, - strategy = Strategy} = State, + strategy = Strategy, + overflow_ttl = OverflowTtl, + workers_to_reap = WorkersToReap} = State, case queue:out(Waiting) of {{value, {From, CRef, MRef}}, Left} -> true = ets:insert(Monitors, {Pid, CRef, MRef}), gen_server:reply(From, Pid), State#state{waiting = Left}; + {empty, Empty} when Overflow > 0, OverflowTtl > 0 -> + Tref = + erlang:send_after(OverflowTtl, self(), {reap_worker, Pid}), + true = ets:insert(WorkersToReap, {Pid, Tref}), + Workers = case Strategy of + lifo -> [Pid | State#state.workers]; + fifo -> State#state.workers ++ [Pid] + end, + State#state{workers = Workers, waiting = Empty}; {empty, Empty} when Overflow > 0 -> ok = dismiss_worker(Sup, Pid), State#state{waiting = Empty, overflow = Overflow - 1}; @@ -343,7 +427,13 @@ state_name(State = #state{overflow = Overflow}) when Overflow < 1 -> true -> overflow; false -> ready end; -state_name(#state{overflow = MaxOverflow, max_overflow = MaxOverflow}) -> - full; +state_name(State = #state{overflow = Overflow}) when Overflow > 0 -> + #state{max_overflow = MaxOverflow, workers = Workers, overflow = Overflow} = State, + NumberOfWorkers = length(Workers), + case MaxOverflow == Overflow of + true when NumberOfWorkers > 0 -> ready; + true -> full; + false -> overflow + end; state_name(_State) -> overflow. 
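To make the poolboy.erl changes above easier to follow, here is a brief, illustrative shell session in the style of the README examples. The pool name `overflow_pool`, the 5000 ms TTL, the prompt numbers, and the pids are made up for illustration; `poolboy_test_worker` is the worker module used by the test suite below, and the counter values mirror the assertions in those tests.

```erl-sh
1> {ok, Pool} = poolboy:start_link([{name, {local, overflow_pool}},
                                    {worker_module, poolboy_test_worker},
                                    {size, 1}, {max_overflow, 1},
                                    {overflow_ttl, 5000}]).
{ok,<0.50.0>}
2> W1 = poolboy:checkout(Pool), W2 = poolboy:checkout(Pool).
<0.53.0>
3> poolboy:status(Pool).
{full,0,1,2}
4> poolboy:checkin(Pool, W2), poolboy:checkin(Pool, W1).
ok
%% Within overflow_ttl milliseconds the overflow worker is still alive and
%% counted as ready; once the TTL expires it is reaped and the counters drop.
5> poolboy:full_status(Pool).
[{size,1},
 {max_overflow,1},
 {total_worker_count,2},
 {ready_worker_count,2},
 {overflow_worker_count,1},
 {checked_out_worker_count,0},
 {waiting_request_count,0}]
```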
diff --git a/test/poolboy_tests.erl b/test/poolboy_tests.erl index b0f3b39..ee1d17a 100644 --- a/test/poolboy_tests.erl +++ b/test/poolboy_tests.erl @@ -42,6 +42,9 @@ pool_test_() -> {<<"Non-blocking pool behaves when full">>, fun pool_full_nonblocking/0 }, + {<<"Pool with overflow_ttl behaves as expected">>, + fun pool_overflow_ttl_workers/0 + }, {<<"Pool behaves on owner death">>, fun owner_death/0 }, @@ -51,6 +54,9 @@ pool_test_() -> {<<"Pool returns status">>, fun pool_returns_status/0 }, + {<<"Pool returns full status">>, + fun pool_returns_full_status/0 + }, {<<"Pool demonitors previously waiting processes">>, fun demonitors_previously_waiting_processes/0 }, @@ -386,6 +392,60 @@ pool_full_nonblocking() -> ?assertEqual(10, length(pool_call(Pid, get_all_monitors))), ok = pool_call(Pid, stop). +pool_overflow_ttl_workers() -> + {ok, Pid} = new_pool_with_overflow_ttl(1, 1, 1000), + Worker = poolboy:checkout(Pid), + Worker1 = poolboy:checkout(Pid), + % Test pool behaves normally when full + ?assertEqual({full, 0, 1, 2}, poolboy:status(Pid)), + ?assertEqual(full, poolboy:checkout(Pid, false)), + % Test first worker is returned to list of available workers + poolboy:checkin(Pid, Worker), + timer:sleep(500), + ?assertEqual({ready, 1, 1, 1}, poolboy:status(Pid)), + % Ensure first worker is in fact being reused + Worker2 = poolboy:checkout(Pid), + ?assertEqual({full, 0, 1, 2}, poolboy:status(Pid)), + ?assertEqual(Worker, Worker2), + % Test second worker is returned to list of available workers + poolboy:checkin(Pid, Worker1), + timer:sleep(500), + ?assertEqual({ready, 1, 1, 1}, poolboy:status(Pid)), + % Ensure second worker is in fact being reused + Worker3 = poolboy:checkout(Pid), + ?assertEqual({full, 0, 1, 2}, poolboy:status(Pid)), + ?assertEqual(Worker1, Worker3), + % Test we've got two workers ready when two are checked in in quick + % succession + poolboy:checkin(Pid, Worker2), + timer:sleep(100), + ?assertEqual({ready, 1, 1, 1}, poolboy:status(Pid)), + poolboy:checkin(Pid, Worker3), + timer:sleep(100), + ?assertEqual({ready, 2, 1, 0}, poolboy:status(Pid)), + % Test an owner death + spawn(fun() -> + poolboy:checkout(Pid), + receive after 100 -> exit(normal) end + end), + ?assertEqual({ready, 2, 1, 0}, poolboy:status(Pid)), + ?assertEqual(2, length(pool_call(Pid, get_all_workers))), + % Test overflow worker is reaped in the correct time period + timer:sleep(850), + % Overflow worker should have been reaped by now + ?assertEqual({ready, 1, 0, 0}, poolboy:status(Pid)), + % Test worker death behaviour + Worker4 = poolboy:checkout(Pid), + Worker5 = poolboy:checkout(Pid), + exit(Worker5, kill), + timer:sleep(100), + ?assertEqual({overflow, 0, 0, 1}, poolboy:status(Pid)), + exit(Worker4, kill), + timer:sleep(100), + ?assertEqual({ready, 1, 0, 0}, poolboy:status(Pid)), + ok = pool_call(Pid, stop). + + owner_death() -> %% Check that a dead owner (a process that dies with a worker checked out) %% causes the pool to dismiss the worker and prune the state space. @@ -446,6 +506,63 @@ pool_returns_status() -> ?assertEqual({full, 0, 0, 0}, poolboy:status(Pool4)), ok = pool_call(Pool4, stop).
+pool_returns_full_status() -> + {ok, Pool} = new_pool(2, 0), + ?assertEqual(full_status(2,0,2,2,0,0,0), poolboy:full_status(Pool)), + poolboy:checkout(Pool), + ?assertEqual(full_status(2,0,2,1,0,1,0), poolboy:full_status(Pool)), + poolboy:checkout(Pool), + ?assertEqual(full_status(2,0,2,0,0,2,0), poolboy:full_status(Pool)), + ok = pool_call(Pool, stop), + + {ok, Pool2} = new_pool(1, 1), + ?assertEqual(full_status(1,1,1,1,0,0,0), poolboy:full_status(Pool2)), + poolboy:checkout(Pool2), + ?assertEqual(full_status(1,1,1,0,0,1,0), poolboy:full_status(Pool2)), + poolboy:checkout(Pool2), + ?assertEqual(full_status(1,1,2,0,1,2,0), poolboy:full_status(Pool2)), + ok = pool_call(Pool2, stop), + + {ok, Pool3} = new_pool(0, 2), + ?assertEqual(full_status(0,2,0,0,0,0,0), poolboy:full_status(Pool3)), + poolboy:checkout(Pool3), + ?assertEqual(full_status(0,2,1,0,1,1,0), poolboy:full_status(Pool3)), + poolboy:checkout(Pool3), + ?assertEqual(full_status(0,2,2,0,2,2,0), poolboy:full_status(Pool3)), + ok = pool_call(Pool3, stop), + + {ok, Pool4} = new_pool(0, 0), + ?assertEqual(full_status(0,0,0,0,0,0,0), poolboy:full_status(Pool4)), + ok = pool_call(Pool4, stop), + + % Check that the wait queue shows the correct count + {ok, Pool5} = new_pool(1, 0), + Checkout1 = poolboy:checkout(Pool5), + Self = self(), + spawn(fun() -> + Worker = poolboy:checkout(Pool5), + Self ! got_worker, + checkin_worker(Pool5, Worker) + end), + + %% Spawned process should block waiting for worker to be available. + receive + got_worker -> ?assert(false) + after + 500 -> ?assert(true) + end, + ?assertEqual(full_status(1,0,1,0,0,1,1), poolboy:full_status(Pool5)), + checkin_worker(Pool5, Checkout1), + + %% Spawned process should have been able to obtain a worker. + receive + got_worker -> ?assert(true) + after + 500 -> ?assert(false) + end, + ?assertEqual(full_status(1,0,1,1,0,0,0), poolboy:full_status(Pool5)), + ok = pool_call(Pool5, stop). + demonitors_previously_waiting_processes() -> {ok, Pool} = new_pool(1,0), Self = self(), @@ -516,7 +633,7 @@ reuses_waiting_monitor_on_worker_exit() -> receive ok -> ok end end), - Worker = receive {worker, Worker} -> Worker end, + Worker = receive {worker, W} -> W end, Ref = monitor(process, Worker), exit(Worker, kill), receive @@ -546,5 +663,23 @@ new_pool(Size, MaxOverflow, Strategy) -> {size, Size}, {max_overflow, MaxOverflow}, {strategy, Strategy}]). +new_pool_with_overflow_ttl(Size, MaxOverflow, OverflowTtl) -> + poolboy:start_link([{name, {local, poolboy_test}}, + {worker_module, poolboy_test_worker}, + {size, Size}, {max_overflow, MaxOverflow}, + {overflow_ttl, OverflowTtl}]). + pool_call(ServerRef, Request) -> gen_server:call(ServerRef, Request). + +full_status(Size, MaxOverflow, TotalWorker, ReadyWorker, OverflowWorker, + CheckedOutWorker, Waiting) -> + % Helper function to build the expected full_status proplist + [{size, Size}, + {max_overflow, MaxOverflow}, + {total_worker_count, TotalWorker}, + {ready_worker_count, ReadyWorker}, + {overflow_worker_count, OverflowWorker}, + {checked_out_worker_count, CheckedOutWorker}, + {waiting_request_count, Waiting} + ].
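The README change notes that `full_status/1` is useful for graphing pool state. As a rough sketch of how a caller might consume the returned proplist, the module below is hypothetical (`poolboy_metrics_example` and `snapshot/1` are not part of this patch); only `poolboy:full_status/1` and its keys come from the diff.

```erl
%% Hypothetical helper, not part of this patch: convert poolboy:full_status/1
%% output into a map that a metrics/graphing pipeline can consume.
-module(poolboy_metrics_example).
-export([snapshot/1]).

snapshot(Pool) ->
    Props = poolboy:full_status(Pool),
    Keys = [size, max_overflow, total_worker_count, ready_worker_count,
            overflow_worker_count, checked_out_worker_count,
            waiting_request_count],
    maps:from_list([{Key, proplists:get_value(Key, Props)} || Key <- Keys]).
```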