Skip to content
2 changes: 1 addition & 1 deletion examples/activities/process_file_activity.rb
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ class ProcessFileActivity < Temporal::Activity
def execute(input)
file_contents = File.read(input)

logger.info("Processing file: #{input}")
logger.info("Processing file", { input: input })
logger.info("File contents: #{file_contents}")

raise 'unknown file'
Expand Down
2 changes: 1 addition & 1 deletion examples/activities/trip/book_flight_activity.rb
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
module Trip
class BookFlightActivity < Temporal::Activity
def execute(trip_id)
logger.info "Booking flight for trip #{trip_id}"
logger.info "Booking flight", { trip_id: trip_id }

return { reservation_id: SecureRandom.uuid, total: rand(0..1000) / 10.0 }
end
Expand Down
2 changes: 1 addition & 1 deletion examples/activities/trip/book_hotel_activity.rb
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
module Trip
class BookHotelActivity < Temporal::Activity
def execute(trip_id)
logger.info "Booking hotel room for trip #{trip_id}"
logger.info "Booking hotel room", { trip_id: trip_id }

return { reservation_id: SecureRandom.uuid, total: rand(0..1000) / 10.0 }
end
Expand Down
2 changes: 1 addition & 1 deletion examples/activities/trip/cancel_car_activity.rb
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
module Trip
class CancelCarActivity < Temporal::Activity
def execute(reservation_id)
logger.info "Cancelling car reservation: #{reservation_id}"
logger.info "Cancelling car reservation" { reservation_id: reservation_id }

return
end
Expand Down
2 changes: 1 addition & 1 deletion examples/activities/trip/cancel_flight_activity.rb
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
module Trip
class CancelFlightActivity < Temporal::Activity
def execute(reservation_id)
logger.info "Cancelling flight reservation: #{reservation_id}"
logger.info "Cancelling flight reservation", { reservation_id: reservation_id }

return
end
Expand Down
2 changes: 1 addition & 1 deletion examples/activities/trip/cancel_hotel_activity.rb
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
module Trip
class CancelHotelActivity < Temporal::Activity
def execute(reservation_id)
logger.info "Cancelling hotel reservation: #{reservation_id}"
logger.info "Cancelling hotel reservation" { reservation_id: reservation_id }

return
end
Expand Down
4 changes: 2 additions & 2 deletions examples/activities/trip/make_payment_activity.rb
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,9 @@ class InsufficientFunds < Temporal::ActivityException; end
)

def execute(trip_id, total)
logger.info "Processing payment for #{total} (trip_id #{trip_id})"
logger.info "Processing payment", { amount: total, trip_id: trip_id })

raise InsufficientFunds, "Unable to charge #{total}"
raise InsufficientFunds, "Unable to charge: #{total}"
end
end
end
2 changes: 1 addition & 1 deletion examples/activities/trip/rent_car_activity.rb
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
module Trip
class RentCarActivity < Temporal::Activity
def execute(trip_id)
logger.info "Renting a car for trip #{trip_id}"
logger.info "Renting a car", { trip_id: trip_id }

return { reservation_id: SecureRandom.uuid, total: rand(0..1000) / 10.0 }
end
Expand Down
2 changes: 1 addition & 1 deletion examples/bin/reset
Original file line number Diff line number Diff line change
Expand Up @@ -4,4 +4,4 @@ require_relative '../init'
namespace, workflow_id, run_id = ARGV

run_id = Temporal.reset_workflow(namespace, workflow_id, run_id)
Temporal.logger.info "Reset workflow: #{run_id}"
Temporal.logger.info "Reset workflow", { workflow_id: workflow_id, run_id: run_id }
2 changes: 1 addition & 1 deletion examples/bin/trigger
Original file line number Diff line number Diff line change
Expand Up @@ -11,4 +11,4 @@ workflow_id = SecureRandom.uuid
input = args.map { |arg| Integer(arg) rescue arg }

run_id = Temporal.start_workflow(workflow_class, *input, options: { workflow_id: workflow_id })
Temporal.logger.info "Started workflow: #{workflow_id} / #{run_id}"
Temporal.logger.info "Started workflow", { workflow_id: workflow_id, run_id: run_id }
6 changes: 3 additions & 3 deletions examples/middleware/logging_middleware.rb
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,13 @@ def initialize(app_name)
def call(metadata)
entity_name = name_from(metadata)
entity_type = type_from(metadata)
Temporal.logger.info("[#{app_name}]: Started #{entity_name} #{entity_type}")
Temporal.logger.info("[#{app_name}]: Started", metadata: metadata.to_h)

