From 27632d964704af70ca87507e525f820db77f9339 Mon Sep 17 00:00:00 2001 From: Samuel Williams Date: Tue, 6 Feb 2024 23:55:35 +1300 Subject: [PATCH 1/2] Restart actor thread if it is killed/dies (e.g. on fork). --- lib/async/actor/proxy.rb | 41 ++++++++++++++++++++++++++++------------ test/async/actor.rb | 10 ++++++++++ 2 files changed, 39 insertions(+), 12 deletions(-) diff --git a/lib/async/actor/proxy.rb b/lib/async/actor/proxy.rb index 697950c..41fc575 100644 --- a/lib/async/actor/proxy.rb +++ b/lib/async/actor/proxy.rb @@ -25,15 +25,15 @@ def call(id) def initialize(target) @target = target - @queue = ::Thread::Queue.new - @thread = __start__ - - # Define a finalizer to ensure the thread is closed: - ::ObjectSpace.define_finalizer(self, Finalizer.new(@queue, @thread)) + @queue = nil + @thread = nil + @guard = ::Thread::Mutex.new end # @parameter return_value [Symbol] One of :ignore, :promise or :wait. def method_missing(*arguments, return_value: :wait, **options, &block) + __start__ + unless return_value == :ignore result = Variable.new end @@ -49,18 +49,35 @@ def method_missing(*arguments, return_value: :wait, **options, &block) protected + def __kill__ + @queue&.close + @thread&.join + end + def __start__ - ::Thread.new do - ::Kernel.Sync do |task| - while operation = @queue.pop - task.async do - arguments, options, block, result = operation - Variable.fulfill(result) do - @target.public_send(*arguments, **options, &block) + return if @thread&.alive? + + @guard.synchronize do + return if @thread&.alive? + + @queue&.close + @queue = ::Thread::Queue.new + + @thread = ::Thread.new do + ::Kernel.Sync do |task| + while operation = @queue.pop + task.async do + arguments, options, block, result = operation + Variable.fulfill(result) do + @target.public_send(*arguments, **options, &block) + end end end end end + + # Define a finalizer to ensure the thread is closed: + ::ObjectSpace.define_finalizer(self, Finalizer.new(@queue, @thread)) end end end diff --git a/test/async/actor.rb b/test/async/actor.rb index 73cd6b9..698f75e 100644 --- a/test/async/actor.rb +++ b/test/async/actor.rb @@ -6,4 +6,14 @@ expect(actor).to be_a(Array) end + + it "can restart actor if thread is killed" do + actor = Async::Actor.new(Array.new) + + expect(actor).to be_a(Array) + + actor.__send__(:__kill__) + + expect(actor).to be_a(Array) + end end From cc3f7bf3e45523ac2ca0194aa29f0cc4a61d8791 Mon Sep 17 00:00:00 2001 From: Samuel Williams Date: Wed, 7 Feb 2024 00:07:20 +1300 Subject: [PATCH 2/2] Use guard to protect `__kill__` too. --- lib/async/actor/proxy.rb | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/lib/async/actor/proxy.rb b/lib/async/actor/proxy.rb index 41fc575..c544da5 100644 --- a/lib/async/actor/proxy.rb +++ b/lib/async/actor/proxy.rb @@ -50,8 +50,13 @@ def method_missing(*arguments, return_value: :wait, **options, &block) protected def __kill__ - @queue&.close - @thread&.join + @guard.synchronize do + @queue&.close + @queue = nil + + @thread&.kill + @thread = nil + end end def __start__