diff --git a/lib/timeout.rb b/lib/timeout.rb index f173d02..36cd0f9 100644 --- a/lib/timeout.rb +++ b/lib/timeout.rb @@ -44,12 +44,92 @@ def self.handle_timeout(message) # :nodoc: end # :stopdoc: - CONDVAR = ConditionVariable.new - QUEUE = Queue.new - QUEUE_MUTEX = Mutex.new - TIMEOUT_THREAD_MUTEX = Mutex.new - @timeout_thread = nil - private_constant :CONDVAR, :QUEUE, :QUEUE_MUTEX, :TIMEOUT_THREAD_MUTEX + + # We keep a private reference so that time mocking libraries won't break Timeout. + GET_TIME = Process.method(:clock_gettime) + if defined?(Ractor.make_shareable) + # Ractor.make_shareable(Method) only works on Ruby 4+ + Ractor.make_shareable(GET_TIME) rescue nil + end + private_constant :GET_TIME + + class State + attr_reader :condvar, :queue, :queue_mutex # shared with Timeout.timeout() + + def initialize + @condvar = ConditionVariable.new + @queue = Queue.new + @queue_mutex = Mutex.new + + @timeout_thread = nil + @timeout_thread_mutex = Mutex.new + end + + if defined?(Ractor.store_if_absent) && defined?(Ractor.shareable?) && Ractor.shareable?(GET_TIME) + # Ractor support if + # 1. Ractor.store_if_absent is available + # 2. Method object can be shareable (4.0~) + def self.instance + Ractor.store_if_absent :timeout_gem_state do + State.new + end + end + else + GLOBAL_STATE = State.new + + def self.instance + GLOBAL_STATE + end + end + + def create_timeout_thread + watcher = Thread.new do + requests = [] + while true + until @queue.empty? and !requests.empty? # wait to have at least one request + req = @queue.pop + requests << req unless req.done? + end + closest_deadline = requests.min_by(&:deadline).deadline + + now = 0.0 + @queue_mutex.synchronize do + while (now = GET_TIME.call(Process::CLOCK_MONOTONIC)) < closest_deadline and @queue.empty? + @condvar.wait(@queue_mutex, closest_deadline - now) + end + end + + requests.each do |req| + req.interrupt if req.expired?(now) + end + requests.reject!(&:done?) + end + end + + if !watcher.group.enclosed? && (!defined?(Ractor.main?) || Ractor.main?) + ThreadGroup::Default.add(watcher) + end + + watcher.name = "Timeout stdlib thread" + watcher.thread_variable_set(:"\0__detached_thread__", true) + watcher + end + + def ensure_timeout_thread_created + unless @timeout_thread&.alive? + # If the Mutex is already owned we are in a signal handler. + # In that case, just return and let the main thread create the Timeout thread. + return if @timeout_thread_mutex.owned? + + @timeout_thread_mutex.synchronize do + unless @timeout_thread&.alive? + @timeout_thread = create_timeout_thread + end + end + end + end + end + private_constant :State class Request attr_reader :deadline @@ -91,55 +171,6 @@ def finished end private_constant :Request - def self.create_timeout_thread - watcher = Thread.new do - requests = [] - while true - until QUEUE.empty? and !requests.empty? # wait to have at least one request - req = QUEUE.pop - requests << req unless req.done? - end - closest_deadline = requests.min_by(&:deadline).deadline - - now = 0.0 - QUEUE_MUTEX.synchronize do - while (now = GET_TIME.call(Process::CLOCK_MONOTONIC)) < closest_deadline and QUEUE.empty? - CONDVAR.wait(QUEUE_MUTEX, closest_deadline - now) - end - end - - requests.each do |req| - req.interrupt if req.expired?(now) - end - requests.reject!(&:done?) - end - end - ThreadGroup::Default.add(watcher) unless watcher.group.enclosed? - watcher.name = "Timeout stdlib thread" - watcher.thread_variable_set(:"\0__detached_thread__", true) - watcher - end - private_class_method :create_timeout_thread - - def self.ensure_timeout_thread_created - unless @timeout_thread and @timeout_thread.alive? - # If the Mutex is already owned we are in a signal handler. - # In that case, just return and let the main thread create the @timeout_thread. - return if TIMEOUT_THREAD_MUTEX.owned? - TIMEOUT_THREAD_MUTEX.synchronize do - unless @timeout_thread and @timeout_thread.alive? - @timeout_thread = create_timeout_thread - end - end - end - end - private_class_method :ensure_timeout_thread_created - - # We keep a private reference so that time mocking libraries won't break - # Timeout. - GET_TIME = Process.method(:clock_gettime) - private_constant :GET_TIME - # :startdoc: # Perform an operation in a block, raising an error if it takes longer than @@ -178,12 +209,14 @@ def self.timeout(sec, klass = nil, message = nil, &block) #:yield: +sec+ return scheduler.timeout_after(sec, klass || Error, message, &block) end - ensure_timeout_thread_created + state = State.instance + state.ensure_timeout_thread_created + perform = Proc.new do |exc| request = Request.new(Thread.current, sec, exc, message) - QUEUE_MUTEX.synchronize do - QUEUE << request - CONDVAR.signal + state.queue_mutex.synchronize do + state.queue << request + state.condvar.signal end begin return yield(sec) diff --git a/test/test_timeout.rb b/test/test_timeout.rb index e367df7..3f94134 100644 --- a/test/test_timeout.rb +++ b/test/test_timeout.rb @@ -280,4 +280,24 @@ def test_handling_enclosed_threadgroup }.join end; end + + def test_ractor + assert_separately(%w[-rtimeout -W0], <<-'end;') + r = Ractor.new do + Timeout.timeout(1) { 42 } + end.value + + assert_equal 42, r + + r = Ractor.new do + begin + Timeout.timeout(0.1) { sleep } + rescue Timeout::Error + :ok + end + end.value + + assert_equal :ok, r + end; + end if defined?(::Ractor) && RUBY_VERSION >= '4.0' end