yield

Temporal.logger.info("[#{app_name}]: Finished #{entity_name} #{entity_type}")
Temporal.logger.info("[#{app_name}]: Finished", metadata: metadata.to_h)
rescue StandardError
Temporal.logger.error("[#{app_name}]: Error #{entity_name}")
Temporal.logger.error("[#{app_name}]: Error", metadata: metadata.to_h)

raise
end
Expand Down
6 changes: 3 additions & 3 deletions examples/workflows/check_workflow.rb
Original file line number Diff line number Diff line change
Expand Up @@ -9,17 +9,17 @@ def execute
logger.info(' => Future 1 is ready!') if future_1.ready?

future_1.done do |result|
logger.info("X: future_1 completed at #{workflow.now.strftime('%H:%M:%S.%L')}")
logger.info("X: future_1 completed", { time: workflow.now.strftime('%H:%M:%S.%L') })
end

result = future_2.get
logger.info("X: future_2 completed at #{workflow.now.strftime('%H:%M:%S.%L')}")
logger.info("X: future_2 completed", { time: workflow.now.strftime('%H:%M:%S.%L') })

logger.info(' => Future 3 is ready!') if future_3.ready?
logger.info(' => Future 2 is ready!') if future_2.ready?

future_3.done do |result|
logger.info("X: future_3 completed at #{workflow.now.strftime('%H:%M:%S.%L')}")
logger.info("X: future_3 completed", { time: workflow.now.strftime('%H:%M:%S.%L') })
end

workflow.wait_for_all(future_1, future_2, future_3)
Expand Down
2 changes: 1 addition & 1 deletion examples/workflows/failing_activities_workflow.rb
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ def execute(count)
end
workflow.wait_for_all(*futures)

logger.info("#{futures.count(&:failed?)} activites of #{count} failed")
logger.info("Activities failed", { total: count, failed: futures.count(&:failed?) })

