From 428e8df73849ef93b50ed4d39c45c2b92e7b41ff Mon Sep 17 00:00:00 2001 From: Rob Holland Date: Wed, 17 Feb 2021 11:47:53 +0000 Subject: [PATCH 01/13] Add more details to logs to improve debugging. --- lib/temporal/activity/context.rb | 6 ++++- lib/temporal/activity/task_processor.rb | 25 ++++++++++--------- .../temporal/activity/task_processor_spec.rb | 2 +- 3 files changed, 19 insertions(+), 14 deletions(-) diff --git a/lib/temporal/activity/context.rb b/lib/temporal/activity/context.rb index 64c28495..cecf1ed5 100644 --- a/lib/temporal/activity/context.rb +++ b/lib/temporal/activity/context.rb @@ -31,7 +31,7 @@ def async_token end def heartbeat(details = nil) - logger.debug('Activity heartbeat') + logger.debug("Activity heartbeat #{log_tag}") client.record_activity_task_heartbeat(task_token: task_token, details: details) end @@ -56,6 +56,10 @@ def headers metadata.headers end + def log_tag + "#{metadata.namespace} / #{metadata.workflow_name}[#{metadata.workflow_id}:#{metadata.workflow_run_id}] / #{metadata.name}[#{metadata.id}]##{metadata.attempt}" + end + private attr_reader :client, :metadata diff --git a/lib/temporal/activity/task_processor.rb b/lib/temporal/activity/task_processor.rb index 3bbb9497..a5c18088 100644 --- a/lib/temporal/activity/task_processor.rb +++ b/lib/temporal/activity/task_processor.rb @@ -19,28 +19,29 @@ def initialize(task, namespace, activity_lookup, client, middleware_chain) def process start_time = Time.now - Temporal.logger.info("Processing activity task for #{activity_name}") Temporal.metrics.timing('activity_task.queue_time', queue_time_ms, activity: activity_name) + metadata = Metadata.generate(Metadata::ACTIVITY_TYPE, task, namespace) + context = Activity::Context.new(client, metadata) + if !activity_class raise ActivityNotRegistered, 'Activity is not registered with this worker' end - metadata = Metadata.generate(Metadata::ACTIVITY_TYPE, task, namespace) - context = Activity::Context.new(client, metadata) + 
Temporal.logger.info("Activity task processing (#{context.log_tag})") result = middleware_chain.invoke(metadata) do activity_class.execute_in_context(context, parse_payload(task.input)) end # Do not complete asynchronous activities, these should be completed manually - respond_completed(result) unless context.async? + respond_completed(result, context) unless context.async? rescue StandardError, ScriptError => error - respond_failed(error) + respond_failed(error, context) ensure time_diff_ms = ((Time.now - start_time) * 1000).round Temporal.metrics.timing('activity_task.latency', time_diff_ms, activity: activity_name) - Temporal.logger.debug("Activity task processed in #{time_diff_ms}ms") + Temporal.logger.debug("Activity task processed in #{time_diff_ms}ms (#{context.log_tag})") end private @@ -53,18 +54,18 @@ def queue_time_ms ((started - scheduled) * 1_000).round end - def respond_completed(result) - Temporal.logger.info("Activity #{activity_name} completed") + def respond_completed(result, context) + Temporal.logger.info("Activity task completed (#{context.log_tag})") client.respond_activity_task_completed(task_token: task_token, result: result) rescue StandardError => error - Temporal.logger.error("Unable to complete Activity #{activity_name}: #{error.inspect}") + Temporal.logger.error("Activity task completion not recorded: #{error.inspect} (#{context.log_tag})") end - def respond_failed(error) - Temporal.logger.error("Activity #{activity_name} failed with: #{error.inspect}") + def respond_failed(error, context) + Temporal.logger.error("Activity task failed: #{error.inspect} (#{context.log_tag})") client.respond_activity_task_failed(task_token: task_token, exception: error) rescue StandardError => error - Temporal.logger.error("Unable to fail Activity #{activity_name}: #{error.inspect}") + Temporal.logger.error("Activity task failure not recorded: #{error.inspect} (#{context.log_tag})") end def parse_payload(payload) diff --git 
a/spec/unit/lib/temporal/activity/task_processor_spec.rb b/spec/unit/lib/temporal/activity/task_processor_spec.rb index 58ac9cdb..2a1a9a41 100644 --- a/spec/unit/lib/temporal/activity/task_processor_spec.rb +++ b/spec/unit/lib/temporal/activity/task_processor_spec.rb @@ -20,7 +20,7 @@ let(:input) { ['arg1', 'arg2'] } describe '#process' do - let(:context) { instance_double('Temporal::Activity::Context', async?: false) } + let(:context) { instance_double('Temporal::Activity::Context', async?: false, log_tag: '') } before do allow(Temporal::Metadata) From b051b5397cec636720b06e7615612f1467b26cf4 Mon Sep 17 00:00:00 2001 From: Rob Holland Date: Tue, 13 Apr 2021 14:23:21 +0100 Subject: [PATCH 02/13] Allow configuring the drain grace period. Defaults to 25 seconds to fit inside the default grace period for kubernetes pod shutdown which is 30 seconds. --- lib/temporal/worker.rb | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/lib/temporal/worker.rb b/lib/temporal/worker.rb index ca2bf870..53608058 100644 --- a/lib/temporal/worker.rb +++ b/lib/temporal/worker.rb @@ -7,9 +7,11 @@ module Temporal class Worker + DEFAULT_SHUTDOWN_GRACE_PERIOD = 25 # seconds + # activity_thread_pool_size: number of threads that the poller can use to run activities. # can be set to 1 if you want no paralellism in your activities, at the cost of throughput. 
- def initialize(activity_thread_pool_size: Temporal::Activity::Poller::DEFAULT_OPTIONS[:thread_pool_size]) + def initialize(activity_thread_pool_size: Temporal::Activity::Poller::DEFAULT_OPTIONS[:thread_pool_size], shutdown_grace_period: DEFAULT_SHUTDOWN_GRACE_PERIOD) @workflows = Hash.new { |hash, key| hash[key] = ExecutableLookup.new } @activities = Hash.new { |hash, key| hash[key] = ExecutableLookup.new } @pollers = [] @@ -19,6 +21,7 @@ def initialize(activity_thread_pool_size: Temporal::Activity::Poller::DEFAULT_OP @activity_poller_options = { thread_pool_size: activity_thread_pool_size, } + @shutdown_grace_period = shutdown_grace_period end def register_workflow(workflow_class, options = {}) @@ -67,7 +70,7 @@ def stop pollers.each(&:stop_polling) # allow workers to drain in-transit tasks. # https://github.com/temporalio/temporal/issues/1058 - sleep 1 + sleep @shutdown_grace_period pollers.each(&:cancel_pending_requests) pollers.each(&:wait) end.join From 85feb0a44a821be19bcbbbbbfdb54cdc19a3206f Mon Sep 17 00:00:00 2001 From: Rob Holland Date: Tue, 4 May 2021 13:41:50 +0100 Subject: [PATCH 03/13] Revert "Allow configuring the drain grace period." This reverts commit b051b5397cec636720b06e7615612f1467b26cf4. --- lib/temporal/worker.rb | 7 ++----- 1 file changed, 2 insertions(+), 5 deletions(-) diff --git a/lib/temporal/worker.rb b/lib/temporal/worker.rb index 53608058..ca2bf870 100644 --- a/lib/temporal/worker.rb +++ b/lib/temporal/worker.rb @@ -7,11 +7,9 @@ module Temporal class Worker - DEFAULT_SHUTDOWN_GRACE_PERIOD = 25 # seconds - # activity_thread_pool_size: number of threads that the poller can use to run activities. # can be set to 1 if you want no paralellism in your activities, at the cost of throughput. 
- def initialize(activity_thread_pool_size: Temporal::Activity::Poller::DEFAULT_OPTIONS[:thread_pool_size], shutdown_grace_period: DEFAULT_SHUTDOWN_GRACE_PERIOD) + def initialize(activity_thread_pool_size: Temporal::Activity::Poller::DEFAULT_OPTIONS[:thread_pool_size]) @workflows = Hash.new { |hash, key| hash[key] = ExecutableLookup.new } @activities = Hash.new { |hash, key| hash[key] = ExecutableLookup.new } @pollers = [] @@ -21,7 +19,6 @@ def initialize(activity_thread_pool_size: Temporal::Activity::Poller::DEFAULT_OP @activity_poller_options = { thread_pool_size: activity_thread_pool_size, } - @shutdown_grace_period = shutdown_grace_period end def register_workflow(workflow_class, options = {}) @@ -70,7 +67,7 @@ def stop pollers.each(&:stop_polling) # allow workers to drain in-transit tasks. # https://github.com/temporalio/temporal/issues/1058 - sleep @shutdown_grace_period + sleep 1 pollers.each(&:cancel_pending_requests) pollers.each(&:wait) end.join From 6ae98e60ba6b9cbed22e034276f02b306fe4e719 Mon Sep 17 00:00:00 2001 From: Rob Holland Date: Wed, 5 May 2021 13:41:42 +0100 Subject: [PATCH 04/13] Log structured data. 
--- examples/activities/process_file_activity.rb | 2 +- .../activities/trip/book_flight_activity.rb | 2 +- .../activities/trip/book_hotel_activity.rb | 2 +- .../activities/trip/cancel_car_activity.rb | 2 +- .../activities/trip/cancel_flight_activity.rb | 2 +- .../activities/trip/cancel_hotel_activity.rb | 2 +- .../activities/trip/make_payment_activity.rb | 4 +-- examples/activities/trip/rent_car_activity.rb | 2 +- examples/bin/reset | 2 +- examples/bin/trigger | 2 +- examples/middleware/logging_middleware.rb | 6 ++--- examples/workflows/check_workflow.rb | 6 ++--- .../workflows/failing_activities_workflow.rb | 2 +- examples/workflows/long_workflow.rb | 2 +- lib/temporal/activity/context.rb | 6 +---- lib/temporal/activity/poller.rb | 6 ++--- lib/temporal/activity/task_processor.rb | 14 +++++------ lib/temporal/configuration.rb | 4 +-- lib/temporal/error_handler.rb | 2 +- lib/temporal/logger.rb | 25 +++++++++++++++++++ lib/temporal/saga/concern.rb | 2 +- lib/temporal/workflow.rb | 2 +- .../unit/lib/temporal/activity/poller_spec.rb | 2 +- .../temporal/activity/task_processor_spec.rb | 2 +- spec/unit/lib/temporal/saga/concern_spec.rb | 4 +-- 25 files changed, 63 insertions(+), 44 deletions(-) create mode 100644 lib/temporal/logger.rb diff --git a/examples/activities/process_file_activity.rb b/examples/activities/process_file_activity.rb index 3a81a477..4bda9c77 100644 --- a/examples/activities/process_file_activity.rb +++ b/examples/activities/process_file_activity.rb @@ -10,7 +10,7 @@ class ProcessFileActivity < Temporal::Activity def execute(input) file_contents = File.read(input) - logger.info("Processing file: #{input}") + logger.info("Processing file", { input: input }) logger.info("File contents: #{file_contents}") raise 'unknown file' diff --git a/examples/activities/trip/book_flight_activity.rb b/examples/activities/trip/book_flight_activity.rb index 1755dbbd..c6e68d65 100644 --- a/examples/activities/trip/book_flight_activity.rb +++ 
b/examples/activities/trip/book_flight_activity.rb @@ -1,7 +1,7 @@ module Trip class BookFlightActivity < Temporal::Activity def execute(trip_id) - logger.info "Booking flight for trip #{trip_id}" + logger.info "Booking flight", { trip_id: trip_id } return { reservation_id: SecureRandom.uuid, total: rand(0..1000) / 10.0 } end diff --git a/examples/activities/trip/book_hotel_activity.rb b/examples/activities/trip/book_hotel_activity.rb index 9ea68e99..348aa3cf 100644 --- a/examples/activities/trip/book_hotel_activity.rb +++ b/examples/activities/trip/book_hotel_activity.rb @@ -1,7 +1,7 @@ module Trip class BookHotelActivity < Temporal::Activity def execute(trip_id) - logger.info "Booking hotel room for trip #{trip_id}" + logger.info "Booking hotel room", { trip_id: trip_id } return { reservation_id: SecureRandom.uuid, total: rand(0..1000) / 10.0 } end diff --git a/examples/activities/trip/cancel_car_activity.rb b/examples/activities/trip/cancel_car_activity.rb index 669342e9..47761c6b 100644 --- a/examples/activities/trip/cancel_car_activity.rb +++ b/examples/activities/trip/cancel_car_activity.rb @@ -1,7 +1,7 @@ module Trip class CancelCarActivity < Temporal::Activity def execute(reservation_id) - logger.info "Cancelling car reservation: #{reservation_id}" + logger.info "Cancelling car reservation" { reservation_id: reservation_id } return end diff --git a/examples/activities/trip/cancel_flight_activity.rb b/examples/activities/trip/cancel_flight_activity.rb index 8d9ac01b..3f03a3e5 100644 --- a/examples/activities/trip/cancel_flight_activity.rb +++ b/examples/activities/trip/cancel_flight_activity.rb @@ -1,7 +1,7 @@ module Trip class CancelFlightActivity < Temporal::Activity def execute(reservation_id) - logger.info "Cancelling flight reservation: #{reservation_id}" + logger.info "Cancelling flight reservation", { reservation_id: reservation_id } return end diff --git a/examples/activities/trip/cancel_hotel_activity.rb 
b/examples/activities/trip/cancel_hotel_activity.rb index 802f6616..be3473d0 100644 --- a/examples/activities/trip/cancel_hotel_activity.rb +++ b/examples/activities/trip/cancel_hotel_activity.rb @@ -1,7 +1,7 @@ module Trip class CancelHotelActivity < Temporal::Activity def execute(reservation_id) - logger.info "Cancelling hotel reservation: #{reservation_id}" + logger.info "Cancelling hotel reservation" { reservation_id: reservation_id } return end diff --git a/examples/activities/trip/make_payment_activity.rb b/examples/activities/trip/make_payment_activity.rb index 0e7bae03..d2fd7089 100644 --- a/examples/activities/trip/make_payment_activity.rb +++ b/examples/activities/trip/make_payment_activity.rb @@ -10,9 +10,9 @@ class InsufficientFunds < Temporal::ActivityException; end ) def execute(trip_id, total) - logger.info "Processing payment for #{total} (trip_id #{trip_id})" + logger.info "Processing payment", { amount: total, trip_id: trip_id }) - raise InsufficientFunds, "Unable to charge #{total}" + raise InsufficientFunds, "Unable to charge", { amount: total } end end end diff --git a/examples/activities/trip/rent_car_activity.rb b/examples/activities/trip/rent_car_activity.rb index 2263ecc0..a4702c50 100644 --- a/examples/activities/trip/rent_car_activity.rb +++ b/examples/activities/trip/rent_car_activity.rb @@ -1,7 +1,7 @@ module Trip class RentCarActivity < Temporal::Activity def execute(trip_id) - logger.info "Renting a car for trip #{trip_id}" + logger.info "Renting a car", { trip_id: trip_id } return { reservation_id: SecureRandom.uuid, total: rand(0..1000) / 10.0 } end diff --git a/examples/bin/reset b/examples/bin/reset index e2c80325..1ce5319c 100755 --- a/examples/bin/reset +++ b/examples/bin/reset @@ -4,4 +4,4 @@ require_relative '../init' namespace, workflow_id, run_id = ARGV run_id = Temporal.reset_workflow(namespace, workflow_id, run_id) -Temporal.logger.info "Reset workflow: #{run_id}" +Temporal.logger.info "Reset workflow", { workflow_id: 
workflow_id, run_id: run_id } diff --git a/examples/bin/trigger b/examples/bin/trigger index 2fd45c31..725bd35e 100755 --- a/examples/bin/trigger +++ b/examples/bin/trigger @@ -11,4 +11,4 @@ workflow_id = SecureRandom.uuid input = args.map { |arg| Integer(arg) rescue arg } run_id = Temporal.start_workflow(workflow_class, *input, options: { workflow_id: workflow_id }) -Temporal.logger.info "Started workflow: #{workflow_id} / #{run_id}" +Temporal.logger.info "Started workflow", { workflow_id: workflow_id, run_id: run_id } diff --git a/examples/middleware/logging_middleware.rb b/examples/middleware/logging_middleware.rb index 5500a083..05c2ae24 100644 --- a/examples/middleware/logging_middleware.rb +++ b/examples/middleware/logging_middleware.rb @@ -6,13 +6,13 @@ def initialize(app_name) def call(metadata) entity_name = name_from(metadata) entity_type = type_from(metadata) - Temporal.logger.info("[#{app_name}]: Started #{entity_name} #{entity_type}") + Temporal.logger.info("[#{app_name}]: Started", metadata: metadata.to_h) yield - Temporal.logger.info("[#{app_name}]: Finished #{entity_name} #{entity_type}") + Temporal.logger.info("[#{app_name}]: Finished", metadata: metadata.to_h) rescue StandardError - Temporal.logger.error("[#{app_name}]: Error #{entity_name}") + Temporal.logger.error("[#{app_name}]: Error", metadata: metadata.to_h) raise end diff --git a/examples/workflows/check_workflow.rb b/examples/workflows/check_workflow.rb index 3471292b..eef82f0c 100644 --- a/examples/workflows/check_workflow.rb +++ b/examples/workflows/check_workflow.rb @@ -9,17 +9,17 @@ def execute logger.info(' => Future 1 is ready!') if future_1.ready? 
future_1.done do |result| - logger.info("X: future_1 completed at #{workflow.now.strftime('%H:%M:%S.%L')}") + logger.info("X: future_1 completed", { time: workflow.now.strftime('%H:%M:%S.%L') }) end result = future_2.get - logger.info("X: future_2 completed at #{workflow.now.strftime('%H:%M:%S.%L')}") + logger.info("X: future_2 completed", { time: workflow.now.strftime('%H:%M:%S.%L') }) logger.info(' => Future 3 is ready!') if future_3.ready? logger.info(' => Future 2 is ready!') if future_2.ready? future_3.done do |result| - logger.info("X: future_3 completed at #{workflow.now.strftime('%H:%M:%S.%L')}") + logger.info("X: future_3 completed", { time: workflow.now.strftime('%H:%M:%S.%L') }) end workflow.wait_for_all(future_1, future_2, future_3) diff --git a/examples/workflows/failing_activities_workflow.rb b/examples/workflows/failing_activities_workflow.rb index bfe37f2f..24684f44 100644 --- a/examples/workflows/failing_activities_workflow.rb +++ b/examples/workflows/failing_activities_workflow.rb @@ -19,7 +19,7 @@ def execute(count) end workflow.wait_for_all(*futures) - logger.info("#{futures.count(&:failed?)} activites of #{count} failed") + logger.info("Activities failed", { total: count, failed: futures.count(&:failed?) 
}) { finished: futures.count(&:finished?), diff --git a/examples/workflows/long_workflow.rb b/examples/workflows/long_workflow.rb index 4806e4f1..409ad49e 100644 --- a/examples/workflows/long_workflow.rb +++ b/examples/workflows/long_workflow.rb @@ -5,7 +5,7 @@ def execute(cycles = 10, interval = 1) future = LongRunningActivity.execute(cycles, interval) workflow.on_signal do |signal, input| - logger.warn "Signal received: #{signal}" + logger.warn "Signal received", { signal: signal } future.cancel end diff --git a/lib/temporal/activity/context.rb b/lib/temporal/activity/context.rb index cecf1ed5..ded09127 100644 --- a/lib/temporal/activity/context.rb +++ b/lib/temporal/activity/context.rb @@ -31,7 +31,7 @@ def async_token end def heartbeat(details = nil) - logger.debug("Activity heartbeat #{log_tag}") + logger.debug("Activity heartbeat", metadata.to_h) client.record_activity_task_heartbeat(task_token: task_token, details: details) end @@ -56,10 +56,6 @@ def headers metadata.headers end - def log_tag - "#{metadata.namespace} / #{metadata.workflow_name}[#{metadata.workflow_id}:#{metadata.workflow_run_id}] / #{metadata.name}[#{metadata.id}]##{metadata.attempt}" - end - private attr_reader :client, :metadata diff --git a/lib/temporal/activity/poller.rb b/lib/temporal/activity/poller.rb index d9ae5a5a..5fa48bc1 100644 --- a/lib/temporal/activity/poller.rb +++ b/lib/temporal/activity/poller.rb @@ -27,7 +27,7 @@ def start def stop_polling @shutting_down = true - Temporal.logger.info('Shutting down activity poller') + Temporal.logger.info('Shutting down activity poller', { namespace: namespace, task_queue: task_queue }) end def cancel_pending_requests @@ -57,7 +57,7 @@ def poll_loop return if shutting_down? 
- Temporal.logger.debug("Polling activity task queue (#{namespace} / #{task_queue})") + Temporal.logger.debug("Polling activity task queue", { namespace: namespace, task_queue: task_queue }) task = poll_for_task next unless task&.activity_type @@ -69,7 +69,7 @@ def poll_loop def poll_for_task client.poll_activity_task_queue(namespace: namespace, task_queue: task_queue) rescue StandardError => error - Temporal.logger.error("Unable to poll activity task queue: #{error.inspect}") + Temporal.logger.error("Unable to poll activity task queue", { error: error.inspect }) Temporal::ErrorHandler.handle(error) diff --git a/lib/temporal/activity/task_processor.rb b/lib/temporal/activity/task_processor.rb index 3e36107b..6c547648 100644 --- a/lib/temporal/activity/task_processor.rb +++ b/lib/temporal/activity/task_processor.rb @@ -30,8 +30,6 @@ def process raise ActivityNotRegistered, 'Activity is not registered with this worker' end - context = Activity::Context.new(client, metadata) - result = middleware_chain.invoke(metadata) do activity_class.execute_in_context(context, parse_payload(task.input)) end @@ -41,11 +39,11 @@ def process rescue StandardError, ScriptError => error Temporal::ErrorHandler.handle(error, metadata: metadata) - respond_failed(error) + respond_failed(error, context) ensure time_diff_ms = ((Time.now - start_time) * 1000).round Temporal.metrics.timing('activity_task.latency', time_diff_ms, activity: activity_name) - Temporal.logger.debug("Activity task processed in #{time_diff_ms}ms (#{context.log_tag})") + Temporal.logger.debug("Activity task processed", metadata.to_h.merge(execution_time: time_diff_ms)) end private @@ -59,19 +57,19 @@ def queue_time_ms end def respond_completed(result, context) - Temporal.logger.info("Activity task completed (#{context.log_tag})") + Temporal.logger.info("Activity task completed", metadata.to_h) client.respond_activity_task_completed(task_token: task_token, result: result) rescue StandardError => error - 
Temporal.logger.error("Unable to complete Activity #{activity_name}: #{error.inspect}") + Temporal.logger.error("Unable to complete Activity", metadata.to_h.merge(error: error.inspect)) Temporal::ErrorHandler.handle(error, metadata: metadata) end def respond_failed(error, context) - Temporal.logger.error("Activity task failed: #{error.inspect} (#{context.log_tag})") + Temporal.logger.error("Activity task failed", metadata.to_h.merge(error: error.inspect)) client.respond_activity_task_failed(task_token: task_token, exception: error) rescue StandardError => error - Temporal.logger.error("Unable to fail Activity #{activity_name}: #{error.inspect}") + Temporal.logger.error("Unable to fail Activity", metadata.to_h.merge(error: error.inspect)) Temporal::ErrorHandler.handle(error, metadata: metadata) end diff --git a/lib/temporal/configuration.rb b/lib/temporal/configuration.rb index 5553516f..f6e59fa2 100644 --- a/lib/temporal/configuration.rb +++ b/lib/temporal/configuration.rb @@ -1,4 +1,4 @@ -require 'logger' +require 'temporal/logger' require 'temporal/metrics_adapters/null' module Temporal @@ -26,7 +26,7 @@ class Configuration def initialize @client_type = :grpc - @logger = Logger.new(STDOUT, progname: 'temporal_client') + @logger = Temporal::Logger.new(STDOUT, progname: 'temporal_client') @metrics_adapter = MetricsAdapters::Null.new @timeouts = DEFAULT_TIMEOUTS @namespace = DEFAULT_NAMESPACE diff --git a/lib/temporal/error_handler.rb b/lib/temporal/error_handler.rb index 77bfbaad..9702edcb 100644 --- a/lib/temporal/error_handler.rb +++ b/lib/temporal/error_handler.rb @@ -4,7 +4,7 @@ def self.handle(error, metadata: nil) Temporal.configuration.error_handlers.each do |handler| handler.call(error, metadata: metadata) rescue StandardError => e - Temporal.logger.error("Error handler failed: #{e.inspect}") + Temporal.logger.error("Error handler failed", { error: e.inspect }) end end end diff --git a/lib/temporal/logger.rb b/lib/temporal/logger.rb new file mode 100644 
require 'json'
require 'logger'

module Temporal
  # Logger that appends a JSON rendering of structured data to every message.
  #
  # Each severity method takes the message followed by an optional hash of
  # structured data; the entry handed to ::Logger is "<message> <json>". When
  # no data is supplied an empty JSON object ("{}") is appended.
  #
  # As with ::Logger, a block may be given instead of structured data; such
  # calls are forwarded untouched and nothing is appended to the block's result.
  class Logger < ::Logger
    # Logs at DEBUG severity.
    #
    # @param message [#to_s] text of the log entry
    # @param data [Hash] structured data serialized after the message
    def debug(message = nil, data = {}, &block)
      return super(message, &block) if block

      super(format_entry(message, data))
    end

    # Logs at INFO severity.
    #
    # @param message [#to_s] text of the log entry
    # @param data [Hash] structured data serialized after the message
    def info(message = nil, data = {}, &block)
      return super(message, &block) if block

      super(format_entry(message, data))
    end

    # Logs at WARN severity.
    #
    # @param message [#to_s] text of the log entry
    # @param data [Hash] structured data serialized after the message
    def warn(message = nil, data = {}, &block)
      return super(message, &block) if block

      super(format_entry(message, data))
    end

    # Logs at ERROR severity.
    #
    # @param message [#to_s] text of the log entry
    # @param data [Hash] structured data serialized after the message
    def error(message = nil, data = {}, &block)
      return super(message, &block) if block

      super(format_entry(message, data))
    end

    # Logs at FATAL severity.
    #
    # @param message [#to_s] text of the log entry
    # @param data [Hash] structured data serialized after the message
    def fatal(message = nil, data = {}, &block)
      return super(message, &block) if block

      super(format_entry(message, data))
    end

    private

    # Builds the text passed to ::Logger: the message, a space, then the data
    # as JSON. JSON.generate is the stdlib serializer (JSON has no `serialize`).
    def format_entry(message, data)
      "#{message} #{JSON.generate(data)}"
    end
  end
end
.to have_received(:error) - .with('Unable to poll activity task queue: #') + .with('Unable to poll activity task queue', { error: '#'}) end end end diff --git a/spec/unit/lib/temporal/activity/task_processor_spec.rb b/spec/unit/lib/temporal/activity/task_processor_spec.rb index 675d4a1a..3f8bed78 100644 --- a/spec/unit/lib/temporal/activity/task_processor_spec.rb +++ b/spec/unit/lib/temporal/activity/task_processor_spec.rb @@ -20,7 +20,7 @@ let(:input) { ['arg1', 'arg2'] } describe '#process' do - let(:context) { instance_double('Temporal::Activity::Context', async?: false, log_tag: '') } + let(:context) { instance_double('Temporal::Activity::Context', async?: false) } before do allow(Temporal::Metadata) diff --git a/spec/unit/lib/temporal/saga/concern_spec.rb b/spec/unit/lib/temporal/saga/concern_spec.rb index d3ab3946..f96dfe9f 100644 --- a/spec/unit/lib/temporal/saga/concern_spec.rb +++ b/spec/unit/lib/temporal/saga/concern_spec.rb @@ -46,7 +46,7 @@ def execute end context 'when execution compensates' do - let(:logger) { instance_double('Logger') } + let(:logger) { instance_double('Temporal::Logger') } let(:error) { TestSagaConcernError.new('execution failed') } class TestSagaConcernError < StandardError @@ -85,7 +85,7 @@ def backtrace expect(logger) .to have_received(:error) - .with('Saga execution aborted: #') + .with('Saga execution aborted', { error: '#' }) expect(logger).to have_received(:debug).with("line 1\nline 2") end end From 131860c694b303c1d2aa4ddf4694ad7867675ffb Mon Sep 17 00:00:00 2001 From: Rob Holland Date: Wed, 5 May 2021 13:42:44 +0100 Subject: [PATCH 05/13] This isn't a log line :) --- examples/activities/trip/make_payment_activity.rb | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/examples/activities/trip/make_payment_activity.rb b/examples/activities/trip/make_payment_activity.rb index d2fd7089..7e0b14a4 100644 --- a/examples/activities/trip/make_payment_activity.rb +++ b/examples/activities/trip/make_payment_activity.rb 
@@ -12,7 +12,7 @@ class InsufficientFunds < Temporal::ActivityException; end def execute(trip_id, total) logger.info "Processing payment", { amount: total, trip_id: trip_id }) - raise InsufficientFunds, "Unable to charge", { amount: total } + raise InsufficientFunds, "Unable to charge: #{total}" end end end From 922bebbe3a3acc759425b756d854fd858a84982b Mon Sep 17 00:00:00 2001 From: Rob Holland Date: Thu, 6 May 2021 14:10:18 +0100 Subject: [PATCH 06/13] Fix double-logging on workflow task failure. Convert some missed logging calls to metadata versions. --- lib/temporal/activity/task_processor.rb | 3 ++- lib/temporal/workflow/task_processor.rb | 17 +++++++---------- 2 files changed, 9 insertions(+), 11 deletions(-) diff --git a/lib/temporal/activity/task_processor.rb b/lib/temporal/activity/task_processor.rb index 6c547648..12b4b07d 100644 --- a/lib/temporal/activity/task_processor.rb +++ b/lib/temporal/activity/task_processor.rb @@ -21,6 +21,7 @@ def initialize(task, namespace, activity_lookup, client, middleware_chain) def process start_time = Time.now + Temporal.logger.debug("Processing Activity task", metadata.to_h) Temporal.metrics.timing('activity_task.queue_time', queue_time_ms, activity: activity_name) metadata = Metadata.generate(Metadata::ACTIVITY_TYPE, task, namespace) @@ -69,7 +70,7 @@ def respond_failed(error, context) Temporal.logger.error("Activity task failed", metadata.to_h.merge(error: error.inspect)) client.respond_activity_task_failed(task_token: task_token, exception: error) rescue StandardError => error - Temporal.logger.error("Unable to fail Activity", metadata.to_h.merge(error: error.inspect)) + Temporal.logger.error("Unable to fail Activity task", metadata.to_h.merge(error: error.inspect)) Temporal::ErrorHandler.handle(error, metadata: metadata) end diff --git a/lib/temporal/workflow/task_processor.rb b/lib/temporal/workflow/task_processor.rb index 7d6007ad..39f475a0 100644 --- a/lib/temporal/workflow/task_processor.rb +++ 
b/lib/temporal/workflow/task_processor.rb @@ -23,7 +23,7 @@ def initialize(task, namespace, workflow_lookup, client, middleware_chain) def process start_time = Time.now - Temporal.logger.info("Processing a workflow task for #{workflow_name}") + Temporal.logger.debug("Processing Workflow task", metadata.to_h) Temporal.metrics.timing('workflow_task.queue_time', queue_time_ms, workflow: workflow_name) if !workflow_class @@ -40,16 +40,13 @@ def process complete_task(commands) rescue StandardError => error - fail_task(error) - - Temporal.logger.error("Workflow task for #{workflow_name} failed with: #{error.inspect}") - Temporal.logger.debug(error.backtrace.join("\n")) - Temporal::ErrorHandler.handle(error, metadata: metadata) + + fail_task(error) ensure time_diff_ms = ((Time.now - start_time) * 1000).round Temporal.metrics.timing('workflow_task.latency', time_diff_ms, workflow: workflow_name) - Temporal.logger.debug("Workflow task processed in #{time_diff_ms}ms") + Temporal.logger.debug("Workflow task processed", metadata.to_h.merge(execution_time: time_diff_ms)) end private @@ -82,13 +79,13 @@ def fetch_full_history end def complete_task(commands) - Temporal.logger.info("Workflow task for #{workflow_name} completed") + Temporal.logger.info("Workflow task completed", metadata.to_h) client.respond_workflow_task_completed(task_token: task_token, commands: commands) end def fail_task(error) - Temporal.logger.error("Workflow task for #{workflow_name} failed with: #{error.inspect}") + Temporal.logger.error("Workflow task failed", metadata.to_h.merge(error: error.inspect)) Temporal.logger.debug(error.backtrace.join("\n")) # Stop from getting into infinite loop if the error persists @@ -100,7 +97,7 @@ def fail_task(error) exception: error ) rescue StandardError => error - Temporal.logger.error("Unable to fail Workflow task #{workflow_name}: #{error.inspect}") + Temporal.logger.error("Unable to fail Workflow task", metadata.to_h.merge(error: error.inspect)) 
Temporal::ErrorHandler.handle(error, metadata: metadata) end From 5ce269a346ea1d52f6be42bd03e40d3d35525fb1 Mon Sep 17 00:00:00 2001 From: Rob Holland Date: Thu, 6 May 2021 14:14:45 +0100 Subject: [PATCH 07/13] Fix up a few more logging callsites. --- lib/temporal/activity/poller.rb | 2 +- lib/temporal/workflow/poller.rb | 4 ++-- spec/unit/lib/temporal/activity/poller_spec.rb | 2 +- 3 files changed, 4 insertions(+), 4 deletions(-) diff --git a/lib/temporal/activity/poller.rb b/lib/temporal/activity/poller.rb index 5fa48bc1..39e7bb9a 100644 --- a/lib/temporal/activity/poller.rb +++ b/lib/temporal/activity/poller.rb @@ -69,7 +69,7 @@ def poll_loop def poll_for_task client.poll_activity_task_queue(namespace: namespace, task_queue: task_queue) rescue StandardError => error - Temporal.logger.error("Unable to poll activity task queue", { error: error.inspect }) + Temporal.logger.error("Unable to poll activity task queue", { namespace: namespace, task_queue: task_queue, error: error.inspect }) Temporal::ErrorHandler.handle(error) diff --git a/lib/temporal/workflow/poller.rb b/lib/temporal/workflow/poller.rb index 607d746e..b33223ee 100644 --- a/lib/temporal/workflow/poller.rb +++ b/lib/temporal/workflow/poller.rb @@ -50,7 +50,7 @@ def shutting_down? def poll_loop while !shutting_down? 
do - Temporal.logger.debug("Polling worklow task queue (#{namespace} / #{task_queue})") + Temporal.logger.debug("Polling Worklow task queue", { namespace: namespace, task_queue: task_queue }) task = poll_for_task process(task) if task&.workflow_type @@ -60,7 +60,7 @@ def poll_loop def poll_for_task client.poll_workflow_task_queue(namespace: namespace, task_queue: task_queue) rescue StandardError => error - Temporal.logger.error("Unable to poll workflow task queue: #{error.inspect}") + Temporal.logger.error("Unable to poll Workflow task queue", { namespace: namespace, task_queue: task_queue, error: error.inspect }) Temporal::ErrorHandler.handle(error) diff --git a/spec/unit/lib/temporal/activity/poller_spec.rb b/spec/unit/lib/temporal/activity/poller_spec.rb index b1327869..38e95181 100644 --- a/spec/unit/lib/temporal/activity/poller_spec.rb +++ b/spec/unit/lib/temporal/activity/poller_spec.rb @@ -108,7 +108,7 @@ def call(_); end expect(Temporal.logger) .to have_received(:error) - .with('Unable to poll activity task queue', { error: '#'}) + .with('Unable to poll activity task queue', { namespace: 'test-namespace', task_queue: 'test-task-queue', error: '#'}) end end end From 8e8e1b4b99a3e164293b85c70ff2693661c3dd61 Mon Sep 17 00:00:00 2001 From: Rob Holland Date: Fri, 7 May 2021 11:25:44 +0100 Subject: [PATCH 08/13] PR feedback. 
--- lib/temporal/activity/task_processor.rb | 9 ++++----- 1 file changed, 4 insertions(+), 5 deletions(-) diff --git a/lib/temporal/activity/task_processor.rb b/lib/temporal/activity/task_processor.rb index 12b4b07d..d2cd41d2 100644 --- a/lib/temporal/activity/task_processor.rb +++ b/lib/temporal/activity/task_processor.rb @@ -24,7 +24,6 @@ def process Temporal.logger.debug("Processing Activity task", metadata.to_h) Temporal.metrics.timing('activity_task.queue_time', queue_time_ms, activity: activity_name) - metadata = Metadata.generate(Metadata::ACTIVITY_TYPE, task, namespace) context = Activity::Context.new(client, metadata) if !activity_class @@ -36,11 +35,11 @@ def process end # Do not complete asynchronous activities, these should be completed manually - respond_completed(result, context) unless context.async? + respond_completed(result) unless context.async? rescue StandardError, ScriptError => error Temporal::ErrorHandler.handle(error, metadata: metadata) - respond_failed(error, context) + respond_failed(error) ensure time_diff_ms = ((Time.now - start_time) * 1000).round Temporal.metrics.timing('activity_task.latency', time_diff_ms, activity: activity_name) @@ -57,7 +56,7 @@ def queue_time_ms ((started - scheduled) * 1_000).round end - def respond_completed(result, context) + def respond_completed(result) Temporal.logger.info("Activity task completed", metadata.to_h) client.respond_activity_task_completed(task_token: task_token, result: result) rescue StandardError => error @@ -66,7 +65,7 @@ def respond_completed(result, context) Temporal::ErrorHandler.handle(error, metadata: metadata) end - def respond_failed(error, context) + def respond_failed(error) Temporal.logger.error("Activity task failed", metadata.to_h.merge(error: error.inspect)) client.respond_activity_task_failed(task_token: task_token, exception: error) rescue StandardError => error From 28231d39a7f843989d1bbaede99b2d6b0e158854 Mon Sep 17 00:00:00 2001 From: Rob Holland Date: Fri, 7 May 2021 
14:34:25 +0100 Subject: [PATCH 09/13] Remove heartbeat details from logging. The user can log from their own code if needed for debugging. --- lib/temporal/metadata/activity.rb | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/lib/temporal/metadata/activity.rb b/lib/temporal/metadata/activity.rb index 405e9086..c7e7d814 100644 --- a/lib/temporal/metadata/activity.rb +++ b/lib/temporal/metadata/activity.rb @@ -32,8 +32,7 @@ def to_h 'workflow_run_id' => workflow_run_id, 'activity_id' => id, 'activity_name' => name, - 'attempt' => attempt, - 'heartbeat_details' => heartbeat_details + 'attempt' => attempt } end end From 914c00151e02819b50a8b8bba1ad5eb71cebecd8 Mon Sep 17 00:00:00 2001 From: Rob Holland Date: Fri, 7 May 2021 14:34:48 +0100 Subject: [PATCH 10/13] Add log spec. Ensure JSON is in strict mode. --- lib/temporal/logger.rb | 22 +++------ lib/temporal/workflow/replay_aware_logger.rb | 8 ++-- spec/unit/lib/temporal/logger_spec.rb | 47 +++++++++++++++++++ .../lib/temporal/metadata/activity_spec.rb | 1 - 4 files changed, 58 insertions(+), 20 deletions(-) create mode 100644 spec/unit/lib/temporal/logger_spec.rb diff --git a/lib/temporal/logger.rb b/lib/temporal/logger.rb index cc3ff96d..f5215288 100644 --- a/lib/temporal/logger.rb +++ b/lib/temporal/logger.rb @@ -2,24 +2,16 @@ module Temporal class Logger < ::Logger - def debug(message, data = {}) - super(message.to_s + ' ' + JSON.serialize(data)) - end - - def info(message, data = {}) - super(message.to_s + ' ' + JSON.serialize(data)) - end - - def warn(message, data = {}) - super(message.to_s + ' ' + JSON.serialize(data)) - end + SEVERITIES = %i[debug info warn error fatal unknown].freeze - def error(message, data = {}) - super(message.to_s + ' ' + JSON.serialize(data)) + SEVERITIES.each do |severity| + define_method severity do |message, data = {}| + super(message.to_s + ' ' + Oj.dump(data, mode: :strict)) + end end - def fatal(message, data = {}) - super(message.to_s + ' ' +
JSON.serialize(data)) + def log(severity, message, data = {}) + super(severity, message.to_s + ' ' + Oj.dump(data, mode: strict)) end end end diff --git a/lib/temporal/workflow/replay_aware_logger.rb b/lib/temporal/workflow/replay_aware_logger.rb index 099108d7..ebb4c7a3 100644 --- a/lib/temporal/workflow/replay_aware_logger.rb +++ b/lib/temporal/workflow/replay_aware_logger.rb @@ -11,17 +11,17 @@ def initialize(main_logger, replay = true) end SEVERITIES.each do |severity| - define_method severity do |message| + define_method severity do |*args| return if replay? - main_logger.public_send(severity, message) + main_logger.public_send(severity, *args) end end - def log(severity, message) + def log(severity, *args) return if replay? - main_logger.log(severity, message) + main_logger.log(severity, *args) end private diff --git a/spec/unit/lib/temporal/logger_spec.rb b/spec/unit/lib/temporal/logger_spec.rb new file mode 100644 index 00000000..75f1112d --- /dev/null +++ b/spec/unit/lib/temporal/logger_spec.rb @@ -0,0 +1,47 @@ +require 'temporal/activity' + +describe Temporal::Logger do + subject { described_class.new(STDERR) } + + describe '.execute_in_context' do + before do + allow(subject).to receive(:add) + end + + it 'accepts data argument to debug method' do + subject.debug('test', { a: 1 }) + + expect(subject).to have_received(:add).with(Logger::DEBUG, nil, 'test {"a":1}') + end + + it 'accepts data argument to info method' do + subject.info('test', { a: 1 }) + + expect(subject).to have_received(:add).with(Logger::INFO, nil, 'test {"a":1}') + end + + it 'accepts data argument to warn method' do + subject.warn('test', { a: 1 }) + + expect(subject).to have_received(:add).with(Logger::WARN, nil, 'test {"a":1}') + end + + it 'accepts data argument to error method' do + subject.error('test', { a: 1 }) + + expect(subject).to have_received(:add).with(Logger::ERROR, nil, 'test {"a":1}') + end + + it 'accepts data argument to fatal method' do + subject.fatal('test', { a: 1 
}) + + expect(subject).to have_received(:add).with(Logger::FATAL, nil, 'test {"a":1}') + end + + it 'accepts data argument to unknown method' do + subject.unknown('test', { a: 1 }) + + expect(subject).to have_received(:add).with(Logger::UNKNOWN, nil, 'test {"a":1}') + end + end +end diff --git a/spec/unit/lib/temporal/metadata/activity_spec.rb b/spec/unit/lib/temporal/metadata/activity_spec.rb index c5cb7357..4df742be 100644 --- a/spec/unit/lib/temporal/metadata/activity_spec.rb +++ b/spec/unit/lib/temporal/metadata/activity_spec.rb @@ -31,7 +31,6 @@ it 'returns a hash' do expect(subject.to_h).to eq({ 'attempt' => subject.attempt, - 'heartbeat_details' => subject.heartbeat_details, 'activity_id' => subject.id, 'activity_name' => subject.name, 'namespace' => subject.namespace, From f70b39cbfc468599d6349ad8673947f2eb2ac007 Mon Sep 17 00:00:00 2001 From: Rob Holland Date: Fri, 7 May 2021 15:02:20 +0100 Subject: [PATCH 11/13] Typo. --- lib/temporal/logger.rb | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib/temporal/logger.rb b/lib/temporal/logger.rb index f5215288..281af69a 100644 --- a/lib/temporal/logger.rb +++ b/lib/temporal/logger.rb @@ -11,7 +11,7 @@ class Logger < ::Logger end def log(severity, message, data = {}) - super(severity, message.to_s + ' ' + Oj.dump(data, mode: strict)) + super(severity, message.to_s + ' ' + Oj.dump(data, mode: :strict)) end end end From 0a3003512f5fdd09284ac2a0aac22e48268e092a Mon Sep 17 00:00:00 2001 From: Rob Holland Date: Fri, 7 May 2021 15:20:22 +0100 Subject: [PATCH 12/13] Add log method spec. 
--- lib/temporal/logger.rb | 2 +- spec/unit/lib/temporal/logger_spec.rb | 6 ++++++ 2 files changed, 7 insertions(+), 1 deletion(-) diff --git a/lib/temporal/logger.rb b/lib/temporal/logger.rb index 281af69a..84aa5706 100644 --- a/lib/temporal/logger.rb +++ b/lib/temporal/logger.rb @@ -11,7 +11,7 @@ class Logger < ::Logger end def log(severity, message, data = {}) - super(severity, message.to_s + ' ' + Oj.dump(data, mode: :strict)) + add(severity, message.to_s + ' ' + Oj.dump(data, mode: :strict)) end end end diff --git a/spec/unit/lib/temporal/logger_spec.rb b/spec/unit/lib/temporal/logger_spec.rb index 75f1112d..3772e2e2 100644 --- a/spec/unit/lib/temporal/logger_spec.rb +++ b/spec/unit/lib/temporal/logger_spec.rb @@ -8,6 +8,12 @@ allow(subject).to receive(:add) end + it 'accepts data argument to log method' do + subject.log(Logger::DEBUG, 'test', { a: 1 }) + + expect(subject).to have_received(:add).with(Logger::DEBUG, 'test {"a":1}') + end + it 'accepts data argument to debug method' do subject.debug('test', { a: 1 }) From b301583892475009bf145baddfc4409f5642fdef Mon Sep 17 00:00:00 2001 From: Rob Holland Date: Mon, 10 May 2021 12:41:55 +0100 Subject: [PATCH 13/13] Match Logger interface. --- lib/temporal/workflow/replay_aware_logger.rb | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/lib/temporal/workflow/replay_aware_logger.rb b/lib/temporal/workflow/replay_aware_logger.rb index ebb4c7a3..a56494b4 100644 --- a/lib/temporal/workflow/replay_aware_logger.rb +++ b/lib/temporal/workflow/replay_aware_logger.rb @@ -11,17 +11,17 @@ def initialize(main_logger, replay = true) end SEVERITIES.each do |severity| - define_method severity do |*args| + define_method severity do |message, data = {}| return if replay? - main_logger.public_send(severity, *args) + main_logger.public_send(severity, message, data) end end - def log(severity, *args) + def log(severity, message, data = {}) return if replay? 
- main_logger.log(severity, *args) + main_logger.log(severity, message, data) end private