From d146caf17c54f2d3ccce54a0ae01d6d5281fa7e8 Mon Sep 17 00:00:00 2001 From: Dave Willett Date: Fri, 18 Feb 2022 08:21:07 -0800 Subject: [PATCH 01/19] Support for invoking and processing queries, WIP --- examples/bin/query | 14 +++++ examples/bin/worker | 1 + .../spec/integration/query_workflow_spec.rb | 18 ++++++ examples/workflows/query_workflow.rb | 22 +++++++ lib/temporal.rb | 5 +- lib/temporal/client.rb | 23 ++++++- lib/temporal/concerns/payloads.rb | 8 +++ lib/temporal/connection/grpc.rb | 60 +++++++++++++++++-- lib/temporal/metadata.rb | 7 ++- lib/temporal/metadata/workflow_task.rb | 8 ++- .../testing/local_workflow_context.rb | 4 ++ lib/temporal/workflow/context.rb | 8 +++ lib/temporal/workflow/dispatcher.rb | 14 ++++- lib/temporal/workflow/executor.rb | 4 ++ lib/temporal/workflow/task_processor.rb | 32 +++++++++- .../grpc/workflow_query_fabricator.rb | 4 ++ .../grpc/workflow_task_fabricator.rb | 1 + .../workflow_task_metadata_fabricator.rb | 2 + .../temporal/metadata/workflow_task_spec.rb | 4 ++ spec/unit/lib/temporal/metadata_spec.rb | 2 + 20 files changed, 227 insertions(+), 14 deletions(-) create mode 100755 examples/bin/query create mode 100644 examples/spec/integration/query_workflow_spec.rb create mode 100644 examples/workflows/query_workflow.rb create mode 100644 spec/fabricators/grpc/workflow_query_fabricator.rb diff --git a/examples/bin/query b/examples/bin/query new file mode 100755 index 00000000..c0e7f719 --- /dev/null +++ b/examples/bin/query @@ -0,0 +1,14 @@ +#!/usr/bin/env ruby +require_relative '../init' + +Dir[File.expand_path('../workflows/*.rb', __dir__)].each { |f| require f } + +workflow_class_name, workflow_id, run_id, query, args = ARGV +workflow_class = Object.const_get(workflow_class_name) + +if ![workflow_class, workflow_id, run_id, query].all? + fail 'Wrong arguments, use `bin/query WORKFLOW WORKFLOW_ID RUN_ID QUERY [ARGS]`' +end + +result = Temporal.query_workflow(workflow_class, query, workflow_id, run_id, args) +puts result.inspect diff --git a/examples/bin/worker b/examples/bin/worker index 435e418e..051436c5 100755 --- a/examples/bin/worker +++ b/examples/bin/worker @@ -36,6 +36,7 @@ worker.register_workflow(MetadataWorkflow) worker.register_workflow(ParentCloseWorkflow) worker.register_workflow(ParentWorkflow) worker.register_workflow(ProcessFileWorkflow) +worker.register_workflow(QueryWorkflow) worker.register_workflow(QuickTimeoutWorkflow) worker.register_workflow(RandomlyFailingWorkflow) worker.register_workflow(ReleaseWorkflow) diff --git a/examples/spec/integration/query_workflow_spec.rb b/examples/spec/integration/query_workflow_spec.rb new file mode 100644 index 00000000..64bfd721 --- /dev/null +++ b/examples/spec/integration/query_workflow_spec.rb @@ -0,0 +1,18 @@ +require 'workflows/query_workflow' + +describe QueryWorkflow, :integration do + subject { described_class } + + it 'returns the correct result for the queries' do + workflow_id, run_id = run_workflow(described_class) + + expect(Temporal.query_workflow(described_class, 'state', workflow_id, run_id)).to eq "waiting for cancel" + expect(Temporal.query_workflow(described_class, 'state', workflow_id, run_id, 'upcase')).to eq "WAITING FOR CANCEL" + + Temporal.signal_workflow(described_class, 'cancel', workflow_id, run_id) + wait_for_workflow_completion(workflow_id, run_id) + + expect(Temporal.query_workflow(described_class, 'state', workflow_id, run_id)).to eq "cancelled" + expect(Temporal.query_workflow(described_class, 'state', workflow_id, run_id, 'upcase')).to eq "CANCELLED" + end +end diff --git a/examples/workflows/query_workflow.rb b/examples/workflows/query_workflow.rb new file mode 100644 index 00000000..a63dc29d --- /dev/null +++ b/examples/workflows/query_workflow.rb @@ -0,0 +1,22 @@ +class QueryWorkflow < Temporal::Workflow + def execute + state = "waiting for cancel" + last_signal_received = nil + workflow.on_query("state") do |input| + input == "upcase" ? state.upcase : state + end + + workflow.on_signal do |signal| + last_signal_received = signal + end + + workflow.wait_for do + last_signal_received == "cancel" + end + state = "cancelled" + + { + final_state: state + } + end +end diff --git a/lib/temporal.rb b/lib/temporal.rb index 0b95a882..7f62be78 100644 --- a/lib/temporal.rb +++ b/lib/temporal.rb @@ -20,6 +20,7 @@ module Temporal :describe_namespace, :list_namespaces, :signal_workflow, + :query_workflow, :await_workflow_result, :reset_workflow, :terminate_workflow, @@ -48,11 +49,11 @@ def metrics end private - + def default_client @default_client ||= Client.new(config) end - + def config @config ||= Configuration.new end diff --git a/lib/temporal/client.rb b/lib/temporal/client.rb index c1e9a2da..807e244c 100644 --- a/lib/temporal/client.rb +++ b/lib/temporal/client.rb @@ -176,6 +176,27 @@ def signal_workflow(workflow, signal, workflow_id, run_id, input = nil, namespac ) end + # Issue a query against a running workflow + # + # @param workflow [Temporal::Workflow, nil] workflow class or nil + # @param query [String] name of the query to issue + # @param workflow_id [String] + # @param run_id [String] + # @param args [String, Array, nil] optional arguments for the query + # @param namespace [String, nil] if nil, choose the one declared on the workflow class or the + # global default + def query_workflow(workflow, query, workflow_id, run_id, args = nil, namespace: nil) + execution_options = ExecutionOptions.new(workflow, {}, config.default_execution_options) + + connection.query_workflow( + namespace: namespace || execution_options.namespace, + workflow_id: workflow_id, + run_id: run_id, + query: query, + args: args + ) + end + # Long polls for a workflow to be completed and returns workflow's return value. # # @note This function times out after 30 seconds and throws Temporal::TimeoutError, @@ -207,7 +228,7 @@ def await_workflow_result(workflow, workflow_id:, run_id: nil, timeout: nil, nam timeout: timeout || max_timeout, ) rescue GRPC::DeadlineExceeded => e - message = if timeout + message = if timeout "Timed out after your specified limit of timeout: #{timeout} seconds" else "Timed out after #{max_timeout} seconds, which is the maximum supported amount." diff --git a/lib/temporal/concerns/payloads.rb b/lib/temporal/concerns/payloads.rb index 3be276d2..ad703542 100644 --- a/lib/temporal/concerns/payloads.rb +++ b/lib/temporal/concerns/payloads.rb @@ -21,6 +21,10 @@ def from_signal_payloads(payloads) from_payloads(payloads)&.first end + def from_query_payloads(payloads) + from_payloads(payloads)&.first + end + def from_payload_map(payload_map) payload_map.map { |key, value| [key, from_payload(value)] }.to_h end @@ -45,6 +49,10 @@ def to_signal_payloads(data) to_payloads([data]) end + def to_query_payloads(data) + to_payloads([data]) + end + def to_payload_map(data) data.transform_values(&method(:to_payload)) end diff --git a/lib/temporal/connection/grpc.rb b/lib/temporal/connection/grpc.rb index 9e27a10e..5c0c0aee 100644 --- a/lib/temporal/connection/grpc.rb +++ b/lib/temporal/connection/grpc.rb @@ -142,7 +142,7 @@ def get_workflow_execution_history( event_type: :all, timeout: nil ) - if wait_for_new_event + if wait_for_new_event if timeout.nil? # This is an internal error. Wrappers should enforce this. raise "You must specify a timeout when wait_for_new_event = true." @@ -452,16 +452,66 @@ def get_search_attributes raise NotImplementedError end - def respond_query_task_completed - raise NotImplementedError + def respond_query_task_completed(result:, **request_params) + request = Temporal::Api::WorkflowService::V1::RespondQueryTaskCompletedRequest.new( + **request_params.merge(query_result: to_query_payloads(result)) + ) + client.respond_query_task_completed(request) end def reset_sticky_task_queue raise NotImplementedError end - def query_workflow - raise NotImplementedError + # async _queryWorkflowHandler(input) { + # let response; + # try { + # response = await this.service.queryWorkflow({ + # queryRejectCondition: input.queryRejectCondition, + # namespace: this.options.namespace, + # execution: input.workflowExecution, + # query: { + # queryType: input.queryType, + # queryArgs: { payloads: await this.options.dataConverter.toPayloads(...input.args) }, + # }, + # }); + # } + # catch (err) { + # this.rethrowGrpcError(err, input.workflowExecution, 'Failed to query Workflow'); + # } + # if (response.queryRejected) { + # if (response.queryRejected.status === undefined || response.queryRejected.status === null) { + # throw new TypeError('Received queryRejected from server with no status'); + # } + # throw new QueryRejectedError(response.queryRejected.status); + # } + # if (!response.queryResult) { + # throw new TypeError('Invalid response from server'); + # } + # // We ignore anything but the first result + # return this.options.dataConverter.fromPayloads(0, response.queryResult?.payloads); + # } + def query_workflow(namespace:, workflow_id:, run_id:, query:, args: nil) + request = Temporal::Api::WorkflowService::V1::QueryWorkflowRequest.new( + # query_reject_condition: , + namespace: namespace, + execution: Temporal::Api::Common::V1::WorkflowExecution.new( + workflow_id: workflow_id, + run_id: run_id + ), + query: Temporal::Api::Query::V1::WorkflowQuery.new( + query_type: query, + query_args: to_query_payloads(args) + ) + ) + response = client.query_workflow(request) + if response.query_rejected + + elsif !response.query_result + + else + from_query_payloads(response.query_result) + end end def describe_workflow_execution(namespace:, workflow_id:, run_id:) diff --git a/lib/temporal/metadata.rb b/lib/temporal/metadata.rb index 4bcfade8..e4291757 100644 --- a/lib/temporal/metadata.rb +++ b/lib/temporal/metadata.rb @@ -27,6 +27,8 @@ def generate_activity_metadata(task, namespace) ) end + # @param task [Temporal::Api::WorkflowService::V1::PollWorkflowTaskQueueResponse] + # @param namespace [String] def generate_workflow_task_metadata(task, namespace) Metadata::WorkflowTask.new( namespace: namespace, @@ -35,12 +37,13 @@ def generate_workflow_task_metadata(task, namespace) attempt: task.attempt, workflow_run_id: task.workflow_execution.run_id, workflow_id: task.workflow_execution.workflow_id, - workflow_name: task.workflow_type.name + workflow_name: task.workflow_type.name, + query_type: task.query&.query_type, + query_args: from_query_payloads(task.query&.query_args) ) end # @param event [Temporal::Workflow::History::Event] Workflow started history event - # @param event [WorkflowExecutionStartedEventAttributes] :attributes # @param task_metadata [Temporal::Metadata::WorkflowTask] workflow task metadata def generate_workflow_metadata(event, task_metadata) Metadata::Workflow.new( diff --git a/lib/temporal/metadata/workflow_task.rb b/lib/temporal/metadata/workflow_task.rb index c287d2c5..080a5fff 100644 --- a/lib/temporal/metadata/workflow_task.rb +++ b/lib/temporal/metadata/workflow_task.rb @@ -3,9 +3,9 @@ module Temporal module Metadata class WorkflowTask < Base - attr_reader :namespace, :id, :task_token, :attempt, :workflow_run_id, :workflow_id, :workflow_name + attr_reader :namespace, :id, :task_token, :attempt, :workflow_run_id, :workflow_id, :workflow_name, :query_type, :query_args - def initialize(namespace:, id:, task_token:, attempt:, workflow_run_id:, workflow_id:, workflow_name:) + def initialize(namespace:, id:, task_token:, attempt:, workflow_run_id:, workflow_id:, workflow_name:, query_type:, query_args:) @namespace = namespace @id = id @task_token = task_token @@ -13,6 +13,8 @@ def initialize(namespace:, id:, task_token:, attempt:, workflow_run_id:, workflo @workflow_run_id = workflow_run_id @workflow_id = workflow_id @workflow_name = workflow_name + @query_type = query_type + @query_args = query_args freeze end @@ -28,6 +30,8 @@ def to_h 'workflow_name' => workflow_name, 'workflow_id' => workflow_id, 'workflow_run_id' => workflow_run_id, + 'query_type' => query_type, + 'query_args' => query_args, 'attempt' => attempt } end diff --git a/lib/temporal/testing/local_workflow_context.rb b/lib/temporal/testing/local_workflow_context.rb index a115eb26..d73a5398 100644 --- a/lib/temporal/testing/local_workflow_context.rb +++ b/lib/temporal/testing/local_workflow_context.rb @@ -190,6 +190,10 @@ def on_signal(&block) raise NotImplementedError, 'Signals are not available when Temporal::Testing.local! is on' end + def on_query(&block) + raise NotImplementedError, 'Queries are not available when Temporal::Testing.local! is on' + end + def cancel_activity(activity_id) raise NotImplementedError, 'Cancel is not available when Temporal::Testing.local! is on' end diff --git a/lib/temporal/workflow/context.rb b/lib/temporal/workflow/context.rb index ac4454ce..5922cd6a 100644 --- a/lib/temporal/workflow/context.rb +++ b/lib/temporal/workflow/context.rb @@ -298,6 +298,14 @@ def on_signal(&block) end end + def on_query(query, &block) + target = History::EventTarget.workflow + + dispatcher.register_query_handler(target, query) do |input| + block.call(input) + end + end + def cancel_activity(activity_id) command = Command::RequestActivityCancellation.new(activity_id: activity_id) diff --git a/lib/temporal/workflow/dispatcher.rb b/lib/temporal/workflow/dispatcher.rb index 2a768e54..acad0f5a 100644 --- a/lib/temporal/workflow/dispatcher.rb +++ b/lib/temporal/workflow/dispatcher.rb @@ -6,21 +6,33 @@ class Dispatcher def initialize @handlers = Hash.new { |hash, key| hash[key] = [] } + @query_handlers = Hash.new { |hash, key| hash[key] = {} } end def register_handler(target, event_name, &handler) handlers[target] << [event_name, handler] end + def register_query_handler(target, query, &handler) + query_handlers[target][query] = handler + end + def dispatch(target, event_name, args = nil) handlers_for(target, event_name).each do |handler| handler.call(*args) end end + def process_query(target, query, args) + if query == '__cadence_web_list' + return query_handlers[target].keys + %w[__stack_trace] + end + query_handlers[target][query].call(*args) + end + private - attr_reader :handlers + attr_reader :handlers, :query_handlers def handlers_for(target, event_name) handlers[target] diff --git a/lib/temporal/workflow/executor.rb b/lib/temporal/workflow/executor.rb index 78feb61b..3044142a 100644 --- a/lib/temporal/workflow/executor.rb +++ b/lib/temporal/workflow/executor.rb @@ -36,6 +36,10 @@ def run return state_manager.commands end + def process_query(query, args) + dispatcher.process_query(History::EventTarget.workflow, query, args) + end + private attr_reader :workflow_class, :dispatcher, :state_manager, :task_metadata, :history, :config diff --git a/lib/temporal/workflow/task_processor.rb b/lib/temporal/workflow/task_processor.rb index ce271e4e..33e5431c 100644 --- a/lib/temporal/workflow/task_processor.rb +++ b/lib/temporal/workflow/task_processor.rb @@ -38,7 +38,37 @@ def process executor.run end - complete_task(commands) + if metadata.query_type + # queryCompletedRequest := &workflowservice.RespondQueryTaskCompletedRequest{ + # TaskToken: task.TaskToken, + # Namespace: wth.namespace, + # } + # var panicErr *PanicError + # if errors.As(workflowContext.err, &panicErr) { + # queryCompletedRequest.CompletedType = enumspb.QUERY_RESULT_TYPE_FAILED + # queryCompletedRequest.ErrorMessage = "Workflow panic: " + panicErr.Error() + # return queryCompletedRequest + # } + + # result, err := eventHandler.ProcessQuery(task.Query.GetQueryType(), task.Query.QueryArgs, task.Query.Header) + # if err != nil { + # queryCompletedRequest.CompletedType = enumspb.QUERY_RESULT_TYPE_FAILED + # queryCompletedRequest.ErrorMessage = err.Error() + # } else { + # queryCompletedRequest.CompletedType = enumspb.QUERY_RESULT_TYPE_ANSWERED + # queryCompletedRequest.QueryResult = result + # } + # return queryCompletedRequest + result = executor.process_query(metadata.query_type, metadata.query_args) + connection.respond_query_task_completed( + task_token: task_token, + namespace: namespace, + completed_type: Temporal::Api::Enums::V1::QueryResultType::QUERY_RESULT_TYPE_ANSWERED, + result: result + ) + else + complete_task(commands) + end rescue StandardError => error Temporal::ErrorHandler.handle(error, config, metadata: metadata) diff --git a/spec/fabricators/grpc/workflow_query_fabricator.rb b/spec/fabricators/grpc/workflow_query_fabricator.rb new file mode 100644 index 00000000..dcabbb24 --- /dev/null +++ b/spec/fabricators/grpc/workflow_query_fabricator.rb @@ -0,0 +1,4 @@ +Fabricator(:api_workflow_query, from: Temporal::Api::Query::V1::WorkflowQuery) do + query_type { 'state' } + query_args { Temporal.configuration.converter.to_payloads(['']) } +end diff --git a/spec/fabricators/grpc/workflow_task_fabricator.rb b/spec/fabricators/grpc/workflow_task_fabricator.rb index a699428a..33eb09ef 100644 --- a/spec/fabricators/grpc/workflow_task_fabricator.rb +++ b/spec/fabricators/grpc/workflow_task_fabricator.rb @@ -10,6 +10,7 @@ scheduled_time { Google::Protobuf::Timestamp.new.tap { |t| t.from_time(Time.now) } } started_time { Google::Protobuf::Timestamp.new.tap { |t| t.from_time(Time.now) } } history { |attrs| Temporal::Api::History::V1::History.new(events: attrs[:events]) } + query { Fabricate(:api_workflow_query) } end Fabricator(:api_paginated_workflow_task, from: :api_workflow_task) do diff --git a/spec/fabricators/workflow_task_metadata_fabricator.rb b/spec/fabricators/workflow_task_metadata_fabricator.rb index d5149def..a1def3f0 100644 --- a/spec/fabricators/workflow_task_metadata_fabricator.rb +++ b/spec/fabricators/workflow_task_metadata_fabricator.rb @@ -8,4 +8,6 @@ workflow_run_id { SecureRandom.uuid } workflow_id { SecureRandom.uuid } workflow_name 'TestWorkflow' + query_type 'state' + query_args nil end diff --git a/spec/unit/lib/temporal/metadata/workflow_task_spec.rb b/spec/unit/lib/temporal/metadata/workflow_task_spec.rb index 7259c1f9..2901830e 100644 --- a/spec/unit/lib/temporal/metadata/workflow_task_spec.rb +++ b/spec/unit/lib/temporal/metadata/workflow_task_spec.rb @@ -13,6 +13,8 @@ expect(subject.workflow_run_id).to eq(args.workflow_run_id) expect(subject.workflow_id).to eq(args.workflow_id) expect(subject.workflow_name).to eq(args.workflow_name) + expect(subject.query_type).to eq(args.query_type) + expect(subject.query_args).to eq(args.query_args) end it { is_expected.to be_frozen } @@ -33,6 +35,8 @@ 'workflow_id' => subject.workflow_id, 'workflow_name' => subject.workflow_name, 'workflow_run_id' => subject.workflow_run_id, + 'query_type' => subject.query_type, + 'query_args' => subject.query_args, }) end end diff --git a/spec/unit/lib/temporal/metadata_spec.rb b/spec/unit/lib/temporal/metadata_spec.rb index cd21fb76..ec2352ff 100644 --- a/spec/unit/lib/temporal/metadata_spec.rb +++ b/spec/unit/lib/temporal/metadata_spec.rb @@ -42,6 +42,8 @@ expect(subject.workflow_run_id).to eq(data.workflow_execution.run_id) expect(subject.workflow_id).to eq(data.workflow_execution.workflow_id) expect(subject.workflow_name).to eq(data.workflow_type.name) + expect(subject.query_type).to eq(data.query.query_type) + expect(subject.query_args).to eq('') end end From 3594d9040c81ce32fce858e3f2eaa2fff9d818f1 Mon Sep 17 00:00:00 2001 From: Dave Willett Date: Wed, 23 Feb 2022 17:14:10 -0800 Subject: [PATCH 02/19] Catch-all query handler support, feedback changes Made a handful of changes on approach from the initial spike. This is operating under an assumption that the added EventTarget type for query is a valid approach --- .../spec/integration/query_workflow_spec.rb | 18 +- examples/workflows/query_workflow.rb | 30 +++- lib/temporal/client.rb | 2 +- lib/temporal/connection/grpc.rb | 44 +++-- lib/temporal/metadata.rb | 4 +- lib/temporal/metadata/workflow_task.rb | 8 +- lib/temporal/workflow/context.rb | 13 +- lib/temporal/workflow/dispatcher.rb | 21 +-- lib/temporal/workflow/executor.rb | 5 +- lib/temporal/workflow/history/event_target.rb | 5 + lib/temporal/workflow/task_processor.rb | 90 ++++++---- .../grpc/workflow_task_fabricator.rb | 2 +- .../workflow_task_metadata_fabricator.rb | 2 - .../{grpc_client_spec.rb => grpc_spec.rb} | 16 +- .../temporal/metadata/workflow_task_spec.rb | 6 +- spec/unit/lib/temporal/metadata_spec.rb | 2 - .../temporal/workflow/task_processor_spec.rb | 157 +++++++++++++----- 17 files changed, 285 insertions(+), 140 deletions(-) rename spec/unit/lib/temporal/{grpc_client_spec.rb => grpc_spec.rb} (98%) diff --git a/examples/spec/integration/query_workflow_spec.rb b/examples/spec/integration/query_workflow_spec.rb index 64bfd721..f63b2540 100644 --- a/examples/spec/integration/query_workflow_spec.rb +++ b/examples/spec/integration/query_workflow_spec.rb @@ -6,13 +6,21 @@ it 'returns the correct result for the queries' do workflow_id, run_id = run_workflow(described_class) - expect(Temporal.query_workflow(described_class, 'state', workflow_id, run_id)).to eq "waiting for cancel" - expect(Temporal.query_workflow(described_class, 'state', workflow_id, run_id, 'upcase')).to eq "WAITING FOR CANCEL" + # These query calls should leverage the task.queries mechanism, as there is a locally executed sleeping activity + expect(Temporal.query_workflow(described_class, 'state', workflow_id, run_id)).to eq 'started' + expect(Temporal.query_workflow(described_class, 'state', workflow_id, run_id, 'upcase')).to eq 'STARTED' + expect(Temporal.query_workflow(described_class, 'last_signal', workflow_id, run_id)).to eq nil + expect(Temporal.query_workflow(described_class, 'last_signal', workflow_id, run_id, 'reverse', 'upcase')).to eq nil + expect(Temporal.query_workflow(described_class, 'unknown_query', workflow_id, run_id)).to eq nil + + Temporal.signal_workflow(described_class, 'finish', workflow_id, run_id) - Temporal.signal_workflow(described_class, 'cancel', workflow_id, run_id) wait_for_workflow_completion(workflow_id, run_id) - expect(Temporal.query_workflow(described_class, 'state', workflow_id, run_id)).to eq "cancelled" - expect(Temporal.query_workflow(described_class, 'state', workflow_id, run_id, 'upcase')).to eq "CANCELLED" + expect(Temporal.query_workflow(described_class, 'state', workflow_id, run_id)).to eq 'finished' + expect(Temporal.query_workflow(described_class, 'last_signal', workflow_id, run_id)).to eq 'finish' + expect(Temporal.query_workflow(described_class, 'last_signal', workflow_id, run_id, 'reverse')).to eq 'hsinif' + expect(Temporal.query_workflow(described_class, 'state', workflow_id, run_id, 'upcase')).to eq 'FINISHED' + expect(Temporal.query_workflow(described_class, 'unknown_query', workflow_id, run_id)).to eq nil end end diff --git a/examples/workflows/query_workflow.rb b/examples/workflows/query_workflow.rb index a63dc29d..27517df2 100644 --- a/examples/workflows/query_workflow.rb +++ b/examples/workflows/query_workflow.rb @@ -1,21 +1,35 @@ class QueryWorkflow < Temporal::Workflow + timeouts task: 10 + def execute - state = "waiting for cancel" + state = "started" last_signal_received = nil + + # Demonstrating catch-all query handler. + workflow.on_query do |query, input| + case query + when "last_signal" + input == "reverse" ? last_signal_received&.reverse : last_signal_received + else + nil + # TODO appropriate error handling? + # raise StandardError, "Unrecognized query type '#{query}'" + end + end + + # Demonstrating targeted query handler. Note that this specific query handler would be invoked + # instead of the more broad catch-all query handler above. workflow.on_query("state") do |input| input == "upcase" ? state.upcase : state end - workflow.on_signal do |signal| - last_signal_received = signal - end + workflow.on_signal { |signal| last_signal_received = signal } - workflow.wait_for do - last_signal_received == "cancel" - end - state = "cancelled" + workflow.wait_for { last_signal_received == "finish" } + state = "finished" { + last_signal_received: last_signal_received, final_state: state } end diff --git a/lib/temporal/client.rb b/lib/temporal/client.rb index 807e244c..a32bd4a2 100644 --- a/lib/temporal/client.rb +++ b/lib/temporal/client.rb @@ -185,7 +185,7 @@ def signal_workflow(workflow, signal, workflow_id, run_id, input = nil, namespac # @param args [String, Array, nil] optional arguments for the query # @param namespace [String, nil] if nil, choose the one declared on the workflow class or the # global default - def query_workflow(workflow, query, workflow_id, run_id, args = nil, namespace: nil) + def query_workflow(workflow, query, workflow_id, run_id, *args, namespace: nil) execution_options = ExecutionOptions.new(workflow, {}, config.default_execution_options) connection.query_workflow( diff --git a/lib/temporal/connection/grpc.rb b/lib/temporal/connection/grpc.rb index 5c0c0aee..a4295f27 100644 --- a/lib/temporal/connection/grpc.rb +++ b/lib/temporal/connection/grpc.rb @@ -183,12 +183,41 @@ def poll_workflow_task_queue(namespace:, task_queue:) poll_request.execute end - def respond_workflow_task_completed(namespace:, task_token:, commands:) + def respond_query_task_completed(namespace:, task_token:, query_result: nil, error_message: nil) + request = Temporal::Api::WorkflowService::V1::RespondQueryTaskCompletedRequest.new( + task_token: task_token, + namespace: namespace + ) + if !error_message.nil? + request.error_message = error_message + request.completed_type = Temporal::Api::Enums::V1::QueryResultType::QUERY_RESULT_TYPE_FAILED + else + request.query_result = to_query_payloads(query_result) + request.completed_type = Temporal::Api::Enums::V1::QueryResultType::QUERY_RESULT_TYPE_ANSWERED + end + client.respond_query_task_completed(request) + end + + def respond_workflow_task_completed(namespace:, task_token:, commands:, query_results:) + query_results&.transform_values! do |query_result| + if query_result.is_a?(StandardError) + { + result_type: Temporal::Api::Enums::V1::QueryResultType::QUERY_RESULT_TYPE_FAILED, + error_message: query_result.message + } + else + { + result_type: Temporal::Api::Enums::V1::QueryResultType::QUERY_RESULT_TYPE_ANSWERED, + answer: to_query_payloads(query_result) + } + end + end request = Temporal::Api::WorkflowService::V1::RespondWorkflowTaskCompletedRequest.new( namespace: namespace, identity: identity, task_token: task_token, - commands: Array(commands).map { |(_, command)| Serializer.serialize(command) } + commands: Array(commands).map { |(_, command)| Serializer.serialize(command) }, + query_results: query_results || {} ) client.respond_workflow_task_completed(request) end @@ -452,13 +481,6 @@ def get_search_attributes raise NotImplementedError end - def respond_query_task_completed(result:, **request_params) - request = Temporal::Api::WorkflowService::V1::RespondQueryTaskCompletedRequest.new( - **request_params.merge(query_result: to_query_payloads(result)) - ) - client.respond_query_task_completed(request) - end - def reset_sticky_task_queue raise NotImplementedError end @@ -488,7 +510,7 @@ def reset_sticky_task_queue # if (!response.queryResult) { # throw new TypeError('Invalid response from server'); # } - # // We ignore anything but the first result + # // We ignore anything but the first query_result # return this.options.dataConverter.fromPayloads(0, response.queryResult?.payloads); # } def query_workflow(namespace:, workflow_id:, run_id:, query:, args: nil) @@ -584,7 +606,7 @@ def serialize_status_filter(value) sym = Temporal::Workflow::Status::API_STATUS_MAP.invert[value] status = Temporal::Api::Enums::V1::WorkflowExecutionStatus.resolve(sym) - + Temporal::Api::Filter::V1::StatusFilter.new(status: status) end end diff --git a/lib/temporal/metadata.rb b/lib/temporal/metadata.rb index e4291757..dc9d1721 100644 --- a/lib/temporal/metadata.rb +++ b/lib/temporal/metadata.rb @@ -37,9 +37,7 @@ def generate_workflow_task_metadata(task, namespace) attempt: task.attempt, workflow_run_id: task.workflow_execution.run_id, workflow_id: task.workflow_execution.workflow_id, - workflow_name: task.workflow_type.name, - query_type: task.query&.query_type, - query_args: from_query_payloads(task.query&.query_args) + workflow_name: task.workflow_type.name ) end diff --git a/lib/temporal/metadata/workflow_task.rb b/lib/temporal/metadata/workflow_task.rb index 080a5fff..c287d2c5 100644 --- a/lib/temporal/metadata/workflow_task.rb +++ b/lib/temporal/metadata/workflow_task.rb @@ -3,9 +3,9 @@ module Temporal module Metadata class WorkflowTask < Base - attr_reader :namespace, :id, :task_token, :attempt, :workflow_run_id, :workflow_id, :workflow_name, :query_type, :query_args + attr_reader :namespace, :id, :task_token, :attempt, :workflow_run_id, :workflow_id, :workflow_name - def initialize(namespace:, id:, task_token:, attempt:, workflow_run_id:, workflow_id:, workflow_name:, query_type:, query_args:) + def initialize(namespace:, id:, task_token:, attempt:, workflow_run_id:, workflow_id:, workflow_name:) @namespace = namespace @id = id @task_token = task_token @@ -13,8 +13,6 @@ def initialize(namespace:, id:, task_token:, attempt:, workflow_run_id:, workflo @workflow_run_id = workflow_run_id @workflow_id = workflow_id @workflow_name = workflow_name - @query_type = query_type - @query_args = query_args freeze end @@ -30,8 +28,6 @@ def to_h 'workflow_name' => workflow_name, 'workflow_id' => workflow_id, 'workflow_run_id' => workflow_run_id, - 'query_type' => query_type, - 'query_args' => query_args, 'attempt' => attempt } end diff --git a/lib/temporal/workflow/context.rb b/lib/temporal/workflow/context.rb index 5922cd6a..21d8b9d3 100644 --- a/lib/temporal/workflow/context.rb +++ b/lib/temporal/workflow/context.rb @@ -298,11 +298,11 @@ def on_signal(&block) end end - def on_query(query, &block) - target = History::EventTarget.workflow + def on_query(query = Dispatcher::WILDCARD, &block) + target = History::EventTarget.query - dispatcher.register_query_handler(target, query) do |input| - block.call(input) + dispatcher.register_handler(target, query) do |input| + block.call(*input) end end @@ -402,6 +402,11 @@ def schedule_command(command) state_manager.schedule(command) end + # @param query [Temporal::Api::Query::V1::WorkflowQuery] + def process_query(query) + dispatcher.dispatch(History::EventTarget.workflow, "queried:#{query.query_type}", query.query_args) + end + def call_in_fiber(block, *args) Fiber.new do Temporal::ThreadLocalContext.set(self) diff --git a/lib/temporal/workflow/dispatcher.rb b/lib/temporal/workflow/dispatcher.rb index acad0f5a..efa58f01 100644 --- a/lib/temporal/workflow/dispatcher.rb +++ b/lib/temporal/workflow/dispatcher.rb @@ -6,39 +6,36 @@ class Dispatcher def initialize @handlers = Hash.new { |hash, key| hash[key] = [] } - @query_handlers = Hash.new { |hash, key| hash[key] = {} } end def register_handler(target, event_name, &handler) handlers[target] << [event_name, handler] end - def register_query_handler(target, query, &handler) - query_handlers[target][query] = handler - end - def dispatch(target, event_name, args = nil) - handlers_for(target, event_name).each do |handler| + handlers_for(target, event_name).each do |_, handler| handler.call(*args) end end - def process_query(target, query, args) - if query == '__cadence_web_list' - return query_handlers[target].keys + %w[__stack_trace] + def process(target, event_name, args = nil) + registered_name, handler = handlers_for(target, event_name).first + unless handler.nil? + args = [args] unless args.is_a?(Array) + args.unshift(event_name) if registered_name == WILDCARD + handler.call(args) end - query_handlers[target][query].call(*args) end private - attr_reader :handlers, :query_handlers + attr_reader :handlers def handlers_for(target, event_name) handlers[target] .concat(handlers[TARGET_WILDCARD]) .select { |(name, _)| name == event_name || name == WILDCARD } - .map(&:last) + .sort_by { |(name, _)| name == WILDCARD ? 1 : 0 } end end end diff --git a/lib/temporal/workflow/executor.rb b/lib/temporal/workflow/executor.rb index 3044142a..5875debc 100644 --- a/lib/temporal/workflow/executor.rb +++ b/lib/temporal/workflow/executor.rb @@ -36,8 +36,9 @@ def run return state_manager.commands end - def process_query(query, args) - dispatcher.process_query(History::EventTarget.workflow, query, args) + # @param query [Temporal::Workflow::TaskProcessor::Query] + def process_query(query) + dispatcher.process(History::EventTarget.query, query.query_type, query.query_args) end private diff --git a/lib/temporal/workflow/history/event_target.rb b/lib/temporal/workflow/history/event_target.rb index d054947f..2077f433 100644 --- a/lib/temporal/workflow/history/event_target.rb +++ b/lib/temporal/workflow/history/event_target.rb @@ -16,6 +16,7 @@ class UnexpectedEventType < InternalError; end CANCEL_EXTERNAL_WORKFLOW_REQUEST_TYPE = :cancel_external_workflow_request WORKFLOW_TYPE = :workflow CANCEL_WORKFLOW_REQUEST_TYPE = :cancel_workflow_request + QUERY_TYPE = :query UPSERT_SEARCH_ATTRIBUTES_REQUEST_TYPE = :upsert_search_attributes_request # NOTE: The order is important, first prefix match wins (will be a longer match) @@ -44,6 +45,10 @@ def self.workflow @workflow ||= new(1, WORKFLOW_TYPE) end + def self.query + @query ||= new(1, QUERY_TYPE) + end + def self.from_event(event) _, target_type = TARGET_TYPES.find { |type, _| event.type.start_with?(type) } diff --git a/lib/temporal/workflow/task_processor.rb b/lib/temporal/workflow/task_processor.rb index 33e5431c..4ad5c6c0 100644 --- a/lib/temporal/workflow/task_processor.rb +++ b/lib/temporal/workflow/task_processor.rb @@ -7,6 +7,18 @@ module Temporal class Workflow class TaskProcessor + Query = Struct.new(:query) do + include Concerns::Payloads + + def query_type + query.query_type + end + + def query_args + from_query_payloads(query.query_args) + end + end + MAX_FAILED_ATTEMPTS = 1 def initialize(task, namespace, workflow_lookup, middleware_chain, config) @@ -38,41 +50,30 @@ def process executor.run end - if metadata.query_type - # queryCompletedRequest := &workflowservice.RespondQueryTaskCompletedRequest{ - # TaskToken: task.TaskToken, - # Namespace: wth.namespace, - # } - # var panicErr *PanicError - # if errors.As(workflowContext.err, &panicErr) { - # queryCompletedRequest.CompletedType = enumspb.QUERY_RESULT_TYPE_FAILED - # queryCompletedRequest.ErrorMessage = "Workflow panic: " + panicErr.Error() - # return queryCompletedRequest - # } - - # result, err := eventHandler.ProcessQuery(task.Query.GetQueryType(), task.Query.QueryArgs, task.Query.Header) - # if err != nil { - # queryCompletedRequest.CompletedType = enumspb.QUERY_RESULT_TYPE_FAILED - # queryCompletedRequest.ErrorMessage = err.Error() - # } else { - # queryCompletedRequest.CompletedType = enumspb.QUERY_RESULT_TYPE_ANSWERED - # queryCompletedRequest.QueryResult = result - # } - # return queryCompletedRequest - result = executor.process_query(metadata.query_type, metadata.query_args) - connection.respond_query_task_completed( - task_token: task_token, - namespace: namespace, - completed_type: Temporal::Api::Enums::V1::QueryResultType::QUERY_RESULT_TYPE_ANSWERED, - result: result - ) + # Process deprecated style of query task + if !task.query.nil? + result = executor.process_query(Query.new(task.query)) + complete_query(result) else - complete_task(commands) + query_results = if task.queries.any? + task.queries.each_with_object({}) do |(query_id, query), hash| + begin + hash[query_id] = executor.process_query(Query.new(query)) + rescue StandardError => error + hash[query_id] = error + end + end + end + complete_task(commands, query_results) end rescue StandardError => error Temporal::ErrorHandler.handle(error, config, metadata: metadata) - fail_task(error) + if task.query.nil? + fail_task(error) + else + fail_query(error) + end ensure time_diff_ms = ((Time.now - start_time) * 1000).round Temporal.metrics.timing('workflow_task.latency', time_diff_ms, workflow: workflow_name, namespace: namespace) @@ -117,10 +118,35 @@ def fetch_full_history Workflow::History.new(events) end - def complete_task(commands) + def complete_task(commands, query_results) Temporal.logger.info("Workflow task completed", metadata.to_h) - connection.respond_workflow_task_completed(namespace: namespace, task_token: task_token, commands: commands) + connection.respond_workflow_task_completed(namespace: namespace, task_token: task_token, commands: commands, query_results: query_results) + end + + def complete_query(result) + Temporal.logger.info("Workflow Query task completed", metadata.to_h) + + connection.respond_query_task_completed( + namespace: namespace, + task_token: task_token, + query_result: result + ) + end + + def fail_query(error) + Temporal.logger.error("Workflow Query task failed", metadata.to_h.merge(error: error.inspect)) + Temporal.logger.debug(error.backtrace.join("\n")) + + connection.respond_query_task_completed( + namespace: namespace, + task_token: task_token, + error_message: error.message || "Encountered an error during query handling" + ) + rescue StandardError => error + Temporal.logger.error("Unable to fail Workflow Query task", metadata.to_h.merge(error: error.inspect)) + + Temporal::ErrorHandler.handle(error, config, metadata: metadata) end def fail_task(error) diff --git a/spec/fabricators/grpc/workflow_task_fabricator.rb b/spec/fabricators/grpc/workflow_task_fabricator.rb index 33eb09ef..d1470c04 100644 --- a/spec/fabricators/grpc/workflow_task_fabricator.rb +++ b/spec/fabricators/grpc/workflow_task_fabricator.rb @@ -10,7 +10,7 @@ scheduled_time { Google::Protobuf::Timestamp.new.tap { |t| t.from_time(Time.now) } } started_time { Google::Protobuf::Timestamp.new.tap { |t| t.from_time(Time.now) } } history { |attrs| Temporal::Api::History::V1::History.new(events: attrs[:events]) } - query { Fabricate(:api_workflow_query) } + query { nil } end Fabricator(:api_paginated_workflow_task, from: :api_workflow_task) do diff --git a/spec/fabricators/workflow_task_metadata_fabricator.rb b/spec/fabricators/workflow_task_metadata_fabricator.rb index a1def3f0..d5149def 100644 --- a/spec/fabricators/workflow_task_metadata_fabricator.rb +++ b/spec/fabricators/workflow_task_metadata_fabricator.rb @@ -8,6 +8,4 @@ workflow_run_id { SecureRandom.uuid } workflow_id { SecureRandom.uuid } workflow_name 'TestWorkflow' - query_type 'state' - query_args nil end diff --git a/spec/unit/lib/temporal/grpc_client_spec.rb b/spec/unit/lib/temporal/grpc_spec.rb similarity index 98% rename from spec/unit/lib/temporal/grpc_client_spec.rb rename to spec/unit/lib/temporal/grpc_spec.rb index 70a7885d..3e498da7 100644 --- a/spec/unit/lib/temporal/grpc_client_spec.rb +++ b/spec/unit/lib/temporal/grpc_spec.rb @@ -8,7 +8,7 @@ before do allow(subject).to receive(:client).and_return(grpc_stub) - + allow(Time).to receive(:now).and_return(now) end @@ -35,7 +35,7 @@ end end end - + describe '#signal_with_start_workflow' do let(:temporal_response) do Temporal::Api::WorkflowService::V1::SignalWithStartWorkflowExecutionResponse.new(run_id: 'xxx') @@ -148,7 +148,7 @@ end end - it 'demands a timeout to be specified' do + it 'demands a timeout to be specified' do expect do subject.get_workflow_execution_history( namespace: namespace, @@ -161,7 +161,7 @@ end end - it 'disallows a timeout larger than the server timeout' do + it 'disallows a timeout larger than the server timeout' do expect do subject.get_workflow_execution_history( namespace: namespace, @@ -345,4 +345,12 @@ end end end + + describe '#query_workflow' do + # TODO after discussing error handling + end + + describe '#respond_workflow_task_completed' do + # TODO testing the value transform after discussing + end end diff --git a/spec/unit/lib/temporal/metadata/workflow_task_spec.rb b/spec/unit/lib/temporal/metadata/workflow_task_spec.rb index 2901830e..99f95a84 100644 --- a/spec/unit/lib/temporal/metadata/workflow_task_spec.rb +++ b/spec/unit/lib/temporal/metadata/workflow_task_spec.rb @@ -13,8 +13,6 @@ expect(subject.workflow_run_id).to eq(args.workflow_run_id) expect(subject.workflow_id).to eq(args.workflow_id) expect(subject.workflow_name).to eq(args.workflow_name) - expect(subject.query_type).to eq(args.query_type) - expect(subject.query_args).to eq(args.query_args) end it { is_expected.to be_frozen } @@ -34,9 +32,7 @@ 'namespace' => subject.namespace, 'workflow_id' => subject.workflow_id, 'workflow_name' => subject.workflow_name, - 'workflow_run_id' => subject.workflow_run_id, - 'query_type' => subject.query_type, - 'query_args' => subject.query_args, + 'workflow_run_id' => subject.workflow_run_id }) end end diff --git a/spec/unit/lib/temporal/metadata_spec.rb b/spec/unit/lib/temporal/metadata_spec.rb index ec2352ff..cd21fb76 100644 --- a/spec/unit/lib/temporal/metadata_spec.rb +++ b/spec/unit/lib/temporal/metadata_spec.rb @@ -42,8 +42,6 @@ expect(subject.workflow_run_id).to eq(data.workflow_execution.run_id) expect(subject.workflow_id).to eq(data.workflow_execution.workflow_id) expect(subject.workflow_name).to eq(data.workflow_type.name) - expect(subject.query_type).to eq(data.query.query_type) - expect(subject.query_args).to eq('') end end diff --git a/spec/unit/lib/temporal/workflow/task_processor_spec.rb b/spec/unit/lib/temporal/workflow/task_processor_spec.rb index 188b3a90..58e2f0ba 100644 --- a/spec/unit/lib/temporal/workflow/task_processor_spec.rb +++ b/spec/unit/lib/temporal/workflow/task_processor_spec.rb @@ -7,7 +7,9 @@ let(:namespace) { 'test-namespace' } let(:lookup) { instance_double('Temporal::ExecutableLookup', find: nil) } - let(:task) { Fabricate(:api_workflow_task, workflow_type: api_workflow_type) } + let(:query) { nil } + let(:queries) { nil } + let(:task) { Fabricate(:api_workflow_task, { workflow_type: api_workflow_type, query: query, queries: queries }.compact) } let(:api_workflow_type) { Fabricate(:api_workflow_type, name: workflow_name) } let(:workflow_name) { 'TestWorkflow' } let(:connection) { instance_double('Temporal::Connection::GRPC') } @@ -24,6 +26,7 @@ .with(config.for_connection) .and_return(connection) allow(connection).to receive(:respond_workflow_task_completed) + allow(connection).to receive(:respond_query_task_completed) allow(connection).to receive(:respond_workflow_task_failed) allow(middleware_chain).to receive(:invoke).and_call_original @@ -65,6 +68,7 @@ allow(lookup).to receive(:find).with(workflow_name).and_return(workflow_class) allow(Temporal::Workflow::Executor).to receive(:new).and_return(executor) allow(executor).to receive(:run) { workflow_class.execute_in_context(context, input); commands } + allow(executor).to receive(:process_query) end context 'when workflow task completes' do @@ -84,20 +88,71 @@ ) end - it 'completes the workflow task' do - subject.process + context 'when workflow task queries are included' do + let(:query_id) { SecureRandom.uuid } + let(:queries) do + Google::Protobuf::Map.new(:string, :message, Temporal::Api::Query::V1::WorkflowQuery).tap do |map| + map[query_id] = Fabricate(:api_workflow_query) + end + end - expect(connection) - .to have_received(:respond_workflow_task_completed) - .with(namespace: namespace, task_token: task.task_token, commands: commands) + it 'completes the workflow task with query results' do + subject.process + + expect(executor) + .to have_received(:process_query) + .with(an_instance_of(Temporal::Workflow::TaskProcessor::Query)) + expect(connection) + .to have_received(:respond_workflow_task_completed) + .with( + task_token: task.task_token, + commands: commands, + query_results: hash_including(query_id) + ) + end end - it 'ignores connection exception' do - allow(connection) - .to receive(:respond_workflow_task_completed) - .and_raise(StandardError) + context 'when deprecated task query is present' do + let(:query) { Fabricate(:api_workflow_query) } + let(:result) { double('result') } - subject.process + before do + expect(executor).to receive(:process_query).with( + an_instance_of(Temporal::Workflow::TaskProcessor::Query) + ).and_return(result) + end + + it 'completes the workflow query task with the result' do + subject.process + + expect(connection).to_not have_received(:respond_workflow_task_completed) + expect(connection) + .to have_received(:respond_query_task_completed) + .with( + task_token: task.task_token, + namespace: namespace, + query_result: result + ) + end + end + + context 'when deprecated task query is not present' do + it 'completes the workflow task' do + subject.process + + expect(connection).to_not have_received(:respond_query_task_completed) + expect(connection) + .to have_received(:respond_workflow_task_completed) + .with(namespace: namespace, task_token: task.task_token, commands: commands, query_results: nil) + end + + it 'ignores connection exception' do + allow(connection) + .to receive(:respond_workflow_task_completed) + .and_raise(StandardError) + + subject.process + end end it 'sends queue_time metric' do @@ -122,48 +177,66 @@ before { allow(workflow_class).to receive(:execute_in_context).and_raise(exception) } - it 'fails the workflow task' do - subject.process + context 'when deprecated task query is present' do + let(:query) { Fabricate(:api_workflow_query) } - expect(connection) - .to have_received(:respond_workflow_task_failed) - .with( - namespace: namespace, - task_token: task.task_token, - cause: Temporal::Api::Enums::V1::WorkflowTaskFailedCause::WORKFLOW_TASK_FAILED_CAUSE_WORKFLOW_WORKER_UNHANDLED_FAILURE, - exception: exception - ) + it 'completes the workflow query task with an error message' do + subject.process + + expect(connection) + .to have_received(:respond_query_task_completed) + .with( + task_token: task.task_token, + namespace: namespace, + error_message: exception.message + ) + end end - it 'does not fail the task beyond the first attempt' do - task.attempt = 2 - subject.process + context 'when deprecated task query is not present' do + it 'fails the workflow task' do + subject.process - expect(connection) - .not_to have_received(:respond_workflow_task_failed) - end + expect(connection) + .to have_received(:respond_workflow_task_failed) + .with( + namespace: namespace, + task_token: task.task_token, + cause: Temporal::Api::Enums::V1::WorkflowTaskFailedCause::WORKFLOW_TASK_FAILED_CAUSE_WORKFLOW_WORKER_UNHANDLED_FAILURE, + exception: exception + ) + end - it 'ignores connection exception' do - allow(connection) - .to receive(:respond_workflow_task_failed) - .and_raise(StandardError) + it 'does not fail the task beyond the first attempt' do + task.attempt = 2 + subject.process - subject.process - end + expect(connection) + .not_to have_received(:respond_workflow_task_failed) + end - it 'calls error_handlers' do - reported_error = nil - reported_metadata = nil + it 'ignores connection exception' do + allow(connection) + .to receive(:respond_workflow_task_failed) + .and_raise(StandardError) - config.on_error do |error, metadata: nil| - reported_error = error - reported_metadata = metadata + subject.process end - subject.process + it 'calls error_handlers' do + reported_error = nil + reported_metadata = nil - expect(reported_error).to be_an_instance_of(StandardError) - expect(reported_metadata).to be_an_instance_of(Temporal::Metadata::WorkflowTask) + config.on_error do |error, metadata: nil| + reported_error = error + reported_metadata = metadata + end + + subject.process + + expect(reported_error).to be_an_instance_of(StandardError) + expect(reported_metadata).to be_an_instance_of(Temporal::Metadata::WorkflowTask) + end end it 'sends queue_time metric' do From c2ae6a5a751ecb081159ce2f5860d19d1e23686c Mon Sep 17 00:00:00 2001 From: Dave Willett Date: Thu, 24 Feb 2022 09:53:12 -0800 Subject: [PATCH 03/19] Fixes for on_query interface, clean up workflow and spec --- .../spec/integration/query_workflow_spec.rb | 52 ++++++++++++++----- examples/workflows/query_workflow.rb | 49 +++++++++++------ lib/temporal/workflow/context.rb | 7 ++- lib/temporal/workflow/dispatcher.rb | 2 +- 4 files changed, 76 insertions(+), 34 deletions(-) diff --git a/examples/spec/integration/query_workflow_spec.rb b/examples/spec/integration/query_workflow_spec.rb index f63b2540..cd6f7f0c 100644 --- a/examples/spec/integration/query_workflow_spec.rb +++ b/examples/spec/integration/query_workflow_spec.rb @@ -6,21 +6,49 @@ it 'returns the correct result for the queries' do workflow_id, run_id = run_workflow(described_class) - # These query calls should leverage the task.queries mechanism, as there is a locally executed sleeping activity - expect(Temporal.query_workflow(described_class, 'state', workflow_id, run_id)).to eq 'started' - expect(Temporal.query_workflow(described_class, 'state', workflow_id, run_id, 'upcase')).to eq 'STARTED' - expect(Temporal.query_workflow(described_class, 'last_signal', workflow_id, run_id)).to eq nil - expect(Temporal.query_workflow(described_class, 'last_signal', workflow_id, run_id, 'reverse', 'upcase')).to eq nil - expect(Temporal.query_workflow(described_class, 'unknown_query', workflow_id, run_id)).to eq nil + # Target query handler for "state", no args + expect(Temporal.query_workflow(described_class, 'state', workflow_id, run_id)) + .to eq 'started' - Temporal.signal_workflow(described_class, 'finish', workflow_id, run_id) + # Target query handler for "state", arbitrary args + expect(Temporal.query_workflow(described_class, 'state', workflow_id, run_id, + 'upcase', 'ignored', 'reverse')) + .to eq 'DETRATS' + + # Target catch-all query handler, no args + expect(Temporal.query_workflow(described_class, 'last_signal', workflow_id, run_id)) + .to be nil + + # Target catch-all query handler, arbitrary args + expect(Temporal.query_workflow(described_class, 'last_signal', workflow_id, run_id, + 'reverse', 'many', 'extra', 'arguments', 'ignored')) + .to be nil + # Target catch-all query handler with unrecognized signal + # TODO this is meant to be an error handling expectation + expect(Temporal.query_workflow(described_class, 'unknown_query', workflow_id, run_id)) + .to be nil + + Temporal.signal_workflow(described_class, 'finish', workflow_id, run_id) wait_for_workflow_completion(workflow_id, run_id) - expect(Temporal.query_workflow(described_class, 'state', workflow_id, run_id)).to eq 'finished' - expect(Temporal.query_workflow(described_class, 'last_signal', workflow_id, run_id)).to eq 'finish' - expect(Temporal.query_workflow(described_class, 'last_signal', workflow_id, run_id, 'reverse')).to eq 'hsinif' - expect(Temporal.query_workflow(described_class, 'state', workflow_id, run_id, 'upcase')).to eq 'FINISHED' - expect(Temporal.query_workflow(described_class, 'unknown_query', workflow_id, run_id)).to eq nil + # Repeating query scenarios above, expecting updated state and signal results + expect(Temporal.query_workflow(described_class, 'state', workflow_id, run_id)) + .to eq 'finished' + + expect(Temporal.query_workflow(described_class, 'state', workflow_id, run_id, + 'upcase', 'ignored', 'reverse')) + .to eq 'DEHSINIF' + + expect(Temporal.query_workflow(described_class, 'last_signal', workflow_id, run_id)) + .to eq 'finish' + + expect(Temporal.query_workflow(described_class, 'last_signal', workflow_id, run_id, + 'reverse', 'many', 'extra', 'arguments', 'ignored')) + .to eq 'hsinif' + + # TODO this is meant to be an error handling expectation + expect(Temporal.query_workflow(described_class, 'unknown_query', workflow_id, run_id)) + .to be nil end end diff --git a/examples/workflows/query_workflow.rb b/examples/workflows/query_workflow.rb index 27517df2..5dfec078 100644 --- a/examples/workflows/query_workflow.rb +++ b/examples/workflows/query_workflow.rb @@ -1,36 +1,51 @@ class QueryWorkflow < Temporal::Workflow timeouts task: 10 + attr_reader :last_signal_received, :state + def execute - state = "started" - last_signal_received = nil + @state = "started" + @last_signal_received = nil # Demonstrating catch-all query handler. - workflow.on_query do |query, input| - case query - when "last_signal" - input == "reverse" ? last_signal_received&.reverse : last_signal_received - else - nil - # TODO appropriate error handling? - # raise StandardError, "Unrecognized query type '#{query}'" - end - end + workflow.on_query nil, &method(:query_catch) # Demonstrating targeted query handler. Note that this specific query handler would be invoked - # instead of the more broad catch-all query handler above. - workflow.on_query("state") do |input| - input == "upcase" ? state.upcase : state + # for "state" instead of the more broad catch-all query handler above. + workflow.on_query "state" do |*args| + apply_transforms(state, args) end - workflow.on_signal { |signal| last_signal_received = signal } + workflow.on_signal { |signal| @last_signal_received = signal } workflow.wait_for { last_signal_received == "finish" } - state = "finished" + @state = "finished" { last_signal_received: last_signal_received, final_state: state } end + + private + + def query_catch(query, *args) + case query + when "last_signal" + return last_signal_received if last_signal_received.nil? || args.empty? + apply_transforms(last_signal_received, args) + else + nil + # TODO appropriate error handling? + # raise StandardError, "Unrecognized query type '#{query}'" + end + end + + def apply_transforms(value, transforms) + return value if value.nil? || transforms.empty? + transforms.inject(value) do |memo, input| + next memo unless memo.respond_to?(input) + memo.public_send(input) + end + end end diff --git a/lib/temporal/workflow/context.rb b/lib/temporal/workflow/context.rb index 21d8b9d3..81bbf1af 100644 --- a/lib/temporal/workflow/context.rb +++ b/lib/temporal/workflow/context.rb @@ -298,12 +298,11 @@ def on_signal(&block) end end - def on_query(query = Dispatcher::WILDCARD, &block) + def on_query(query = nil, &block) + query ||= Dispatcher::WILDCARD target = History::EventTarget.query - dispatcher.register_handler(target, query) do |input| - block.call(*input) - end + dispatcher.register_handler(target, query, &block) end def cancel_activity(activity_id) diff --git a/lib/temporal/workflow/dispatcher.rb b/lib/temporal/workflow/dispatcher.rb index efa58f01..0e32dd21 100644 --- a/lib/temporal/workflow/dispatcher.rb +++ b/lib/temporal/workflow/dispatcher.rb @@ -23,7 +23,7 @@ def process(target, event_name, args = nil) unless handler.nil? args = [args] unless args.is_a?(Array) args.unshift(event_name) if registered_name == WILDCARD - handler.call(args) + handler.call(*args) end end From 67bb911f70e411a2a739e97a7c9b93d79e57bba1 Mon Sep 17 00:00:00 2001 From: Dave Willett Date: Thu, 24 Feb 2022 10:33:39 -0800 Subject: [PATCH 04/19] Fix method signature on testing context --- lib/temporal/testing/local_workflow_context.rb | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib/temporal/testing/local_workflow_context.rb b/lib/temporal/testing/local_workflow_context.rb index d73a5398..6f7046d3 100644 --- a/lib/temporal/testing/local_workflow_context.rb +++ b/lib/temporal/testing/local_workflow_context.rb @@ -190,7 +190,7 @@ def on_signal(&block) raise NotImplementedError, 'Signals are not available when Temporal::Testing.local! is on' end - def on_query(&block) + def on_query(query = nil, &block) raise NotImplementedError, 'Queries are not available when Temporal::Testing.local! is on' end From 379d9e1947a525155edee01f46104080e5027486 Mon Sep 17 00:00:00 2001 From: Dave Willett Date: Thu, 24 Feb 2022 10:46:31 -0800 Subject: [PATCH 05/19] Move catch-all handler back to block Also adding a second targeted query handler to spec --- .../spec/integration/query_workflow_spec.rb | 7 +++ examples/workflows/query_workflow.rb | 43 +++++++++---------- 2 files changed, 28 insertions(+), 22 deletions(-) diff --git a/examples/spec/integration/query_workflow_spec.rb b/examples/spec/integration/query_workflow_spec.rb index cd6f7f0c..dd52bdaa 100644 --- a/examples/spec/integration/query_workflow_spec.rb +++ b/examples/spec/integration/query_workflow_spec.rb @@ -15,6 +15,10 @@ 'upcase', 'ignored', 'reverse')) .to eq 'DETRATS' + # Target query handler for "signal_count", no args + expect(Temporal.query_workflow(described_class, 'signal_count', workflow_id, run_id)) + .to eq 0 + # Target catch-all query handler, no args expect(Temporal.query_workflow(described_class, 'last_signal', workflow_id, run_id)) .to be nil @@ -40,6 +44,9 @@ 'upcase', 'ignored', 'reverse')) .to eq 'DEHSINIF' + expect(Temporal.query_workflow(described_class, 'signal_count', workflow_id, run_id)) + .to eq 1 + expect(Temporal.query_workflow(described_class, 'last_signal', workflow_id, run_id)) .to eq 'finish' diff --git a/examples/workflows/query_workflow.rb b/examples/workflows/query_workflow.rb index 5dfec078..534ba253 100644 --- a/examples/workflows/query_workflow.rb +++ b/examples/workflows/query_workflow.rb @@ -1,27 +1,38 @@ class QueryWorkflow < Temporal::Workflow - timeouts task: 10 - - attr_reader :last_signal_received, :state + attr_reader :state, :signal_count, :last_signal_received def execute @state = "started" + @signal_count = 0 @last_signal_received = nil # Demonstrating catch-all query handler. - workflow.on_query nil, &method(:query_catch) - - # Demonstrating targeted query handler. Note that this specific query handler would be invoked - # for "state" instead of the more broad catch-all query handler above. - workflow.on_query "state" do |*args| - apply_transforms(state, args) + workflow.on_query do |query, *args| + case query + when "last_signal" + apply_transforms(last_signal_received, args) + else + nil + # TODO appropriate error handling? + # raise StandardError, "Unrecognized query type '#{query}'" + end end - workflow.on_signal { |signal| @last_signal_received = signal } + # Demonstrating targeted query handlers. Note that these specific query handlers + # are invoked instead of the more broad catch-all query handler above. + workflow.on_query("state") { |*args| apply_transforms(state, args) } + workflow.on_query("signal_count") { signal_count } + + workflow.on_signal do |signal| + @signal_count += 1 + @last_signal_received = signal + end workflow.wait_for { last_signal_received == "finish" } @state = "finished" { + signal_count: signal_count, last_signal_received: last_signal_received, final_state: state } @@ -29,18 +40,6 @@ def execute private - def query_catch(query, *args) - case query - when "last_signal" - return last_signal_received if last_signal_received.nil? || args.empty? - apply_transforms(last_signal_received, args) - else - nil - # TODO appropriate error handling? - # raise StandardError, "Unrecognized query type '#{query}'" - end - end - def apply_transforms(value, transforms) return value if value.nil? || transforms.empty? transforms.inject(value) do |memo, input| From 116b02e7422635226dd0e34ed0723d8c3cd1dd5e Mon Sep 17 00:00:00 2001 From: Dave Willett Date: Sun, 27 Feb 2022 13:48:31 -0800 Subject: [PATCH 06/19] Use nil workflow class in test case --- examples/spec/integration/query_workflow_spec.rb | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/examples/spec/integration/query_workflow_spec.rb b/examples/spec/integration/query_workflow_spec.rb index dd52bdaa..8fe6fde7 100644 --- a/examples/spec/integration/query_workflow_spec.rb +++ b/examples/spec/integration/query_workflow_spec.rb @@ -6,8 +6,8 @@ it 'returns the correct result for the queries' do workflow_id, run_id = run_workflow(described_class) - # Target query handler for "state", no args - expect(Temporal.query_workflow(described_class, 'state', workflow_id, run_id)) + # Target query handler for "state", no args, nil workflow class + expect(Temporal.query_workflow(nil, 'state', workflow_id, run_id)) .to eq 'started' # Target query handler for "state", arbitrary args @@ -37,7 +37,7 @@ wait_for_workflow_completion(workflow_id, run_id) # Repeating query scenarios above, expecting updated state and signal results - expect(Temporal.query_workflow(described_class, 'state', workflow_id, run_id)) + expect(Temporal.query_workflow(nil, 'state', workflow_id, run_id)) .to eq 'finished' expect(Temporal.query_workflow(described_class, 'state', workflow_id, run_id, From 6e231d7223459f425040f442f12fe97c0f7b7411 Mon Sep 17 00:00:00 2001 From: Dave Willett Date: Fri, 25 Mar 2022 09:44:00 -0700 Subject: [PATCH 07/19] Updates to remove catch all handling, add query reject handling --- .../spec/integration/query_workflow_spec.rb | 37 ++++-------- examples/workflows/query_workflow.rb | 14 ----- lib/temporal/client.rb | 6 +- lib/temporal/connection/grpc.rb | 56 ++++++++----------- .../testing/local_workflow_context.rb | 2 +- lib/temporal/workflow/context.rb | 17 ++---- lib/temporal/workflow/dispatcher.rb | 13 +---- lib/temporal/workflow/executor.rb | 10 +++- lib/temporal/workflow/history/event_target.rb | 5 -- .../temporal/workflow/task_processor_spec.rb | 1 + 10 files changed, 54 insertions(+), 107 deletions(-) diff --git a/examples/spec/integration/query_workflow_spec.rb b/examples/spec/integration/query_workflow_spec.rb index 8fe6fde7..d01b4d43 100644 --- a/examples/spec/integration/query_workflow_spec.rb +++ b/examples/spec/integration/query_workflow_spec.rb @@ -1,4 +1,5 @@ require 'workflows/query_workflow' +require 'temporal/errors' describe QueryWorkflow, :integration do subject { described_class } @@ -6,32 +7,22 @@ it 'returns the correct result for the queries' do workflow_id, run_id = run_workflow(described_class) - # Target query handler for "state", no args, nil workflow class + # Query with nil workflow class expect(Temporal.query_workflow(nil, 'state', workflow_id, run_id)) .to eq 'started' - # Target query handler for "state", arbitrary args + # Query with arbitrary args expect(Temporal.query_workflow(described_class, 'state', workflow_id, run_id, 'upcase', 'ignored', 'reverse')) .to eq 'DETRATS' - # Target query handler for "signal_count", no args + # Query with no args expect(Temporal.query_workflow(described_class, 'signal_count', workflow_id, run_id)) .to eq 0 - # Target catch-all query handler, no args - expect(Temporal.query_workflow(described_class, 'last_signal', workflow_id, run_id)) - .to be nil - - # Target catch-all query handler, arbitrary args - expect(Temporal.query_workflow(described_class, 'last_signal', workflow_id, run_id, - 'reverse', 'many', 'extra', 'arguments', 'ignored')) - .to be nil - - # Target catch-all query handler with unrecognized signal - # TODO this is meant to be an error handling expectation - expect(Temporal.query_workflow(described_class, 'unknown_query', workflow_id, run_id)) - .to be nil + # Query with unregistered handler + expect { Temporal.query_workflow(described_class, 'unknown_query', workflow_id, run_id) } + .to raise_error(Temporal::QueryFailedFailure, 'Workflow did not register a handler for unknown_query') Temporal.signal_workflow(described_class, 'finish', workflow_id, run_id) wait_for_workflow_completion(workflow_id, run_id) @@ -47,15 +38,11 @@ expect(Temporal.query_workflow(described_class, 'signal_count', workflow_id, run_id)) .to eq 1 - expect(Temporal.query_workflow(described_class, 'last_signal', workflow_id, run_id)) - .to eq 'finish' - - expect(Temporal.query_workflow(described_class, 'last_signal', workflow_id, run_id, - 'reverse', 'many', 'extra', 'arguments', 'ignored')) - .to eq 'hsinif' + expect { Temporal.query_workflow(described_class, 'unknown_query', workflow_id, run_id) } + .to raise_error(Temporal::QueryFailedFailure, 'Workflow did not register a handler for unknown_query') - # TODO this is meant to be an error handling expectation - expect(Temporal.query_workflow(described_class, 'unknown_query', workflow_id, run_id)) - .to be nil + # Now that the workflow is completed, test a query with a reject condition satisfied + expect { Temporal.query_workflow(described_class, 'state', workflow_id, run_id, query_reject_condition: :not_open) } + .to raise_error(Temporal::QueryFailedFailure, 'Query rejected: status WORKFLOW_EXECUTION_STATUS_COMPLETED') end end diff --git a/examples/workflows/query_workflow.rb b/examples/workflows/query_workflow.rb index 534ba253..47650ca4 100644 --- a/examples/workflows/query_workflow.rb +++ b/examples/workflows/query_workflow.rb @@ -6,20 +6,6 @@ def execute @signal_count = 0 @last_signal_received = nil - # Demonstrating catch-all query handler. - workflow.on_query do |query, *args| - case query - when "last_signal" - apply_transforms(last_signal_received, args) - else - nil - # TODO appropriate error handling? - # raise StandardError, "Unrecognized query type '#{query}'" - end - end - - # Demonstrating targeted query handlers. Note that these specific query handlers - # are invoked instead of the more broad catch-all query handler above. workflow.on_query("state") { |*args| apply_transforms(state, args) } workflow.on_query("signal_count") { signal_count } diff --git a/lib/temporal/client.rb b/lib/temporal/client.rb index a32bd4a2..6aede5a5 100644 --- a/lib/temporal/client.rb +++ b/lib/temporal/client.rb @@ -185,7 +185,8 @@ def signal_workflow(workflow, signal, workflow_id, run_id, input = nil, namespac # @param args [String, Array, nil] optional arguments for the query # @param namespace [String, nil] if nil, choose the one declared on the workflow class or the # global default - def query_workflow(workflow, query, workflow_id, run_id, *args, namespace: nil) + # @param query_reject_condition [Symbol] check Temporal::Connection::GRPC::QUERY_REJECT_CONDITION + def query_workflow(workflow, query, workflow_id, run_id, *args, namespace: nil, query_reject_condition: nil) execution_options = ExecutionOptions.new(workflow, {}, config.default_execution_options) connection.query_workflow( @@ -193,7 +194,8 @@ def query_workflow(workflow, query, workflow_id, run_id, *args, namespace: nil) workflow_id: workflow_id, run_id: run_id, query: query, - args: args + args: args, + query_reject_condition: query_reject_condition ) end diff --git a/lib/temporal/connection/grpc.rb b/lib/temporal/connection/grpc.rb index a4295f27..b530a2e6 100644 --- a/lib/temporal/connection/grpc.rb +++ b/lib/temporal/connection/grpc.rb @@ -26,6 +26,12 @@ class GRPC close: Temporal::Api::Enums::V1::HistoryEventFilterType::HISTORY_EVENT_FILTER_TYPE_CLOSE_EVENT, }.freeze + QUERY_REJECT_CONDITION = { + none: Temporal::Api::Enums::V1::QueryRejectCondition::QUERY_REJECT_CONDITION_NONE, + not_open: Temporal::Api::Enums::V1::QueryRejectCondition::QUERY_REJECT_CONDITION_NOT_OPEN, + not_completed_cleanly: Temporal::Api::Enums::V1::QueryRejectCondition::QUERY_REJECT_CONDITION_NOT_COMPLETED_CLEANLY + }.freeze + DEFAULT_OPTIONS = { max_page_size: 100 }.freeze @@ -485,37 +491,8 @@ def reset_sticky_task_queue raise NotImplementedError end - # async _queryWorkflowHandler(input) { - # let response; - # try { - # response = await this.service.queryWorkflow({ - # queryRejectCondition: input.queryRejectCondition, - # namespace: this.options.namespace, - # execution: input.workflowExecution, - # query: { - # queryType: input.queryType, - # queryArgs: { payloads: await this.options.dataConverter.toPayloads(...input.args) }, - # }, - # }); - # } - # catch (err) { - # this.rethrowGrpcError(err, input.workflowExecution, 'Failed to query Workflow'); - # } - # if (response.queryRejected) { - # if (response.queryRejected.status === undefined || response.queryRejected.status === null) { - # throw new TypeError('Received queryRejected from server with no status'); - # } - # throw new QueryRejectedError(response.queryRejected.status); - # } - # if (!response.queryResult) { - # throw new TypeError('Invalid response from server'); - # } - # // We ignore anything but the first query_result - # return this.options.dataConverter.fromPayloads(0, response.queryResult?.payloads); - # } - def query_workflow(namespace:, workflow_id:, run_id:, query:, args: nil) + def query_workflow(namespace:, workflow_id:, run_id:, query:, args: nil, query_reject_condition: nil) request = Temporal::Api::WorkflowService::V1::QueryWorkflowRequest.new( - # query_reject_condition: , namespace: namespace, execution: Temporal::Api::Common::V1::WorkflowExecution.new( workflow_id: workflow_id, @@ -526,11 +503,24 @@ def query_workflow(namespace:, workflow_id:, run_id:, query:, args: nil) query_args: to_query_payloads(args) ) ) - response = client.query_workflow(request) - if response.query_rejected + if query_reject_condition + condition = QUERY_REJECT_CONDITION[query_reject_condition] + raise Client::ArgumentError, 'Unknown query_reject_condition specified' unless condition - elsif !response.query_result + request.query_reject_condition = condition + end + begin + response = client.query_workflow(request) + rescue ::GRPC::InvalidArgument => e + raise Temporal::QueryFailedFailure, e.details + end + + if response.query_rejected + rejection_status = response.query_rejected.status || 'Received query rejection from server with no status' + raise Temporal::QueryFailedFailure.new("Query rejected: status #{rejection_status}") + elsif !response.query_result + raise Temporal::QueryFailedFailure.new('Invalid response from server') else from_query_payloads(response.query_result) end diff --git a/lib/temporal/testing/local_workflow_context.rb b/lib/temporal/testing/local_workflow_context.rb index 6f7046d3..f0d1725a 100644 --- a/lib/temporal/testing/local_workflow_context.rb +++ b/lib/temporal/testing/local_workflow_context.rb @@ -190,7 +190,7 @@ def on_signal(&block) raise NotImplementedError, 'Signals are not available when Temporal::Testing.local! is on' end - def on_query(query = nil, &block) + def on_query(query, &block) raise NotImplementedError, 'Queries are not available when Temporal::Testing.local! is on' end diff --git a/lib/temporal/workflow/context.rb b/lib/temporal/workflow/context.rb index 81bbf1af..bcd2aa23 100644 --- a/lib/temporal/workflow/context.rb +++ b/lib/temporal/workflow/context.rb @@ -16,11 +16,12 @@ module Temporal class Workflow class Context - attr_reader :metadata, :config + attr_reader :metadata, :config, :query_handlers def initialize(state_manager, dispatcher, workflow_class, metadata, config) @state_manager = state_manager @dispatcher = dispatcher + @query_handlers = {} @workflow_class = workflow_class @metadata = metadata @completed = false @@ -298,11 +299,8 @@ def on_signal(&block) end end - def on_query(query = nil, &block) - query ||= Dispatcher::WILDCARD - target = History::EventTarget.query - - dispatcher.register_handler(target, query, &block) + def on_query(query, &block) + query_handlers[query] = block end def cancel_activity(activity_id) @@ -337,8 +335,6 @@ def cancel(target, cancelation_id) # # @return [Future] future def signal_external_workflow(workflow, signal, workflow_id, run_id = nil, input = nil, namespace: nil, child_workflow_only: false) - options ||= {} - execution_options = ExecutionOptions.new(workflow, {}, config.default_execution_options) command = Command::SignalExternalWorkflow.new( @@ -401,11 +397,6 @@ def schedule_command(command) state_manager.schedule(command) end - # @param query [Temporal::Api::Query::V1::WorkflowQuery] - def process_query(query) - dispatcher.dispatch(History::EventTarget.workflow, "queried:#{query.query_type}", query.query_args) - end - def call_in_fiber(block, *args) Fiber.new do Temporal::ThreadLocalContext.set(self) diff --git a/lib/temporal/workflow/dispatcher.rb b/lib/temporal/workflow/dispatcher.rb index 0e32dd21..2a768e54 100644 --- a/lib/temporal/workflow/dispatcher.rb +++ b/lib/temporal/workflow/dispatcher.rb @@ -13,16 +13,7 @@ def register_handler(target, event_name, &handler) end def dispatch(target, event_name, args = nil) - handlers_for(target, event_name).each do |_, handler| - handler.call(*args) - end - end - - def process(target, event_name, args = nil) - registered_name, handler = handlers_for(target, event_name).first - unless handler.nil? - args = [args] unless args.is_a?(Array) - args.unshift(event_name) if registered_name == WILDCARD + handlers_for(target, event_name).each do |handler| handler.call(*args) end end @@ -35,7 +26,7 @@ def handlers_for(target, event_name) handlers[target] .concat(handlers[TARGET_WILDCARD]) .select { |(name, _)| name == event_name || name == WILDCARD } - .sort_by { |(name, _)| name == WILDCARD ? 1 : 0 } + .map(&:last) end end end diff --git a/lib/temporal/workflow/executor.rb b/lib/temporal/workflow/executor.rb index 5875debc..5499f0cb 100644 --- a/lib/temporal/workflow/executor.rb +++ b/lib/temporal/workflow/executor.rb @@ -38,16 +38,20 @@ def run # @param query [Temporal::Workflow::TaskProcessor::Query] def process_query(query) - dispatcher.process(History::EventTarget.query, query.query_type, query.query_args) + unless context.query_handlers.key?(query.query_type) + raise Temporal::QueryFailedFailure, "Workflow did not register a handler for #{query.query_type}" + end + + context.query_handlers[query.query_type].call(*query.query_args) end private - attr_reader :workflow_class, :dispatcher, :state_manager, :task_metadata, :history, :config + attr_reader :workflow_class, :dispatcher, :state_manager, :task_metadata, :history, :config, :context def execute_workflow(input, workflow_started_event) metadata = Metadata.generate_workflow_metadata(workflow_started_event, task_metadata) - context = Workflow::Context.new(state_manager, dispatcher, workflow_class, metadata, config) + @context = Workflow::Context.new(state_manager, dispatcher, workflow_class, metadata, config) Fiber.new do workflow_class.execute_in_context(context, input) diff --git a/lib/temporal/workflow/history/event_target.rb b/lib/temporal/workflow/history/event_target.rb index 2077f433..d054947f 100644 --- a/lib/temporal/workflow/history/event_target.rb +++ b/lib/temporal/workflow/history/event_target.rb @@ -16,7 +16,6 @@ class UnexpectedEventType < InternalError; end CANCEL_EXTERNAL_WORKFLOW_REQUEST_TYPE = :cancel_external_workflow_request WORKFLOW_TYPE = :workflow CANCEL_WORKFLOW_REQUEST_TYPE = :cancel_workflow_request - QUERY_TYPE = :query UPSERT_SEARCH_ATTRIBUTES_REQUEST_TYPE = :upsert_search_attributes_request # NOTE: The order is important, first prefix match wins (will be a longer match) @@ -45,10 +44,6 @@ def self.workflow @workflow ||= new(1, WORKFLOW_TYPE) end - def self.query - @query ||= new(1, QUERY_TYPE) - end - def self.from_event(event) _, target_type = TARGET_TYPES.find { |type, _| event.type.start_with?(type) } diff --git a/spec/unit/lib/temporal/workflow/task_processor_spec.rb b/spec/unit/lib/temporal/workflow/task_processor_spec.rb index 58e2f0ba..66ee8649 100644 --- a/spec/unit/lib/temporal/workflow/task_processor_spec.rb +++ b/spec/unit/lib/temporal/workflow/task_processor_spec.rb @@ -105,6 +105,7 @@ expect(connection) .to have_received(:respond_workflow_task_completed) .with( + namespace: namespace, task_token: task.task_token, commands: commands, query_results: hash_including(query_id) From 276182af9140f5fa50248230999a567795443c56 Mon Sep 17 00:00:00 2001 From: Dave Willett Date: Fri, 25 Mar 2022 10:51:43 -0700 Subject: [PATCH 08/19] More concise when no status returned from server --- lib/temporal/connection/grpc.rb | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib/temporal/connection/grpc.rb b/lib/temporal/connection/grpc.rb index b530a2e6..15a4fe3c 100644 --- a/lib/temporal/connection/grpc.rb +++ b/lib/temporal/connection/grpc.rb @@ -517,7 +517,7 @@ def query_workflow(namespace:, workflow_id:, run_id:, query:, args: nil, query_r end if response.query_rejected - rejection_status = response.query_rejected.status || 'Received query rejection from server with no status' + rejection_status = response.query_rejected.status || 'not specified by server' raise Temporal::QueryFailedFailure.new("Query rejected: status #{rejection_status}") elsif !response.query_result raise Temporal::QueryFailedFailure.new('Invalid response from server') From a32a55b6f8d1cf3027c90ffaf9e1c8b6d3145221 Mon Sep 17 00:00:00 2001 From: Dave Willett Date: Fri, 25 Mar 2022 10:54:07 -0700 Subject: [PATCH 09/19] More consistent raise message style --- lib/temporal/connection/grpc.rb | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/lib/temporal/connection/grpc.rb b/lib/temporal/connection/grpc.rb index 15a4fe3c..17dc303e 100644 --- a/lib/temporal/connection/grpc.rb +++ b/lib/temporal/connection/grpc.rb @@ -518,9 +518,9 @@ def query_workflow(namespace:, workflow_id:, run_id:, query:, args: nil, query_r if response.query_rejected rejection_status = response.query_rejected.status || 'not specified by server' - raise Temporal::QueryFailedFailure.new("Query rejected: status #{rejection_status}") + raise Temporal::QueryFailedFailure, "Query rejected: status #{rejection_status}" elsif !response.query_result - raise Temporal::QueryFailedFailure.new('Invalid response from server') + raise Temporal::QueryFailedFailure, 'Invalid response from server' else from_query_payloads(response.query_result) end From 733c3cb4edfad3ccf39b0fe7c932b901afdf4898 Mon Sep 17 00:00:00 2001 From: Dave Willett Date: Fri, 25 Mar 2022 14:33:45 -0700 Subject: [PATCH 10/19] Add test for reject condition not met --- examples/spec/integration/query_workflow_spec.rb | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) diff --git a/examples/spec/integration/query_workflow_spec.rb b/examples/spec/integration/query_workflow_spec.rb index d01b4d43..0d1ef30f 100644 --- a/examples/spec/integration/query_workflow_spec.rb +++ b/examples/spec/integration/query_workflow_spec.rb @@ -24,10 +24,16 @@ expect { Temporal.query_workflow(described_class, 'unknown_query', workflow_id, run_id) } .to raise_error(Temporal::QueryFailedFailure, 'Workflow did not register a handler for unknown_query') + Temporal.signal_workflow(described_class, 'make_progress', workflow_id, run_id) + + # Query for updated signal_count with an unsatisfied reject condition + expect(Temporal.query_workflow(described_class, 'signal_count', workflow_id, run_id, query_reject_condition: :not_open)) + .to eq 1 + Temporal.signal_workflow(described_class, 'finish', workflow_id, run_id) wait_for_workflow_completion(workflow_id, run_id) - # Repeating query scenarios above, expecting updated state and signal results + # Repeating original query scenarios above, expecting updated state and signal results expect(Temporal.query_workflow(nil, 'state', workflow_id, run_id)) .to eq 'finished' @@ -36,7 +42,7 @@ .to eq 'DEHSINIF' expect(Temporal.query_workflow(described_class, 'signal_count', workflow_id, run_id)) - .to eq 1 + .to eq 2 expect { Temporal.query_workflow(described_class, 'unknown_query', workflow_id, run_id) } .to raise_error(Temporal::QueryFailedFailure, 'Workflow did not register a handler for unknown_query') From ab789ea57158d683608dcb8a0a1c9ab85b8afcbc Mon Sep 17 00:00:00 2001 From: antstorm Date: Fri, 1 Apr 2022 17:10:50 +0100 Subject: [PATCH 11/19] Simplify legacy handling and use serializers for query protos --- lib/temporal/connection/grpc.rb | 34 ++++------- lib/temporal/connection/serializer.rb | 5 ++ .../connection/serializer/query_answer.rb | 19 +++++++ .../connection/serializer/query_failure.rb | 16 ++++++ lib/temporal/workflow/executor.rb | 21 +++++-- lib/temporal/workflow/query_result.rb | 16 ++++++ lib/temporal/workflow/task_processor.rb | 57 +++++++++---------- 7 files changed, 108 insertions(+), 60 deletions(-) create mode 100644 lib/temporal/connection/serializer/query_answer.rb create mode 100644 lib/temporal/connection/serializer/query_failure.rb create mode 100644 lib/temporal/workflow/query_result.rb diff --git a/lib/temporal/connection/grpc.rb b/lib/temporal/connection/grpc.rb index 17dc303e..5fe834e1 100644 --- a/lib/temporal/connection/grpc.rb +++ b/lib/temporal/connection/grpc.rb @@ -189,42 +189,28 @@ def poll_workflow_task_queue(namespace:, task_queue:) poll_request.execute end - def respond_query_task_completed(namespace:, task_token:, query_result: nil, error_message: nil) + def respond_query_task_completed(namespace:, task_token:, query_result:) + query_result_proto = Serializer.serialize(query_result) request = Temporal::Api::WorkflowService::V1::RespondQueryTaskCompletedRequest.new( task_token: task_token, - namespace: namespace + namespace: namespace, + completed_type: query_result_proto.result_type, + query_result: query_result_proto.answer, + error_message: query_result_proto.error_message, ) - if !error_message.nil? - request.error_message = error_message - request.completed_type = Temporal::Api::Enums::V1::QueryResultType::QUERY_RESULT_TYPE_FAILED - else - request.query_result = to_query_payloads(query_result) - request.completed_type = Temporal::Api::Enums::V1::QueryResultType::QUERY_RESULT_TYPE_ANSWERED - end + client.respond_query_task_completed(request) end - def respond_workflow_task_completed(namespace:, task_token:, commands:, query_results:) - query_results&.transform_values! do |query_result| - if query_result.is_a?(StandardError) - { - result_type: Temporal::Api::Enums::V1::QueryResultType::QUERY_RESULT_TYPE_FAILED, - error_message: query_result.message - } - else - { - result_type: Temporal::Api::Enums::V1::QueryResultType::QUERY_RESULT_TYPE_ANSWERED, - answer: to_query_payloads(query_result) - } - end - end + def respond_workflow_task_completed(namespace:, task_token:, commands:, query_results: {}) request = Temporal::Api::WorkflowService::V1::RespondWorkflowTaskCompletedRequest.new( namespace: namespace, identity: identity, task_token: task_token, commands: Array(commands).map { |(_, command)| Serializer.serialize(command) }, - query_results: query_results || {} + query_results: query_results.transform_values { |value| Serializer.serialize(value) } ) + client.respond_workflow_task_completed(request) end diff --git a/lib/temporal/connection/serializer.rb b/lib/temporal/connection/serializer.rb index 6343cb01..46070c66 100644 --- a/lib/temporal/connection/serializer.rb +++ b/lib/temporal/connection/serializer.rb @@ -1,4 +1,5 @@ require 'temporal/workflow/command' +require 'temporal/workflow/query_result' require 'temporal/connection/serializer/schedule_activity' require 'temporal/connection/serializer/start_child_workflow' require 'temporal/connection/serializer/request_activity_cancellation' @@ -10,6 +11,8 @@ require 'temporal/connection/serializer/fail_workflow' require 'temporal/connection/serializer/signal_external_workflow' require 'temporal/connection/serializer/upsert_search_attributes' +require 'temporal/connection/serializer/query_answer' +require 'temporal/connection/serializer/query_failure' module Temporal module Connection @@ -26,6 +29,8 @@ module Serializer Workflow::Command::FailWorkflow => Serializer::FailWorkflow, Workflow::Command::SignalExternalWorkflow => Serializer::SignalExternalWorkflow, Workflow::Command::UpsertSearchAttributes => Serializer::UpsertSearchAttributes, + Workflow::QueryResult::Answer => Serializer::QueryAnswer, + Workflow::QueryResult::Failure => Serializer::QueryFailure, }.freeze def self.serialize(object) diff --git a/lib/temporal/connection/serializer/query_answer.rb b/lib/temporal/connection/serializer/query_answer.rb new file mode 100644 index 00000000..7f28ec06 --- /dev/null +++ b/lib/temporal/connection/serializer/query_answer.rb @@ -0,0 +1,19 @@ +require 'temporal/connection/serializer/base' +require 'temporal/concerns/payloads' + +module Temporal + module Connection + module Serializer + class QueryAnswer < Base + include Concerns::Payloads + + def to_proto + Temporal::Api::Query::V1::WorkflowQueryResult.new( + result_type: Temporal::Api::Enums::V1::QueryResultType::QUERY_RESULT_TYPE_ANSWERED, + answer: to_query_payloads(object.result) + ) + end + end + end + end +end diff --git a/lib/temporal/connection/serializer/query_failure.rb b/lib/temporal/connection/serializer/query_failure.rb new file mode 100644 index 00000000..0a2fca21 --- /dev/null +++ b/lib/temporal/connection/serializer/query_failure.rb @@ -0,0 +1,16 @@ +require 'temporal/connection/serializer/base' + +module Temporal + module Connection + module Serializer + class QueryFailure < Base + def to_proto + Temporal::Api::Query::V1::WorkflowQueryResult.new( + result_type: Temporal::Api::Enums::V1::QueryResultType::QUERY_RESULT_TYPE_FAILED, + error_message: object.error.message + ) + end + end + end + end +end diff --git a/lib/temporal/workflow/executor.rb b/lib/temporal/workflow/executor.rb index 5499f0cb..994aba17 100644 --- a/lib/temporal/workflow/executor.rb +++ b/lib/temporal/workflow/executor.rb @@ -36,19 +36,28 @@ def run return state_manager.commands end - # @param query [Temporal::Workflow::TaskProcessor::Query] + # @param query [Hash] + # + # @return [Hash] + def process_queries(queries = {}) + queries.transform_values(&method(:process_query)) + end + + private + + attr_reader :workflow_class, :dispatcher, :state_manager, :task_metadata, :history, :config, :context + def process_query(query) unless context.query_handlers.key?(query.query_type) raise Temporal::QueryFailedFailure, "Workflow did not register a handler for #{query.query_type}" end - context.query_handlers[query.query_type].call(*query.query_args) + result = context.query_handlers[query.query_type].call(*query.query_args) + QueryResult.answer(result) + rescue StandardError => error + QueryResult.failure(error) end - private - - attr_reader :workflow_class, :dispatcher, :state_manager, :task_metadata, :history, :config, :context - def execute_workflow(input, workflow_started_event) metadata = Metadata.generate_workflow_metadata(workflow_started_event, task_metadata) @context = Workflow::Context.new(state_manager, dispatcher, workflow_class, metadata, config) diff --git a/lib/temporal/workflow/query_result.rb b/lib/temporal/workflow/query_result.rb new file mode 100644 index 00000000..15364b77 --- /dev/null +++ b/lib/temporal/workflow/query_result.rb @@ -0,0 +1,16 @@ +module Temporal + class Workflow + module QueryResult + Answer = Struct.new(:result) + Failure = Struct.new(:error) + + def self.answer(result) + Answer.new(result) + end + + def self.failure(error) + Failure.new(error) + end + end + end +end diff --git a/lib/temporal/workflow/task_processor.rb b/lib/temporal/workflow/task_processor.rb index 4ad5c6c0..4b80918a 100644 --- a/lib/temporal/workflow/task_processor.rb +++ b/lib/temporal/workflow/task_processor.rb @@ -20,6 +20,7 @@ def query_args end MAX_FAILED_ATTEMPTS = 1 + LEGACY_QUERY_KEY = :legacy_query def initialize(task, namespace, workflow_lookup, middleware_chain, config) @task = task @@ -50,30 +51,17 @@ def process executor.run end - # Process deprecated style of query task - if !task.query.nil? - result = executor.process_query(Query.new(task.query)) - complete_query(result) + query_results = executor.process_queries(parse_queries) + + if legacy_query_task? + complete_query(query_results[LEGACY_QUERY_KEY]) else - query_results = if task.queries.any? - task.queries.each_with_object({}) do |(query_id, query), hash| - begin - hash[query_id] = executor.process_query(Query.new(query)) - rescue StandardError => error - hash[query_id] = error - end - end - end complete_task(commands, query_results) end rescue StandardError => error Temporal::ErrorHandler.handle(error, config, metadata: metadata) - if task.query.nil? - fail_task(error) - else - fail_query(error) - end + fail_task(error) ensure time_diff_ms = ((Time.now - start_time) * 1000).round Temporal.metrics.timing('workflow_task.latency', time_diff_ms, workflow: workflow_name, namespace: namespace) @@ -118,33 +106,42 @@ def fetch_full_history Workflow::History.new(events) end - def complete_task(commands, query_results) - Temporal.logger.info("Workflow task completed", metadata.to_h) + def legacy_query_task? + !!task.query + end - connection.respond_workflow_task_completed(namespace: namespace, task_token: task_token, commands: commands, query_results: query_results) + def parse_queries + # Support for deprecated query style + if legacy_query_task? + { LEGACY_QUERY_KEY => Query.new(task.query) } + else + task.queries.each_with_object({}) do |(query_id, query), result| + result[query_id] = Query.new(query) + end + end end - def complete_query(result) - Temporal.logger.info("Workflow Query task completed", metadata.to_h) + def complete_task(commands, query_results) + Temporal.logger.info("Workflow task completed", metadata.to_h) - connection.respond_query_task_completed( + connection.respond_workflow_task_completed( namespace: namespace, task_token: task_token, - query_result: result + commands: commands, + query_results: query_results ) end - def fail_query(error) - Temporal.logger.error("Workflow Query task failed", metadata.to_h.merge(error: error.inspect)) - Temporal.logger.debug(error.backtrace.join("\n")) + def complete_query(result) + Temporal.logger.info("Workflow Query task completed", metadata.to_h) connection.respond_query_task_completed( namespace: namespace, task_token: task_token, - error_message: error.message || "Encountered an error during query handling" + query_result: result ) rescue StandardError => error - Temporal.logger.error("Unable to fail Workflow Query task", metadata.to_h.merge(error: error.inspect)) + Temporal.logger.error("Unable to complete a query", metadata.to_h.merge(error: error.inspect)) Temporal::ErrorHandler.handle(error, config, metadata: metadata) end From fd9e279b53573c676a1cf55eb3604b167fc9528b Mon Sep 17 00:00:00 2001 From: antstorm Date: Fri, 1 Apr 2022 20:09:35 +0100 Subject: [PATCH 12/19] Add specs for the new changes --- .../serializer/query_answer_spec.rb | 23 ++++ .../serializer/query_failure_spec.rb | 19 +++ spec/unit/lib/temporal/grpc_spec.rb | 110 +++++++++++++++++- .../lib/temporal/workflow/executor_spec.rb | 35 +++++- .../temporal/workflow/task_processor_spec.rb | 55 +++++++-- 5 files changed, 226 insertions(+), 16 deletions(-) create mode 100644 spec/unit/lib/temporal/connection/serializer/query_answer_spec.rb create mode 100644 spec/unit/lib/temporal/connection/serializer/query_failure_spec.rb diff --git a/spec/unit/lib/temporal/connection/serializer/query_answer_spec.rb b/spec/unit/lib/temporal/connection/serializer/query_answer_spec.rb new file mode 100644 index 00000000..8876bbd5 --- /dev/null +++ b/spec/unit/lib/temporal/connection/serializer/query_answer_spec.rb @@ -0,0 +1,23 @@ +require 'temporal/connection/serializer/query_failure' +require 'temporal/workflow/query_result' +require 'temporal/concerns/payloads' + +describe Temporal::Connection::Serializer::QueryAnswer do + class TestDeserializer + extend Temporal::Concerns::Payloads + end + + describe 'to_proto' do + let(:query_result) { Temporal::Workflow::QueryResult.answer(42) } + + it 'produces a protobuf' do + result = described_class.new(query_result).to_proto + + expect(result).to be_a(Temporal::Api::Query::V1::WorkflowQueryResult) + expect(result.result_type).to eq(Temporal::Api::Enums::V1::QueryResultType.lookup( + Temporal::Api::Enums::V1::QueryResultType::QUERY_RESULT_TYPE_ANSWERED) + ) + expect(result.answer).to eq(TestDeserializer.to_query_payloads(42)) + end + end +end diff --git a/spec/unit/lib/temporal/connection/serializer/query_failure_spec.rb b/spec/unit/lib/temporal/connection/serializer/query_failure_spec.rb new file mode 100644 index 00000000..7e948f4d --- /dev/null +++ b/spec/unit/lib/temporal/connection/serializer/query_failure_spec.rb @@ -0,0 +1,19 @@ +require 'temporal/connection/serializer/query_failure' +require 'temporal/workflow/query_result' + +describe Temporal::Connection::Serializer::QueryFailure do + describe 'to_proto' do + let(:exception) { StandardError.new('Test query failure') } + let(:query_result) { Temporal::Workflow::QueryResult.failure(exception) } + + it 'produces a protobuf' do + result = described_class.new(query_result).to_proto + + expect(result).to be_a(Temporal::Api::Query::V1::WorkflowQueryResult) + expect(result.result_type).to eq(Temporal::Api::Enums::V1::QueryResultType.lookup( + Temporal::Api::Enums::V1::QueryResultType::QUERY_RESULT_TYPE_FAILED) + ) + expect(result.error_message).to eq('Test query failure') + end + end +end diff --git a/spec/unit/lib/temporal/grpc_spec.rb b/spec/unit/lib/temporal/grpc_spec.rb index 3e498da7..8cbe5af7 100644 --- a/spec/unit/lib/temporal/grpc_spec.rb +++ b/spec/unit/lib/temporal/grpc_spec.rb @@ -1,3 +1,6 @@ +require 'temporal/connection/grpc' +require 'temporal/workflow/query_result' + describe Temporal::Connection::GRPC do subject { Temporal::Connection::GRPC.new(nil, nil, nil) } let(:grpc_stub) { double('grpc stub') } @@ -6,6 +9,10 @@ let(:run_id) { SecureRandom.uuid } let(:now) { Time.now} + class TestDeserializer + extend Temporal::Concerns::Payloads + end + before do allow(subject).to receive(:client).and_return(grpc_stub) @@ -346,11 +353,108 @@ end end - describe '#query_workflow' do - # TODO after discussing error handling + describe '#respond_query_task_completed' do + let(:task_token) { SecureRandom.uuid } + + before do + allow(grpc_stub) + .to receive(:respond_query_task_completed) + .and_return(Temporal::Api::WorkflowService::V1::RespondQueryTaskCompletedResponse.new) + end + + context 'when query result is an answer' do + let(:query_result) { Temporal::Workflow::QueryResult.answer(42) } + + it 'makes an API request' do + subject.respond_query_task_completed( + namespace: namespace, + task_token: task_token, + query_result: query_result + ) + + expect(grpc_stub).to have_received(:respond_query_task_completed) do |request| + expect(request).to be_an_instance_of(Temporal::Api::WorkflowService::V1::RespondQueryTaskCompletedRequest) + expect(request.task_token).to eq(task_token) + expect(request.namespace).to eq(namespace) + expect(request.completed_type).to eq(Temporal::Api::Enums::V1::QueryResultType.lookup( + Temporal::Api::Enums::V1::QueryResultType::QUERY_RESULT_TYPE_ANSWERED) + ) + expect(request.query_result).to eq(TestDeserializer.to_query_payloads(42)) + expect(request.error_message).to eq('') + end + end + end + + context 'when query result is a failure' do + let(:query_result) { Temporal::Workflow::QueryResult.failure(StandardError.new('Test query failure')) } + + it 'makes an API request' do + subject.respond_query_task_completed( + namespace: namespace, + task_token: task_token, + query_result: query_result + ) + + expect(grpc_stub).to have_received(:respond_query_task_completed) do |request| + expect(request).to be_an_instance_of(Temporal::Api::WorkflowService::V1::RespondQueryTaskCompletedRequest) + expect(request.task_token).to eq(task_token) + expect(request.namespace).to eq(namespace) + expect(request.completed_type).to eq(Temporal::Api::Enums::V1::QueryResultType.lookup( + Temporal::Api::Enums::V1::QueryResultType::QUERY_RESULT_TYPE_FAILED) + ) + expect(request.query_result).to eq(nil) + expect(request.error_message).to eq('Test query failure') + end + end + end end describe '#respond_workflow_task_completed' do - # TODO testing the value transform after discussing + let(:task_token) { SecureRandom.uuid } + + before do + allow(grpc_stub) + .to receive(:respond_workflow_task_completed) + .and_return(Temporal::Api::WorkflowService::V1::RespondWorkflowTaskCompletedResponse.new) + end + + context 'when responding with query results' do + let(:query_results) do + { + '1' => Temporal::Workflow::QueryResult.answer(42), + '2' => Temporal::Workflow::QueryResult.failure(StandardError.new('Test query failure')), + } + end + + it 'makes an API request' do + subject.respond_workflow_task_completed( + namespace: namespace, + task_token: task_token, + commands: [], + query_results: query_results + ) + + expect(grpc_stub).to have_received(:respond_workflow_task_completed) do |request| + expect(request).to be_an_instance_of(Temporal::Api::WorkflowService::V1::RespondWorkflowTaskCompletedRequest) + expect(request.task_token).to eq(task_token) + expect(request.namespace).to eq(namespace) + expect(request.commands).to be_empty + + expect(request.query_results.length).to eq(2) + + expect(request.query_results['1']).to be_a(Temporal::Api::Query::V1::WorkflowQueryResult) + expect(request.query_results['1'].result_type).to eq(Temporal::Api::Enums::V1::QueryResultType.lookup( + Temporal::Api::Enums::V1::QueryResultType::QUERY_RESULT_TYPE_ANSWERED) + ) + expect(request.query_results['1'].answer).to eq(TestDeserializer.to_query_payloads(42)) + + expect(request.query_results['2']).to be_a(Temporal::Api::Query::V1::WorkflowQueryResult) + expect(request.query_results['2'].result_type).to eq(Temporal::Api::Enums::V1::QueryResultType.lookup( + Temporal::Api::Enums::V1::QueryResultType::QUERY_RESULT_TYPE_FAILED) + ) + expect(request.query_results['2'].error_message).to eq('Test query failure') + end + end + end end end diff --git a/spec/unit/lib/temporal/workflow/executor_spec.rb b/spec/unit/lib/temporal/workflow/executor_spec.rb index a74c3387..5d1d3fc5 100644 --- a/spec/unit/lib/temporal/workflow/executor_spec.rb +++ b/spec/unit/lib/temporal/workflow/executor_spec.rb @@ -1,6 +1,7 @@ require 'temporal/workflow/executor' require 'temporal/workflow/history' require 'temporal/workflow' +require 'temporal/workflow/task_processor' describe Temporal::Workflow::Executor do subject { described_class.new(workflow, history, workflow_metadata, config) } @@ -78,4 +79,36 @@ def execute ) end end -end \ No newline at end of file + + describe '#process_queries' do + let(:context) { subject.send(:context) } + let(:query_1_result) { 42 } + let(:query_2_error) { StandardError.new('Test query failure') } + let(:queries) do + { + '1' => Temporal::Workflow::TaskProcessor::Query.new(Fabricate(:api_workflow_query, query_type: 'success')), + '2' => Temporal::Workflow::TaskProcessor::Query.new(Fabricate(:api_workflow_query, query_type: 'failure')), + '3' => Temporal::Workflow::TaskProcessor::Query.new(Fabricate(:api_workflow_query, query_type: 'unknown')), + } + end + + before do + subject.run + context.on_query('success') { query_1_result } + context.on_query('failure') { raise query_2_error } + end + + it 'returns query results' do + results = subject.process_queries(queries) + + expect(results.length).to eq(3) + expect(results['1']).to be_a(Temporal::Workflow::QueryResult::Answer) + expect(results['1'].result).to eq(query_1_result) + expect(results['2']).to be_a(Temporal::Workflow::QueryResult::Failure) + expect(results['2'].error).to eq(query_2_error) + expect(results['3']).to be_a(Temporal::Workflow::QueryResult::Failure) + expect(results['3'].error).to be_a(Temporal::QueryFailedFailure) + expect(results['3'].error.message).to eq('Workflow did not register a handler for unknown') + end + end +end diff --git a/spec/unit/lib/temporal/workflow/task_processor_spec.rb b/spec/unit/lib/temporal/workflow/task_processor_spec.rb index 66ee8649..5a02569e 100644 --- a/spec/unit/lib/temporal/workflow/task_processor_spec.rb +++ b/spec/unit/lib/temporal/workflow/task_processor_spec.rb @@ -68,7 +68,7 @@ allow(lookup).to receive(:find).with(workflow_name).and_return(workflow_class) allow(Temporal::Workflow::Executor).to receive(:new).and_return(executor) allow(executor).to receive(:run) { workflow_class.execute_in_context(context, input); commands } - allow(executor).to receive(:process_query) + allow(executor).to receive(:process_queries) end context 'when workflow task completes' do @@ -90,42 +90,48 @@ context 'when workflow task queries are included' do let(:query_id) { SecureRandom.uuid } + let(:query_result) { Temporal::Workflow::QueryResult.answer(42) } let(:queries) do Google::Protobuf::Map.new(:string, :message, Temporal::Api::Query::V1::WorkflowQuery).tap do |map| map[query_id] = Fabricate(:api_workflow_query) end end + before do + allow(executor).to receive(:process_queries).and_return(query_id => query_result) + end + it 'completes the workflow task with query results' do subject.process expect(executor) - .to have_received(:process_query) - .with(an_instance_of(Temporal::Workflow::TaskProcessor::Query)) + .to have_received(:process_queries) + .with(query_id => an_instance_of(Temporal::Workflow::TaskProcessor::Query)) expect(connection) .to have_received(:respond_workflow_task_completed) .with( namespace: namespace, task_token: task.task_token, commands: commands, - query_results: hash_including(query_id) + query_results: { query_id => query_result } ) end end context 'when deprecated task query is present' do let(:query) { Fabricate(:api_workflow_query) } - let(:result) { double('result') } + let(:result) { Temporal::Workflow::QueryResult.answer(42) } before do - expect(executor).to receive(:process_query).with( - an_instance_of(Temporal::Workflow::TaskProcessor::Query) - ).and_return(result) + allow(executor).to receive(:process_queries).and_return(legacy_query: result) end it 'completes the workflow query task with the result' do subject.process + expect(executor).to have_received(:process_queries).with( + legacy_query: an_instance_of(Temporal::Workflow::TaskProcessor::Query) + ) expect(connection).to_not have_received(:respond_workflow_task_completed) expect(connection) .to have_received(:respond_query_task_completed) @@ -181,15 +187,16 @@ context 'when deprecated task query is present' do let(:query) { Fabricate(:api_workflow_query) } - it 'completes the workflow query task with an error message' do + it 'fails the workflow task' do subject.process expect(connection) - .to have_received(:respond_query_task_completed) + .to have_received(:respond_workflow_task_failed) .with( - task_token: task.task_token, namespace: namespace, - error_message: exception.message + task_token: task.task_token, + cause: Temporal::Api::Enums::V1::WorkflowTaskFailedCause::WORKFLOW_TASK_FAILED_CAUSE_WORKFLOW_WORKER_UNHANDLED_FAILURE, + exception: exception ) end end @@ -257,6 +264,30 @@ end end + context 'when legacy query fails' do + let(:query) { Fabricate(:api_workflow_query) } + let(:exception) { StandardError.new('workflow task failed') } + let(:query_failure) { Temporal::Workflow::QueryResult.failure(exception) } + + before do + allow(executor) + .to receive(:process_queries) + .and_return(legacy_query: query_failure) + end + + it 'fails the workflow task' do + subject.process + + expect(connection) + .to have_received(:respond_query_task_completed) + .with( + namespace: namespace, + task_token: task.task_token, + query_result: query_failure + ) + end + end + context 'when history is paginated' do let(:task) { Fabricate(:api_paginated_workflow_task, workflow_type: api_workflow_type) } let(:event) { Fabricate(:api_workflow_execution_started_event) } From c775586d66b694ca651de5f7d9821414e7432505 Mon Sep 17 00:00:00 2001 From: antstorm Date: Fri, 1 Apr 2022 20:13:19 +0100 Subject: [PATCH 13/19] Test query result & freeze them --- lib/temporal/workflow/query_result.rb | 4 +-- .../temporal/workflow/query_result_spec.rb | 25 +++++++++++++++++++ 2 files changed, 27 insertions(+), 2 deletions(-) create mode 100644 spec/unit/lib/temporal/workflow/query_result_spec.rb diff --git a/lib/temporal/workflow/query_result.rb b/lib/temporal/workflow/query_result.rb index 15364b77..a4d0401e 100644 --- a/lib/temporal/workflow/query_result.rb +++ b/lib/temporal/workflow/query_result.rb @@ -5,11 +5,11 @@ module QueryResult Failure = Struct.new(:error) def self.answer(result) - Answer.new(result) + Answer.new(result).freeze end def self.failure(error) - Failure.new(error) + Failure.new(error).freeze end end end diff --git a/spec/unit/lib/temporal/workflow/query_result_spec.rb b/spec/unit/lib/temporal/workflow/query_result_spec.rb new file mode 100644 index 00000000..222446a8 --- /dev/null +++ b/spec/unit/lib/temporal/workflow/query_result_spec.rb @@ -0,0 +1,25 @@ +require 'temporal/workflow/query_result' + +describe Temporal::Workflow::QueryResult do + describe '.answer' do + it 'returns an anwer query result' do + result = described_class.answer(42) + + expect(result).to be_a(Temporal::Workflow::QueryResult::Answer) + expect(result).to be_frozen + expect(result.result).to eq(42) + end + end + + describe '.failure' do + let(:error) { StandardError.new('Test query failure') } + + it 'returns a failure query result' do + result = described_class.failure(error) + + expect(result).to be_a(Temporal::Workflow::QueryResult::Failure) + expect(result).to be_frozen + expect(result.error).to eq(error) + end + end +end From 54bc7a1c55781b8b9c55c51954f1e5ee0e992dc2 Mon Sep 17 00:00:00 2001 From: antstorm Date: Mon, 11 Apr 2022 21:13:16 +0100 Subject: [PATCH 14/19] Implement QueryRegistry --- lib/temporal/workflow/query_registry.rb | 33 +++++++++ .../temporal/workflow/query_registry_spec.rb | 67 +++++++++++++++++++ 2 files changed, 100 insertions(+) create mode 100644 lib/temporal/workflow/query_registry.rb create mode 100644 spec/unit/lib/temporal/workflow/query_registry_spec.rb diff --git a/lib/temporal/workflow/query_registry.rb b/lib/temporal/workflow/query_registry.rb new file mode 100644 index 00000000..a1754aa8 --- /dev/null +++ b/lib/temporal/workflow/query_registry.rb @@ -0,0 +1,33 @@ +require 'temporal/errors' + +module Temporal + class Workflow + class QueryRegistry + def initialize + @handlers = {} + end + + def register(type, &handler) + if handlers.key?(type) + warn "[NOTICE] Overwriting a query handler for #{type}" + end + + handlers[type] = handler + end + + def handle(type, args = nil) + handler = handlers[type] + + unless handler + raise Temporal::QueryFailedFailure, "Workflow did not register a handler for #{type}" + end + + handler.call(*args) + end + + private + + attr_reader :handlers + end + end +end diff --git a/spec/unit/lib/temporal/workflow/query_registry_spec.rb b/spec/unit/lib/temporal/workflow/query_registry_spec.rb new file mode 100644 index 00000000..5af6f61c --- /dev/null +++ b/spec/unit/lib/temporal/workflow/query_registry_spec.rb @@ -0,0 +1,67 @@ +require 'temporal/workflow/query_registry' + +describe Temporal::Workflow::QueryRegistry do + subject { described_class.new } + + describe '#register' do + let(:handler) { Proc.new {} } + + it 'registers a query handler' do + subject.register('test-query', &handler) + + expect(subject.send(:handlers)['test-query']).to eq(handler) + end + + context 'when query handler is already registered' do + let(:handler_2) { Proc.new {} } + + before { subject.register('test-query', &handler) } + + it 'warns' do + allow(subject).to receive(:warn) + + subject.register('test-query', &handler_2) + + expect(subject) + .to have_received(:warn) + .with('[NOTICE] Overwriting a query handler for test-query') + end + + it 're-registers a query handler' do + subject.register('test-query', &handler_2) + + expect(subject.send(:handlers)['test-query']).to eq(handler_2) + end + end + end + + describe '#handle' do + context 'when a query handler has been registered' do + let(:handler) { Proc.new { 42 } } + + before { subject.register('test-query', &handler) } + + it 'runs the handler and returns the result' do + expect(subject.handle('test-query')).to eq(42) + end + end + + context 'when a query handler has been registered with args' do + let(:handler) { Proc.new { |arg_1, arg_2| arg_1 + arg_2 } } + + before { subject.register('test-query', &handler) } + + it 'runs the handler and returns the result' do + expect(subject.handle('test-query', [3, 5])).to eq(8) + end + end + + context 'when a query handler has not been registered' do + it 'raises' do + expect do + subject.handle('test-query') + end.to raise_error(Temporal::QueryFailedFailure, 'Workflow did not register a handler for test-query') + end + end + end +end From 59e6700edd7af847cca42e13c8be2346f463c204 Mon Sep 17 00:00:00 2001 From: antstorm Date: Mon, 11 Apr 2022 21:18:38 +0100 Subject: [PATCH 15/19] Swap Context#query_handlers with QueryRegistry --- lib/temporal/workflow/context.rb | 10 +++++----- lib/temporal/workflow/executor.rb | 16 ++++++++++------ spec/unit/lib/temporal/workflow/context_spec.rb | 14 +++++++++++--- spec/unit/lib/temporal/workflow/executor_spec.rb | 9 +++++---- 4 files changed, 31 insertions(+), 18 deletions(-) diff --git a/lib/temporal/workflow/context.rb b/lib/temporal/workflow/context.rb index bcd2aa23..e7906dc7 100644 --- a/lib/temporal/workflow/context.rb +++ b/lib/temporal/workflow/context.rb @@ -16,12 +16,12 @@ module Temporal class Workflow class Context - attr_reader :metadata, :config, :query_handlers + attr_reader :metadata, :config - def initialize(state_manager, dispatcher, workflow_class, metadata, config) + def initialize(state_manager, dispatcher, workflow_class, metadata, config, query_registry) @state_manager = state_manager @dispatcher = dispatcher - @query_handlers = {} + @query_registry = query_registry @workflow_class = workflow_class @metadata = metadata @completed = false @@ -300,7 +300,7 @@ def on_signal(&block) end def on_query(query, &block) - query_handlers[query] = block + query_registry.register(query, &block) end def cancel_activity(activity_id) @@ -387,7 +387,7 @@ def upsert_search_attributes(search_attributes) private - attr_reader :state_manager, :dispatcher, :workflow_class + attr_reader :state_manager, :dispatcher, :workflow_class, :query_registry def completed! @completed = true diff --git a/lib/temporal/workflow/executor.rb b/lib/temporal/workflow/executor.rb index 994aba17..e835cab7 100644 --- a/lib/temporal/workflow/executor.rb +++ b/lib/temporal/workflow/executor.rb @@ -1,6 +1,7 @@ require 'fiber' require 'temporal/workflow/dispatcher' +require 'temporal/workflow/query_registry' require 'temporal/workflow/state_manager' require 'temporal/workflow/context' require 'temporal/workflow/history/event_target' @@ -16,6 +17,7 @@ class Executor def initialize(workflow_class, history, task_metadata, config) @workflow_class = workflow_class @dispatcher = Dispatcher.new + @query_registry = QueryRegistry.new @state_manager = StateManager.new(dispatcher) @history = history @task_metadata = task_metadata @@ -36,6 +38,11 @@ def run return state_manager.commands end + # Process queries using the pre-registered query handlers + # + # @note this method is expected to be executed after the history has + # been fully replayed (by invoking the #run method) + # # @param query [Hash] # # @return [Hash] @@ -45,14 +52,11 @@ def process_queries(queries = {}) private - attr_reader :workflow_class, :dispatcher, :state_manager, :task_metadata, :history, :config, :context + attr_reader :workflow_class, :dispatcher, :query_registry, :state_manager, :task_metadata, :history, :config def process_query(query) - unless context.query_handlers.key?(query.query_type) - raise Temporal::QueryFailedFailure, "Workflow did not register a handler for #{query.query_type}" - end + result = query_registry.handle(query.query_type, query.query_args) - result = context.query_handlers[query.query_type].call(*query.query_args) QueryResult.answer(result) rescue StandardError => error QueryResult.failure(error) @@ -60,7 +64,7 @@ def process_query(query) def execute_workflow(input, workflow_started_event) metadata = Metadata.generate_workflow_metadata(workflow_started_event, task_metadata) - @context = Workflow::Context.new(state_manager, dispatcher, workflow_class, metadata, config) + context = Workflow::Context.new(state_manager, dispatcher, workflow_class, metadata, config, query_registry) Fiber.new do workflow_class.execute_in_context(context, input) diff --git a/spec/unit/lib/temporal/workflow/context_spec.rb b/spec/unit/lib/temporal/workflow/context_spec.rb index 24768d82..9a49cb97 100644 --- a/spec/unit/lib/temporal/workflow/context_spec.rb +++ b/spec/unit/lib/temporal/workflow/context_spec.rb @@ -7,10 +7,18 @@ class MyTestWorkflow < Temporal::Workflow; end describe Temporal::Workflow::Context do let(:state_manager) { instance_double('Temporal::Workflow::StateManager') } let(:dispatcher) { instance_double('Temporal::Workflow::Dispatcher') } + let(:query_registry) { instance_double('Temporal::Workflow::QueryRegistry') } let(:metadata) { instance_double('Temporal::Metadata::Workflow') } - let(:workflow_context) { - Temporal::Workflow::Context.new(state_manager, dispatcher, MyTestWorkflow, metadata, Temporal.configuration) - } + let(:workflow_context) do + Temporal::Workflow::Context.new( + state_manager, + dispatcher, + MyTestWorkflow, + metadata, + Temporal.configuration, + query_registry + ) + end describe '#upsert_search_attributes' do it 'does not accept nil' do diff --git a/spec/unit/lib/temporal/workflow/executor_spec.rb b/spec/unit/lib/temporal/workflow/executor_spec.rb index 5d1d3fc5..af6862fa 100644 --- a/spec/unit/lib/temporal/workflow/executor_spec.rb +++ b/spec/unit/lib/temporal/workflow/executor_spec.rb @@ -2,6 +2,7 @@ require 'temporal/workflow/history' require 'temporal/workflow' require 'temporal/workflow/task_processor' +require 'temporal/workflow/query_registry' describe Temporal::Workflow::Executor do subject { described_class.new(workflow, history, workflow_metadata, config) } @@ -81,7 +82,7 @@ def execute end describe '#process_queries' do - let(:context) { subject.send(:context) } + let(:query_registry) { Temporal::Workflow::QueryRegistry.new } let(:query_1_result) { 42 } let(:query_2_error) { StandardError.new('Test query failure') } let(:queries) do @@ -93,9 +94,9 @@ def execute end before do - subject.run - context.on_query('success') { query_1_result } - context.on_query('failure') { raise query_2_error } + allow(Temporal::Workflow::QueryRegistry).to receive(:new).and_return(query_registry) + query_registry.register('success') { query_1_result } + query_registry.register('failure') { raise query_2_error } end it 'returns query results' do From c6d00ff9feb2cd1e5425308aa28cb35001276e76 Mon Sep 17 00:00:00 2001 From: antstorm Date: Mon, 11 Apr 2022 21:28:14 +0100 Subject: [PATCH 16/19] Add a spec for Workflow::Context --- spec/unit/lib/temporal/workflow/context_spec.rb | 14 ++++++++++++++ 1 file changed, 14 insertions(+) diff --git a/spec/unit/lib/temporal/workflow/context_spec.rb b/spec/unit/lib/temporal/workflow/context_spec.rb index 9a49cb97..a6dd2921 100644 --- a/spec/unit/lib/temporal/workflow/context_spec.rb +++ b/spec/unit/lib/temporal/workflow/context_spec.rb @@ -20,6 +20,20 @@ class MyTestWorkflow < Temporal::Workflow; end ) end + describe '#on_query' do + let(:handler) { Proc.new {} } + + before { allow(query_registry).to receive(:register) } + + it 'registers a query with the query registry' do + workflow_context.on_query('test-query', &handler) + + expect(query_registry).to have_received(:register).with('test-query') do |&block| + expect(block).to eq(handler) + end + end + end + describe '#upsert_search_attributes' do it 'does not accept nil' do expect do From 4ba9b413d13df68af55c6f1212260905a70da67e Mon Sep 17 00:00:00 2001 From: antstorm Date: Mon, 11 Apr 2022 21:36:07 +0100 Subject: [PATCH 17/19] Rename QueryFailedFailure error to QueryFailed --- examples/spec/integration/query_workflow_spec.rb | 6 +++--- lib/temporal/connection/grpc.rb | 6 +++--- lib/temporal/errors.rb | 2 +- lib/temporal/workflow/query_registry.rb | 2 +- rbi/temporal-ruby.rbi | 2 +- spec/unit/lib/temporal/workflow/executor_spec.rb | 2 +- spec/unit/lib/temporal/workflow/query_registry_spec.rb | 2 +- 7 files changed, 11 insertions(+), 11 deletions(-) diff --git a/examples/spec/integration/query_workflow_spec.rb b/examples/spec/integration/query_workflow_spec.rb index 0d1ef30f..93812635 100644 --- a/examples/spec/integration/query_workflow_spec.rb +++ b/examples/spec/integration/query_workflow_spec.rb @@ -22,7 +22,7 @@ # Query with unregistered handler expect { Temporal.query_workflow(described_class, 'unknown_query', workflow_id, run_id) } - .to raise_error(Temporal::QueryFailedFailure, 'Workflow did not register a handler for unknown_query') + .to raise_error(Temporal::QueryFailed, 'Workflow did not register a handler for unknown_query') Temporal.signal_workflow(described_class, 'make_progress', workflow_id, run_id) @@ -45,10 +45,10 @@ .to eq 2 expect { Temporal.query_workflow(described_class, 'unknown_query', workflow_id, run_id) } - .to raise_error(Temporal::QueryFailedFailure, 'Workflow did not register a handler for unknown_query') + .to raise_error(Temporal::QueryFailed, 'Workflow did not register a handler for unknown_query') # Now that the workflow is completed, test a query with a reject condition satisfied expect { Temporal.query_workflow(described_class, 'state', workflow_id, run_id, query_reject_condition: :not_open) } - .to raise_error(Temporal::QueryFailedFailure, 'Query rejected: status WORKFLOW_EXECUTION_STATUS_COMPLETED') + .to raise_error(Temporal::QueryFailed, 'Query rejected: status WORKFLOW_EXECUTION_STATUS_COMPLETED') end end diff --git a/lib/temporal/connection/grpc.rb b/lib/temporal/connection/grpc.rb index 5fe834e1..747c13a8 100644 --- a/lib/temporal/connection/grpc.rb +++ b/lib/temporal/connection/grpc.rb @@ -499,14 +499,14 @@ def query_workflow(namespace:, workflow_id:, run_id:, query:, args: nil, query_r begin response = client.query_workflow(request) rescue ::GRPC::InvalidArgument => e - raise Temporal::QueryFailedFailure, e.details + raise Temporal::QueryFailed, e.details end if response.query_rejected rejection_status = response.query_rejected.status || 'not specified by server' - raise Temporal::QueryFailedFailure, "Query rejected: status #{rejection_status}" + raise Temporal::QueryFailed, "Query rejected: status #{rejection_status}" elsif !response.query_result - raise Temporal::QueryFailedFailure, 'Invalid response from server' + raise Temporal::QueryFailed, 'Invalid response from server' else from_query_payloads(response.query_result) end diff --git a/lib/temporal/errors.rb b/lib/temporal/errors.rb index 5047cc90..6d49bef2 100644 --- a/lib/temporal/errors.rb +++ b/lib/temporal/errors.rb @@ -67,7 +67,7 @@ class ClientVersionNotSupportedFailure < ApiError; end class FeatureVersionNotSupportedFailure < ApiError; end class NamespaceAlreadyExistsFailure < ApiError; end class CancellationAlreadyRequestedFailure < ApiError; end - class QueryFailedFailure < ApiError; end + class QueryFailed < ApiError; end class UnexpectedResponse < ApiError; end end diff --git a/lib/temporal/workflow/query_registry.rb b/lib/temporal/workflow/query_registry.rb index a1754aa8..babdda66 100644 --- a/lib/temporal/workflow/query_registry.rb +++ b/lib/temporal/workflow/query_registry.rb @@ -19,7 +19,7 @@ def handle(type, args = nil) handler = handlers[type] unless handler - raise Temporal::QueryFailedFailure, "Workflow did not register a handler for #{type}" + raise Temporal::QueryFailed, "Workflow did not register a handler for #{type}" end handler.call(*args) diff --git a/rbi/temporal-ruby.rbi b/rbi/temporal-ruby.rbi index b7da9d12..cdcda078 100644 --- a/rbi/temporal-ruby.rbi +++ b/rbi/temporal-ruby.rbi @@ -39,5 +39,5 @@ module Temporal class FeatureVersionNotSupportedFailure; end class NamespaceAlreadyExistsFailure; end class CancellationAlreadyRequestedFailure; end - class QueryFailedFailure; end + class QueryFailed; end end diff --git a/spec/unit/lib/temporal/workflow/executor_spec.rb b/spec/unit/lib/temporal/workflow/executor_spec.rb index af6862fa..d35a9f18 100644 --- a/spec/unit/lib/temporal/workflow/executor_spec.rb +++ b/spec/unit/lib/temporal/workflow/executor_spec.rb @@ -108,7 +108,7 @@ def execute expect(results['2']).to be_a(Temporal::Workflow::QueryResult::Failure) expect(results['2'].error).to eq(query_2_error) expect(results['3']).to be_a(Temporal::Workflow::QueryResult::Failure) - expect(results['3'].error).to be_a(Temporal::QueryFailedFailure) + expect(results['3'].error).to be_a(Temporal::QueryFailed) expect(results['3'].error.message).to eq('Workflow did not register a handler for unknown') end end diff --git a/spec/unit/lib/temporal/workflow/query_registry_spec.rb b/spec/unit/lib/temporal/workflow/query_registry_spec.rb index 5af6f61c..3c5ced14 100644 --- a/spec/unit/lib/temporal/workflow/query_registry_spec.rb +++ b/spec/unit/lib/temporal/workflow/query_registry_spec.rb @@ -60,7 +60,7 @@ it 'raises' do expect do subject.handle('test-query') - end.to raise_error(Temporal::QueryFailedFailure, 'Workflow did not register a handler for test-query') + end.to raise_error(Temporal::QueryFailed, 'Workflow did not register a handler for test-query') end end end From 096e0875a741bea23f2041522702315c7e6c68c1 Mon Sep 17 00:00:00 2001 From: Dave Willett Date: Tue, 12 Apr 2022 08:20:46 -0700 Subject: [PATCH 18/19] Small cleanup items --- lib/temporal/testing/local_workflow_context.rb | 2 +- lib/temporal/workflow/context.rb | 2 +- lib/temporal/workflow/executor.rb | 2 +- 3 files changed, 3 insertions(+), 3 deletions(-) diff --git a/lib/temporal/testing/local_workflow_context.rb b/lib/temporal/testing/local_workflow_context.rb index f0d1725a..1f200e84 100644 --- a/lib/temporal/testing/local_workflow_context.rb +++ b/lib/temporal/testing/local_workflow_context.rb @@ -186,7 +186,7 @@ def now Time.now end - def on_signal(&block) + def on_signal(signal_name = nil, &block) raise NotImplementedError, 'Signals are not available when Temporal::Testing.local! is on' end diff --git a/lib/temporal/workflow/context.rb b/lib/temporal/workflow/context.rb index 942bc4e8..74722f00 100644 --- a/lib/temporal/workflow/context.rb +++ b/lib/temporal/workflow/context.rb @@ -299,7 +299,7 @@ def now # all signals that do not match a named signal handler. # # @param signal_name [String, Symbol, nil] an optional signal name; converted to a String - def on_signal(signal_name=nil, &block) + def on_signal(signal_name = nil, &block) if signal_name target = Signal.new(signal_name) dispatcher.register_handler(target, 'signaled') do |_, input| diff --git a/lib/temporal/workflow/executor.rb b/lib/temporal/workflow/executor.rb index e835cab7..546fe7f6 100644 --- a/lib/temporal/workflow/executor.rb +++ b/lib/temporal/workflow/executor.rb @@ -43,7 +43,7 @@ def run # @note this method is expected to be executed after the history has # been fully replayed (by invoking the #run method) # - # @param query [Hash] + # @param queries [Hash] # # @return [Hash] def process_queries(queries = {}) From ecea8f2053e222d9c6cb59b3e100c820238474e1 Mon Sep 17 00:00:00 2001 From: Dave Willett Date: Tue, 12 Apr 2022 09:08:32 -0700 Subject: [PATCH 19/19] Update readme --- README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/README.md b/README.md index 13e6ef56..838fda7a 100644 --- a/README.md +++ b/README.md @@ -178,7 +178,7 @@ Besides calling activities workflows can: - Use timers - Receive signals - Execute other (child) workflows -- Respond to queries [not yet implemented] +- Respond to queries ## Activities