{
finished: futures.count(&:finished?),
Expand Down
2 changes: 1 addition & 1 deletion examples/workflows/long_workflow.rb
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ def execute(cycles = 10, interval = 1)
future = LongRunningActivity.execute(cycles, interval)

workflow.on_signal do |signal, input|
logger.warn "Signal received: #{signal}"
logger.warn "Signal received", { signal: signal }
future.cancel
end

Expand Down
2 changes: 1 addition & 1 deletion lib/temporal/activity/context.rb
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ def async_token
end

def heartbeat(details = nil)
logger.debug('Activity heartbeat')
logger.debug("Activity heartbeat", metadata.to_h)
client.record_activity_task_heartbeat(task_token: task_token, details: details)
end

Expand Down
6 changes: 3 additions & 3 deletions lib/temporal/activity/poller.rb
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ def start

def stop_polling
@shutting_down = true
Temporal.logger.info('Shutting down activity poller')
Temporal.logger.info('Shutting down activity poller', { namespace: namespace, task_queue: task_queue })
end

def cancel_pending_requests
Expand Down Expand Up @@ -57,7 +57,7 @@ def poll_loop

return if shutting_down?

Temporal.logger.debug("Polling activity task queue (#{namespace} / #{task_queue})")
Temporal.logger.debug("Polling activity task queue", { namespace: namespace, task_queue: task_queue })

task = poll_for_task
next unless task&.activity_type
Expand All @@ -69,7 +69,7 @@ def poll_loop
def poll_for_task
client.poll_activity_task_queue(namespace: namespace, task_queue: task_queue)
rescue StandardError => error
Temporal.logger.error("Unable to poll activity task queue: #{error.inspect}")
Temporal.logger.error("Unable to poll activity task queue", { namespace: namespace, task_queue: task_queue, error: error.inspect })

Temporal::ErrorHandler.handle(error)

Expand Down
16 changes: 8 additions & 8 deletions lib/temporal/activity/task_processor.rb
Original file line number Diff line number Diff line change
Expand Up @@ -21,15 +21,15 @@ def initialize(task, namespace, activity_lookup, client, middleware_chain)
def process
start_time = Time.now

Temporal.logger.info("Processing activity task for #{activity_name}")
Temporal.logger.debug("Processing Activity task", metadata.to_h)
Temporal.metrics.timing('activity_task.queue_time', queue_time_ms, activity: activity_name)

context = Activity::Context.new(client, metadata)

if !activity_class
raise ActivityNotRegistered, 'Activity is not registered with this worker'
end

context = Activity::Context.new(client, metadata)

result = middleware_chain.invoke(metadata) do
activity_class.execute_in_context(context, parse_payload(task.input))
end
Expand All @@ -43,7 +43,7 @@ def process
ensure
time_diff_ms = ((Time.now - start_time) * 1000).round
Temporal.metrics.timing('activity_task.latency', time_diff_ms, activity: activity_name)
Temporal.logger.debug("Activity task processed in #{time_diff_ms}ms")
Temporal.logger.debug("Activity task processed", metadata.to_h.merge(execution_time: time_diff_ms))
end

private
Expand All @@ -57,19 +57,19 @@ def queue_time_ms
end

def respond_completed(result)
Temporal.logger.info("Activity #{activity_name} completed")
Temporal.logger.info("Activity task completed", metadata.to_h)
client.respond_activity_task_completed(task_token: task_token, result: result)
rescue StandardError => error
Temporal.logger.error("Unable to complete Activity #{activity_name}: #{error.inspect}")
Temporal.logger.error("Unable to complete Activity", metadata.to_h.merge(error: error.inspect))

Temporal::ErrorHandler.handle(error, metadata: metadata)
end

def respond_failed(error)
Temporal.logger.error("Activity #{activity_name} failed with: #{error.inspect}")
Temporal.logger.error("Activity task failed", metadata.to_h.merge(error: error.inspect))
client.respond_activity_task_failed(task_token: task_token, exception: error)
rescue StandardError => error
Temporal.logger.error("Unable to fail Activity #{activity_name}: #{error.inspect}")
Temporal.logger.error("Unable to fail Activity task", metadata.to_h.merge(error: error.inspect))

Temporal::ErrorHandler.handle(error, metadata: metadata)
end
Expand Down
4 changes: 2 additions & 2 deletions lib/temporal/configuration.rb
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
require 'logger'
require 'temporal/logger'
require 'temporal/metrics_adapters/null'

module Temporal
Expand Down Expand Up @@ -26,7 +26,7 @@ class Configuration

def initialize
@client_type = :grpc
@logger = Logger.new(STDOUT, progname: 'temporal_client')
@logger = Temporal::Logger.new(STDOUT, progname: 'temporal_client')
@metrics_adapter = MetricsAdapters::Null.new
@timeouts = DEFAULT_TIMEOUTS
@namespace = DEFAULT_NAMESPACE
Expand Down
2 changes: 1 addition & 1 deletion lib/temporal/error_handler.rb
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ def self.handle(error, metadata: nil)
Temporal.configuration.error_handlers.each do |handler|
handler.call(error, metadata: metadata)
rescue StandardError => e
Temporal.logger.error("Error handler failed: #{e.inspect}")
Temporal.logger.error("Error handler failed", { error: e.inspect })
end
end
end
Expand Down
17 changes: 17 additions & 0 deletions lib/temporal/logger.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
require 'logger'

module Temporal
class Logger < ::Logger
SEVERITIES = %i[debug info warn error fatal unknown].freeze

SEVERITIES.each do |severity|
define_method severity do |message, data = {}|
super(message.to_s + ' ' + Oj.dump(data, mode: :strict))
end
end

def log(severity, message, data = {})
add(severity, message.to_s + ' ' + Oj.dump(data, mode: :strict))
end
end
end
3 changes: 1 addition & 2 deletions lib/temporal/metadata/activity.rb
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,7 @@ def to_h
'workflow_run_id' => workflow_run_id,
'activity_id' => id,
'activity_name' => name,
'attempt' => attempt,
'heartbeat_details' => heartbeat_details
'attempt' => attempt
}
end
end
Expand Down
2 changes: 1 addition & 1 deletion lib/temporal/saga/concern.rb
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ def run_saga(&block)

Result.new(true)
rescue StandardError => error # TODO: is there a need for a specialized error here?
logger.error("Saga execution aborted: #{error.inspect}")
logger.error("Saga execution aborted", { error: error.inspect })
logger.debug(error.backtrace.join("\n"))

saga.compensate
Expand Down
2 changes: 1 addition & 1 deletion lib/temporal/workflow.rb
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ def self.execute_in_context(context, input)

context.complete(result) unless context.completed?
rescue StandardError, ScriptError => error
Temporal.logger.error("Workflow execution failed with: #{error.inspect}")
Temporal.logger.error("Workflow execution failed", context.metadata.to_h.merge(error: error.inspect))
Temporal.logger.debug(error.backtrace.join("\n"))

Temporal::ErrorHandler.handle(error, metadata: context.metadata)
Expand Down
4 changes: 2 additions & 2 deletions lib/temporal/workflow/poller.rb
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ def poll_loop

return if shutting_down?

Temporal.logger.debug("Polling worklow task queue (#{namespace} / #{task_queue})")
Temporal.logger.debug("Polling Worklow task queue", { namespace: namespace, task_queue: task_queue })

task = poll_for_task
next unless task&.workflow_type
Expand All @@ -69,7 +69,7 @@ def poll_loop
def poll_for_task
client.poll_workflow_task_queue(namespace: namespace, task_queue: task_queue)
rescue StandardError => error
Temporal.logger.error("Unable to poll workflow task queue: #{error.inspect}")
Temporal.logger.error("Unable to poll Workflow task queue", { namespace: namespace, task_queue: task_queue, error: error.inspect })

Temporal::ErrorHandler.handle(error)

Expand Down
8 changes: 4 additions & 4 deletions lib/temporal/workflow/replay_aware_logger.rb
Original file line number Diff line number Diff line change
Expand Up @@ -11,17 +11,17 @@ def initialize(main_logger, replay = true)
end

SEVERITIES.each do |severity|
define_method severity do |message|
define_method severity do |message, data = {}|
return if replay?

main_logger.public_send(severity, message)
main_logger.public_send(severity, message, data)
end
end

def log(severity, message)
def log(severity, message, data = {})
return if replay?

main_logger.log(severity, message)
main_logger.log(severity, message, data)
end

private
Expand Down
17 changes: 7 additions & 10 deletions lib/temporal/workflow/task_processor.rb
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ def initialize(task, namespace, workflow_lookup, client, middleware_chain)
def process
start_time = Time.now

Temporal.logger.info("Processing a workflow task for #{workflow_name}")
Temporal.logger.debug("Processing Workflow task", metadata.to_h)
Temporal.metrics.timing('workflow_task.queue_time', queue_time_ms, workflow: workflow_name)

if !workflow_class
Expand All @@ -40,16 +40,13 @@ def process

complete_task(commands)
rescue StandardError => error
fail_task(error)

Temporal.logger.error("Workflow task for #{workflow_name} failed with: #{error.inspect}")
Temporal.logger.debug(error.backtrace.join("\n"))

Temporal::ErrorHandler.handle(error, metadata: metadata)

fail_task(error)
ensure
time_diff_ms = ((Time.now - start_time) * 1000).round
Temporal.metrics.timing('workflow_task.latency', time_diff_ms, workflow: workflow_name)
Temporal.logger.debug("Workflow task processed in #{time_diff_ms}ms")
Temporal.logger.debug("Workflow task processed", metadata.to_h.merge(execution_time: time_diff_ms))
end

private
Expand Down Expand Up @@ -82,13 +79,13 @@ def fetch_full_history
end

def complete_task(commands)
Temporal.logger.info("Workflow task for #{workflow_name} completed")
Temporal.logger.info("Workflow task completed", metadata.to_h)

client.respond_workflow_task_completed(task_token: task_token, commands: commands)
end

def fail_task(error)
Temporal.logger.error("Workflow task for #{workflow_name} failed with: #{error.inspect}")
Temporal.logger.error("Workflow task failed", metadata.to_h.merge(error: error.inspect))
Temporal.logger.debug(error.backtrace.join("\n"))

# Only fail the workflow task on the first attempt. Subsequent failures of the same workflow task
Expand All @@ -102,7 +99,7 @@ def fail_task(error)
exception: error
)
rescue StandardError => error
Temporal.logger.error("Unable to fail Workflow task #{workflow_name}: #{error.inspect}")
Temporal.logger.error("Unable to fail Workflow task", metadata.to_h.merge(error: error.inspect))

Temporal::ErrorHandler.handle(error, metadata: metadata)
end
Expand Down
Loading