diff --git a/README.md b/README.md index 13e6ef56..838fda7a 100644 --- a/README.md +++ b/README.md @@ -178,7 +178,7 @@ Besides calling activities workflows can: - Use timers - Receive signals - Execute other (child) workflows -- Respond to queries [not yet implemented] +- Respond to queries ## Activities diff --git a/examples/bin/query b/examples/bin/query new file mode 100755 index 00000000..c0e7f719 --- /dev/null +++ b/examples/bin/query @@ -0,0 +1,14 @@ +#!/usr/bin/env ruby +require_relative '../init' + +Dir[File.expand_path('../workflows/*.rb', __dir__)].each { |f| require f } + +workflow_class_name, workflow_id, run_id, query, args = ARGV +workflow_class = Object.const_get(workflow_class_name) + +if ![workflow_class, workflow_id, run_id, query].all? + fail 'Wrong arguments, use `bin/query WORKFLOW WORKFLOW_ID RUN_ID QUERY [ARGS]`' +end + +result = Temporal.query_workflow(workflow_class, query, workflow_id, run_id, args) +puts result.inspect diff --git a/examples/bin/worker b/examples/bin/worker index aa56bf0c..c9c78145 100755 --- a/examples/bin/worker +++ b/examples/bin/worker @@ -36,6 +36,7 @@ worker.register_workflow(MetadataWorkflow) worker.register_workflow(ParentCloseWorkflow) worker.register_workflow(ParentWorkflow) worker.register_workflow(ProcessFileWorkflow) +worker.register_workflow(QueryWorkflow) worker.register_workflow(QuickTimeoutWorkflow) worker.register_workflow(RandomlyFailingWorkflow) worker.register_workflow(ReleaseWorkflow) diff --git a/examples/spec/integration/query_workflow_spec.rb b/examples/spec/integration/query_workflow_spec.rb new file mode 100644 index 00000000..93812635 --- /dev/null +++ b/examples/spec/integration/query_workflow_spec.rb @@ -0,0 +1,54 @@ +require 'workflows/query_workflow' +require 'temporal/errors' + +describe QueryWorkflow, :integration do + subject { described_class } + + it 'returns the correct result for the queries' do + workflow_id, run_id = run_workflow(described_class) + + # Query with nil workflow class + expect(Temporal.query_workflow(nil, 'state', workflow_id, run_id)) + .to eq 'started' + + # Query with arbitrary args + expect(Temporal.query_workflow(described_class, 'state', workflow_id, run_id, + 'upcase', 'ignored', 'reverse')) + .to eq 'DETRATS' + + # Query with no args + expect(Temporal.query_workflow(described_class, 'signal_count', workflow_id, run_id)) + .to eq 0 + + # Query with unregistered handler + expect { Temporal.query_workflow(described_class, 'unknown_query', workflow_id, run_id) } + .to raise_error(Temporal::QueryFailed, 'Workflow did not register a handler for unknown_query') + + Temporal.signal_workflow(described_class, 'make_progress', workflow_id, run_id) + + # Query for updated signal_count with an unsatisfied reject condition + expect(Temporal.query_workflow(described_class, 'signal_count', workflow_id, run_id, query_reject_condition: :not_open)) + .to eq 1 + + Temporal.signal_workflow(described_class, 'finish', workflow_id, run_id) + wait_for_workflow_completion(workflow_id, run_id) + + # Repeating original query scenarios above, expecting updated state and signal results + expect(Temporal.query_workflow(nil, 'state', workflow_id, run_id)) + .to eq 'finished' + + expect(Temporal.query_workflow(described_class, 'state', workflow_id, run_id, + 'upcase', 'ignored', 'reverse')) + .to eq 'DEHSINIF' + + expect(Temporal.query_workflow(described_class, 'signal_count', workflow_id, run_id)) + .to eq 2 + + expect { Temporal.query_workflow(described_class, 'unknown_query', workflow_id, run_id) } + .to raise_error(Temporal::QueryFailed, 'Workflow did not register a handler for unknown_query') + + # Now that the workflow is completed, test a query with a reject condition satisfied + expect { Temporal.query_workflow(described_class, 'state', workflow_id, run_id, query_reject_condition: :not_open) } + .to raise_error(Temporal::QueryFailed, 'Query rejected: status WORKFLOW_EXECUTION_STATUS_COMPLETED') + end +end diff --git a/examples/workflows/query_workflow.rb b/examples/workflows/query_workflow.rb new file mode 100644 index 00000000..47650ca4 --- /dev/null +++ b/examples/workflows/query_workflow.rb @@ -0,0 +1,36 @@ +class QueryWorkflow < Temporal::Workflow + attr_reader :state, :signal_count, :last_signal_received + + def execute + @state = "started" + @signal_count = 0 + @last_signal_received = nil + + workflow.on_query("state") { |*args| apply_transforms(state, args) } + workflow.on_query("signal_count") { signal_count } + + workflow.on_signal do |signal| + @signal_count += 1 + @last_signal_received = signal + end + + workflow.wait_for { last_signal_received == "finish" } + @state = "finished" + + { + signal_count: signal_count, + last_signal_received: last_signal_received, + final_state: state + } + end + + private + + def apply_transforms(value, transforms) + return value if value.nil? || transforms.empty? + transforms.inject(value) do |memo, input| + next memo unless memo.respond_to?(input) + memo.public_send(input) + end + end +end diff --git a/lib/temporal.rb b/lib/temporal.rb index 0b95a882..7f62be78 100644 --- a/lib/temporal.rb +++ b/lib/temporal.rb @@ -20,6 +20,7 @@ module Temporal :describe_namespace, :list_namespaces, :signal_workflow, + :query_workflow, :await_workflow_result, :reset_workflow, :terminate_workflow, @@ -48,11 +49,11 @@ def metrics end private - + def default_client @default_client ||= Client.new(config) end - + def config @config ||= Configuration.new end diff --git a/lib/temporal/client.rb b/lib/temporal/client.rb index c1e9a2da..6aede5a5 100644 --- a/lib/temporal/client.rb +++ b/lib/temporal/client.rb @@ -176,6 +176,29 @@ def signal_workflow(workflow, signal, workflow_id, run_id, input = nil, namespac ) end + # Issue a query against a running workflow + # + # @param workflow [Temporal::Workflow, nil] workflow class or nil + # @param query [String] name of the query to issue + # @param workflow_id [String] + # @param run_id [String] + # @param args [String, Array, nil] optional arguments for the query + # @param namespace [String, nil] if nil, choose the one declared on the workflow class or the + # global default + # @param query_reject_condition [Symbol] check Temporal::Connection::GRPC::QUERY_REJECT_CONDITION + def query_workflow(workflow, query, workflow_id, run_id, *args, namespace: nil, query_reject_condition: nil) + execution_options = ExecutionOptions.new(workflow, {}, config.default_execution_options) + + connection.query_workflow( + namespace: namespace || execution_options.namespace, + workflow_id: workflow_id, + run_id: run_id, + query: query, + args: args, + query_reject_condition: query_reject_condition + ) + end + # Long polls for a workflow to be completed and returns workflow's return value. # # @note This function times out after 30 seconds and throws Temporal::TimeoutError, @@ -207,7 +230,7 @@ def await_workflow_result(workflow, workflow_id:, run_id: nil, timeout: nil, nam timeout: timeout || max_timeout, ) rescue GRPC::DeadlineExceeded => e - message = if timeout + message = if timeout "Timed out after your specified limit of timeout: #{timeout} seconds" else "Timed out after #{max_timeout} seconds, which is the maximum supported amount." diff --git a/lib/temporal/concerns/payloads.rb b/lib/temporal/concerns/payloads.rb index 3be276d2..ad703542 100644 --- a/lib/temporal/concerns/payloads.rb +++ b/lib/temporal/concerns/payloads.rb @@ -21,6 +21,10 @@ def from_signal_payloads(payloads) from_payloads(payloads)&.first end + def from_query_payloads(payloads) + from_payloads(payloads)&.first + end + def from_payload_map(payload_map) payload_map.map { |key, value| [key, from_payload(value)] }.to_h end @@ -45,6 +49,10 @@ def to_signal_payloads(data) to_payloads([data]) end + def to_query_payloads(data) + to_payloads([data]) + end + def to_payload_map(data) data.transform_values(&method(:to_payload)) end diff --git a/lib/temporal/connection/grpc.rb b/lib/temporal/connection/grpc.rb index 9e27a10e..747c13a8 100644 --- a/lib/temporal/connection/grpc.rb +++ b/lib/temporal/connection/grpc.rb @@ -26,6 +26,12 @@ class GRPC close: Temporal::Api::Enums::V1::HistoryEventFilterType::HISTORY_EVENT_FILTER_TYPE_CLOSE_EVENT, }.freeze + QUERY_REJECT_CONDITION = { + none: Temporal::Api::Enums::V1::QueryRejectCondition::QUERY_REJECT_CONDITION_NONE, + not_open: Temporal::Api::Enums::V1::QueryRejectCondition::QUERY_REJECT_CONDITION_NOT_OPEN, + not_completed_cleanly: Temporal::Api::Enums::V1::QueryRejectCondition::QUERY_REJECT_CONDITION_NOT_COMPLETED_CLEANLY + }.freeze + DEFAULT_OPTIONS = { max_page_size: 100 }.freeze @@ -142,7 +148,7 @@ def get_workflow_execution_history( event_type: :all, timeout: nil ) - if wait_for_new_event + if wait_for_new_event if timeout.nil? # This is an internal error. Wrappers should enforce this. raise "You must specify a timeout when wait_for_new_event = true." @@ -183,13 +189,28 @@ def poll_workflow_task_queue(namespace:, task_queue:) poll_request.execute end - def respond_workflow_task_completed(namespace:, task_token:, commands:) + def respond_query_task_completed(namespace:, task_token:, query_result:) + query_result_proto = Serializer.serialize(query_result) + request = Temporal::Api::WorkflowService::V1::RespondQueryTaskCompletedRequest.new( + task_token: task_token, + namespace: namespace, + completed_type: query_result_proto.result_type, + query_result: query_result_proto.answer, + error_message: query_result_proto.error_message, + ) + + client.respond_query_task_completed(request) + end + + def respond_workflow_task_completed(namespace:, task_token:, commands:, query_results: {}) request = Temporal::Api::WorkflowService::V1::RespondWorkflowTaskCompletedRequest.new( namespace: namespace, identity: identity, task_token: task_token, - commands: Array(commands).map { |(_, command)| Serializer.serialize(command) } + commands: Array(commands).map { |(_, command)| Serializer.serialize(command) }, + query_results: query_results.transform_values { |value| Serializer.serialize(value) } ) + client.respond_workflow_task_completed(request) end @@ -452,16 +473,43 @@ def get_search_attributes raise NotImplementedError end - def respond_query_task_completed - raise NotImplementedError - end - def reset_sticky_task_queue raise NotImplementedError end - def query_workflow - raise NotImplementedError + def query_workflow(namespace:, workflow_id:, run_id:, query:, args: nil, query_reject_condition: nil) + request = Temporal::Api::WorkflowService::V1::QueryWorkflowRequest.new( + namespace: namespace, + execution: Temporal::Api::Common::V1::WorkflowExecution.new( + workflow_id: workflow_id, + run_id: run_id + ), + query: Temporal::Api::Query::V1::WorkflowQuery.new( + query_type: query, + query_args: to_query_payloads(args) + ) + ) + if query_reject_condition + condition = QUERY_REJECT_CONDITION[query_reject_condition] + raise Client::ArgumentError, 'Unknown query_reject_condition specified' unless condition + + request.query_reject_condition = condition + end + + begin + response = client.query_workflow(request) + rescue ::GRPC::InvalidArgument => e + raise Temporal::QueryFailed, e.details + end + + if response.query_rejected + rejection_status = response.query_rejected.status || 'not specified by server' + raise Temporal::QueryFailed, "Query rejected: status #{rejection_status}" + elsif !response.query_result + raise Temporal::QueryFailed, 'Invalid response from server' + else + from_query_payloads(response.query_result) + end end def describe_workflow_execution(namespace:, workflow_id:, run_id:) @@ -534,7 +582,7 @@ def serialize_status_filter(value) sym = Temporal::Workflow::Status::API_STATUS_MAP.invert[value] status = Temporal::Api::Enums::V1::WorkflowExecutionStatus.resolve(sym) - + Temporal::Api::Filter::V1::StatusFilter.new(status: status) end end diff --git a/lib/temporal/connection/serializer.rb b/lib/temporal/connection/serializer.rb index 6343cb01..46070c66 100644 --- a/lib/temporal/connection/serializer.rb +++ b/lib/temporal/connection/serializer.rb @@ -1,4 +1,5 @@ require 'temporal/workflow/command' +require 'temporal/workflow/query_result' require 'temporal/connection/serializer/schedule_activity' require 'temporal/connection/serializer/start_child_workflow' require 'temporal/connection/serializer/request_activity_cancellation' @@ -10,6 +11,8 @@ require 'temporal/connection/serializer/fail_workflow' require 'temporal/connection/serializer/signal_external_workflow' require 'temporal/connection/serializer/upsert_search_attributes' +require 'temporal/connection/serializer/query_answer' +require 'temporal/connection/serializer/query_failure' module Temporal module Connection @@ -26,6 +29,8 @@ module Serializer Workflow::Command::FailWorkflow => Serializer::FailWorkflow, Workflow::Command::SignalExternalWorkflow => Serializer::SignalExternalWorkflow, Workflow::Command::UpsertSearchAttributes => Serializer::UpsertSearchAttributes, + Workflow::QueryResult::Answer => Serializer::QueryAnswer, + Workflow::QueryResult::Failure => Serializer::QueryFailure, }.freeze def self.serialize(object) diff --git a/lib/temporal/connection/serializer/query_answer.rb b/lib/temporal/connection/serializer/query_answer.rb new file mode 100644 index 00000000..7f28ec06 --- /dev/null +++ b/lib/temporal/connection/serializer/query_answer.rb @@ -0,0 +1,19 @@ +require 'temporal/connection/serializer/base' +require 'temporal/concerns/payloads' + +module Temporal + module Connection + module Serializer + class QueryAnswer < Base + include Concerns::Payloads + + def to_proto + Temporal::Api::Query::V1::WorkflowQueryResult.new( + result_type: Temporal::Api::Enums::V1::QueryResultType::QUERY_RESULT_TYPE_ANSWERED, + answer: to_query_payloads(object.result) + ) + end + end + end + end +end diff --git a/lib/temporal/connection/serializer/query_failure.rb b/lib/temporal/connection/serializer/query_failure.rb new file mode 100644 index 00000000..0a2fca21 --- /dev/null +++ b/lib/temporal/connection/serializer/query_failure.rb @@ -0,0 +1,16 @@ +require 'temporal/connection/serializer/base' + +module Temporal + module Connection + module Serializer + class QueryFailure < Base + def to_proto + Temporal::Api::Query::V1::WorkflowQueryResult.new( + result_type: Temporal::Api::Enums::V1::QueryResultType::QUERY_RESULT_TYPE_FAILED, + error_message: object.error.message + ) + end + end + end + end +end diff --git a/lib/temporal/errors.rb b/lib/temporal/errors.rb index 5047cc90..6d49bef2 100644 --- a/lib/temporal/errors.rb +++ b/lib/temporal/errors.rb @@ -67,7 +67,7 @@ class ClientVersionNotSupportedFailure < ApiError; end class FeatureVersionNotSupportedFailure < ApiError; end class NamespaceAlreadyExistsFailure < ApiError; end class CancellationAlreadyRequestedFailure < ApiError; end - class QueryFailedFailure < ApiError; end + class QueryFailed < ApiError; end class UnexpectedResponse < ApiError; end end diff --git a/lib/temporal/metadata.rb b/lib/temporal/metadata.rb index f5649672..5cfb7a00 100644 --- a/lib/temporal/metadata.rb +++ b/lib/temporal/metadata.rb @@ -27,6 +27,8 @@ def generate_activity_metadata(task, namespace) ) end + # @param task [Temporal::Api::WorkflowService::V1::PollWorkflowTaskQueueResponse] + # @param namespace [String] def generate_workflow_task_metadata(task, namespace) Metadata::WorkflowTask.new( namespace: namespace, @@ -40,7 +42,6 @@ def generate_workflow_task_metadata(task, namespace) end # @param event [Temporal::Workflow::History::Event] Workflow started history event - # @param event [WorkflowExecutionStartedEventAttributes] :attributes # @param task_metadata [Temporal::Metadata::WorkflowTask] workflow task metadata def generate_workflow_metadata(event, task_metadata) Metadata::Workflow.new( diff --git a/lib/temporal/testing/local_workflow_context.rb b/lib/temporal/testing/local_workflow_context.rb index a115eb26..1f200e84 100644 --- a/lib/temporal/testing/local_workflow_context.rb +++ b/lib/temporal/testing/local_workflow_context.rb @@ -186,10 +186,14 @@ def now Time.now end - def on_signal(&block) + def on_signal(signal_name = nil, &block) raise NotImplementedError, 'Signals are not available when Temporal::Testing.local! is on' end + def on_query(query, &block) + raise NotImplementedError, 'Queries are not available when Temporal::Testing.local! is on' + end + def cancel_activity(activity_id) raise NotImplementedError, 'Cancel is not available when Temporal::Testing.local! is on' end diff --git a/lib/temporal/workflow/context.rb b/lib/temporal/workflow/context.rb index 7414a845..74722f00 100644 --- a/lib/temporal/workflow/context.rb +++ b/lib/temporal/workflow/context.rb @@ -19,9 +19,10 @@ class Workflow class Context attr_reader :metadata, :config - def initialize(state_manager, dispatcher, workflow_class, metadata, config) + def initialize(state_manager, dispatcher, workflow_class, metadata, config, query_registry) @state_manager = state_manager @dispatcher = dispatcher + @query_registry = query_registry @workflow_class = workflow_class @metadata = metadata @completed = false @@ -298,7 +299,7 @@ def now # all signals that do not match a named signal handler. # # @param signal_name [String, Symbol, nil] an optional signal name; converted to a String - def on_signal(signal_name=nil, &block) + def on_signal(signal_name = nil, &block) if signal_name target = Signal.new(signal_name) dispatcher.register_handler(target, 'signaled') do |_, input| @@ -312,6 +313,10 @@ def on_signal(signal_name=nil, &block) end end + def on_query(query, &block) + query_registry.register(query, &block) + end + def cancel_activity(activity_id) command = Command::RequestActivityCancellation.new(activity_id: activity_id) @@ -344,8 +349,6 @@ def cancel(target, cancelation_id) # # @return [Future] future def signal_external_workflow(workflow, signal, workflow_id, run_id = nil, input = nil, namespace: nil, child_workflow_only: false) - options ||= {} - execution_options = ExecutionOptions.new(workflow, {}, config.default_execution_options) command = Command::SignalExternalWorkflow.new( @@ -398,7 +401,7 @@ def upsert_search_attributes(search_attributes) private - attr_reader :state_manager, :dispatcher, :workflow_class + attr_reader :state_manager, :dispatcher, :workflow_class, :query_registry def completed! @completed = true diff --git a/lib/temporal/workflow/executor.rb b/lib/temporal/workflow/executor.rb index 78feb61b..546fe7f6 100644 --- a/lib/temporal/workflow/executor.rb +++ b/lib/temporal/workflow/executor.rb @@ -1,6 +1,7 @@ require 'fiber' require 'temporal/workflow/dispatcher' +require 'temporal/workflow/query_registry' require 'temporal/workflow/state_manager' require 'temporal/workflow/context' require 'temporal/workflow/history/event_target' @@ -16,6 +17,7 @@ class Executor def initialize(workflow_class, history, task_metadata, config) @workflow_class = workflow_class @dispatcher = Dispatcher.new + @query_registry = QueryRegistry.new @state_manager = StateManager.new(dispatcher) @history = history @task_metadata = task_metadata @@ -36,13 +38,33 @@ def run return state_manager.commands end + # Process queries using the pre-registered query handlers + # + # @note this method is expected to be executed after the history has + # been fully replayed (by invoking the #run method) + # + # @param queries [Hash] + # + # @return [Hash] + def process_queries(queries = {}) + queries.transform_values(&method(:process_query)) + end + private - attr_reader :workflow_class, :dispatcher, :state_manager, :task_metadata, :history, :config + attr_reader :workflow_class, :dispatcher, :query_registry, :state_manager, :task_metadata, :history, :config + + def process_query(query) + result = query_registry.handle(query.query_type, query.query_args) + + QueryResult.answer(result) + rescue StandardError => error + QueryResult.failure(error) + end def execute_workflow(input, workflow_started_event) metadata = Metadata.generate_workflow_metadata(workflow_started_event, task_metadata) - context = Workflow::Context.new(state_manager, dispatcher, workflow_class, metadata, config) + context = Workflow::Context.new(state_manager, dispatcher, workflow_class, metadata, config, query_registry) Fiber.new do workflow_class.execute_in_context(context, input) diff --git a/lib/temporal/workflow/query_registry.rb b/lib/temporal/workflow/query_registry.rb new file mode 100644 index 00000000..babdda66 --- /dev/null +++ b/lib/temporal/workflow/query_registry.rb @@ -0,0 +1,33 @@ +require 'temporal/errors' + +module Temporal + class Workflow + class QueryRegistry + def initialize + @handlers = {} + end + + def register(type, &handler) + if handlers.key?(type) + warn "[NOTICE] Overwriting a query handler for #{type}" + end + + handlers[type] = handler + end + + def handle(type, args = nil) + handler = handlers[type] + + unless handler + raise Temporal::QueryFailed, "Workflow did not register a handler for #{type}" + end + + handler.call(*args) + end + + private + + attr_reader :handlers + end + end +end diff --git a/lib/temporal/workflow/query_result.rb b/lib/temporal/workflow/query_result.rb new file mode 100644 index 00000000..a4d0401e --- /dev/null +++ b/lib/temporal/workflow/query_result.rb @@ -0,0 +1,16 @@ +module Temporal + class Workflow + module QueryResult + Answer = Struct.new(:result) + Failure = Struct.new(:error) + + def self.answer(result) + Answer.new(result).freeze + end + + def self.failure(error) + Failure.new(error).freeze + end + end + end +end diff --git a/lib/temporal/workflow/task_processor.rb b/lib/temporal/workflow/task_processor.rb index ce271e4e..4b80918a 100644 --- a/lib/temporal/workflow/task_processor.rb +++ b/lib/temporal/workflow/task_processor.rb @@ -7,7 +7,20 @@ module Temporal class Workflow class TaskProcessor + Query = Struct.new(:query) do + include Concerns::Payloads + + def query_type + query.query_type + end + + def query_args + from_query_payloads(query.query_args) + end + end + MAX_FAILED_ATTEMPTS = 1 + LEGACY_QUERY_KEY = :legacy_query def initialize(task, namespace, workflow_lookup, middleware_chain, config) @task = task @@ -38,7 +51,13 @@ def process executor.run end - complete_task(commands) + query_results = executor.process_queries(parse_queries) + + if legacy_query_task? + complete_query(query_results[LEGACY_QUERY_KEY]) + else + complete_task(commands, query_results) + end rescue StandardError => error Temporal::ErrorHandler.handle(error, config, metadata: metadata) @@ -87,10 +106,44 @@ def fetch_full_history Workflow::History.new(events) end - def complete_task(commands) + def legacy_query_task? + !!task.query + end + + def parse_queries + # Support for deprecated query style + if legacy_query_task? + { LEGACY_QUERY_KEY => Query.new(task.query) } + else + task.queries.each_with_object({}) do |(query_id, query), result| + result[query_id] = Query.new(query) + end + end + end + + def complete_task(commands, query_results) Temporal.logger.info("Workflow task completed", metadata.to_h) - connection.respond_workflow_task_completed(namespace: namespace, task_token: task_token, commands: commands) + connection.respond_workflow_task_completed( + namespace: namespace, + task_token: task_token, + commands: commands, + query_results: query_results + ) + end + + def complete_query(result) + Temporal.logger.info("Workflow Query task completed", metadata.to_h) + + connection.respond_query_task_completed( + namespace: namespace, + task_token: task_token, + query_result: result + ) + rescue StandardError => error + Temporal.logger.error("Unable to complete a query", metadata.to_h.merge(error: error.inspect)) + + Temporal::ErrorHandler.handle(error, config, metadata: metadata) end def fail_task(error) diff --git a/rbi/temporal-ruby.rbi b/rbi/temporal-ruby.rbi index b7da9d12..cdcda078 100644 --- a/rbi/temporal-ruby.rbi +++ b/rbi/temporal-ruby.rbi @@ -39,5 +39,5 @@ module Temporal class FeatureVersionNotSupportedFailure; end class NamespaceAlreadyExistsFailure; end class CancellationAlreadyRequestedFailure; end - class QueryFailedFailure; end + class QueryFailed; end end diff --git a/spec/fabricators/grpc/workflow_query_fabricator.rb b/spec/fabricators/grpc/workflow_query_fabricator.rb new file mode 100644 index 00000000..dcabbb24 --- /dev/null +++ b/spec/fabricators/grpc/workflow_query_fabricator.rb @@ -0,0 +1,4 @@ +Fabricator(:api_workflow_query, from: Temporal::Api::Query::V1::WorkflowQuery) do + query_type { 'state' } + query_args { Temporal.configuration.converter.to_payloads(['']) } +end diff --git a/spec/fabricators/grpc/workflow_task_fabricator.rb b/spec/fabricators/grpc/workflow_task_fabricator.rb index a699428a..d1470c04 100644 --- a/spec/fabricators/grpc/workflow_task_fabricator.rb +++ b/spec/fabricators/grpc/workflow_task_fabricator.rb @@ -10,6 +10,7 @@ scheduled_time { Google::Protobuf::Timestamp.new.tap { |t| t.from_time(Time.now) } } started_time { Google::Protobuf::Timestamp.new.tap { |t| t.from_time(Time.now) } } history { |attrs| Temporal::Api::History::V1::History.new(events: attrs[:events]) } + query { nil } end Fabricator(:api_paginated_workflow_task, from: :api_workflow_task) do diff --git a/spec/unit/lib/temporal/connection/serializer/query_answer_spec.rb b/spec/unit/lib/temporal/connection/serializer/query_answer_spec.rb new file mode 100644 index 00000000..8876bbd5 --- /dev/null +++ b/spec/unit/lib/temporal/connection/serializer/query_answer_spec.rb @@ -0,0 +1,23 @@ +require 'temporal/connection/serializer/query_failure' +require 'temporal/workflow/query_result' +require 'temporal/concerns/payloads' + +describe Temporal::Connection::Serializer::QueryAnswer do + class TestDeserializer + extend Temporal::Concerns::Payloads + end + + describe 'to_proto' do + let(:query_result) { Temporal::Workflow::QueryResult.answer(42) } + + it 'produces a protobuf' do + result = described_class.new(query_result).to_proto + + expect(result).to be_a(Temporal::Api::Query::V1::WorkflowQueryResult) + expect(result.result_type).to eq(Temporal::Api::Enums::V1::QueryResultType.lookup( + Temporal::Api::Enums::V1::QueryResultType::QUERY_RESULT_TYPE_ANSWERED) + ) + expect(result.answer).to eq(TestDeserializer.to_query_payloads(42)) + end + end +end diff --git a/spec/unit/lib/temporal/connection/serializer/query_failure_spec.rb b/spec/unit/lib/temporal/connection/serializer/query_failure_spec.rb new file mode 100644 index 00000000..7e948f4d --- /dev/null +++ b/spec/unit/lib/temporal/connection/serializer/query_failure_spec.rb @@ -0,0 +1,19 @@ +require 'temporal/connection/serializer/query_failure' +require 'temporal/workflow/query_result' + +describe Temporal::Connection::Serializer::QueryFailure do + describe 'to_proto' do + let(:exception) { StandardError.new('Test query failure') } + let(:query_result) { Temporal::Workflow::QueryResult.failure(exception) } + + it 'produces a protobuf' do + result = described_class.new(query_result).to_proto + + expect(result).to be_a(Temporal::Api::Query::V1::WorkflowQueryResult) + expect(result.result_type).to eq(Temporal::Api::Enums::V1::QueryResultType.lookup( + Temporal::Api::Enums::V1::QueryResultType::QUERY_RESULT_TYPE_FAILED) + ) + expect(result.error_message).to eq('Test query failure') + end + end +end diff --git a/spec/unit/lib/temporal/grpc_client_spec.rb b/spec/unit/lib/temporal/grpc_spec.rb similarity index 75% rename from spec/unit/lib/temporal/grpc_client_spec.rb rename to spec/unit/lib/temporal/grpc_spec.rb index 70a7885d..8cbe5af7 100644 --- a/spec/unit/lib/temporal/grpc_client_spec.rb +++ b/spec/unit/lib/temporal/grpc_spec.rb @@ -1,3 +1,6 @@ +require 'temporal/connection/grpc' +require 'temporal/workflow/query_result' + describe Temporal::Connection::GRPC do subject { Temporal::Connection::GRPC.new(nil, nil, nil) } let(:grpc_stub) { double('grpc stub') } @@ -6,9 +9,13 @@ let(:run_id) { SecureRandom.uuid } let(:now) { Time.now} + class TestDeserializer + extend Temporal::Concerns::Payloads + end + before do allow(subject).to receive(:client).and_return(grpc_stub) - + allow(Time).to receive(:now).and_return(now) end @@ -35,7 +42,7 @@ end end end - + describe '#signal_with_start_workflow' do let(:temporal_response) do Temporal::Api::WorkflowService::V1::SignalWithStartWorkflowExecutionResponse.new(run_id: 'xxx') @@ -148,7 +155,7 @@ end end - it 'demands a timeout to be specified' do + it 'demands a timeout to be specified' do expect do subject.get_workflow_execution_history( namespace: namespace, @@ -161,7 +168,7 @@ end end - it 'disallows a timeout larger than the server timeout' do + it 'disallows a timeout larger than the server timeout' do expect do subject.get_workflow_execution_history( namespace: namespace, @@ -345,4 +352,109 @@ end end end + + describe '#respond_query_task_completed' do + let(:task_token) { SecureRandom.uuid } + + before do + allow(grpc_stub) + .to receive(:respond_query_task_completed) + .and_return(Temporal::Api::WorkflowService::V1::RespondQueryTaskCompletedResponse.new) + end + + context 'when query result is an answer' do + let(:query_result) { Temporal::Workflow::QueryResult.answer(42) } + + it 'makes an API request' do + subject.respond_query_task_completed( + namespace: namespace, + task_token: task_token, + query_result: query_result + ) + + expect(grpc_stub).to have_received(:respond_query_task_completed) do |request| + expect(request).to be_an_instance_of(Temporal::Api::WorkflowService::V1::RespondQueryTaskCompletedRequest) + expect(request.task_token).to eq(task_token) + expect(request.namespace).to eq(namespace) + expect(request.completed_type).to eq(Temporal::Api::Enums::V1::QueryResultType.lookup( + Temporal::Api::Enums::V1::QueryResultType::QUERY_RESULT_TYPE_ANSWERED) + ) + expect(request.query_result).to eq(TestDeserializer.to_query_payloads(42)) + expect(request.error_message).to eq('') + end + end + end + + context 'when query result is a failure' do + let(:query_result) { Temporal::Workflow::QueryResult.failure(StandardError.new('Test query failure')) } + + it 'makes an API request' do + subject.respond_query_task_completed( + namespace: namespace, + task_token: task_token, + query_result: query_result + ) + + expect(grpc_stub).to have_received(:respond_query_task_completed) do |request| + expect(request).to be_an_instance_of(Temporal::Api::WorkflowService::V1::RespondQueryTaskCompletedRequest) + expect(request.task_token).to eq(task_token) + expect(request.namespace).to eq(namespace) + expect(request.completed_type).to eq(Temporal::Api::Enums::V1::QueryResultType.lookup( + Temporal::Api::Enums::V1::QueryResultType::QUERY_RESULT_TYPE_FAILED) + ) + expect(request.query_result).to eq(nil) + expect(request.error_message).to eq('Test query failure') + end + end + end + end + + describe '#respond_workflow_task_completed' do + let(:task_token) { SecureRandom.uuid } + + before do + allow(grpc_stub) + .to receive(:respond_workflow_task_completed) + .and_return(Temporal::Api::WorkflowService::V1::RespondWorkflowTaskCompletedResponse.new) + end + + context 'when responding with query results' do + let(:query_results) do + { + '1' => Temporal::Workflow::QueryResult.answer(42), + '2' => Temporal::Workflow::QueryResult.failure(StandardError.new('Test query failure')), + } + end + + it 'makes an API request' do + subject.respond_workflow_task_completed( + namespace: namespace, + task_token: task_token, + commands: [], + query_results: query_results + ) + + expect(grpc_stub).to have_received(:respond_workflow_task_completed) do |request| + expect(request).to be_an_instance_of(Temporal::Api::WorkflowService::V1::RespondWorkflowTaskCompletedRequest) + expect(request.task_token).to eq(task_token) + expect(request.namespace).to eq(namespace) + expect(request.commands).to be_empty + + expect(request.query_results.length).to eq(2) + + expect(request.query_results['1']).to be_a(Temporal::Api::Query::V1::WorkflowQueryResult) + expect(request.query_results['1'].result_type).to eq(Temporal::Api::Enums::V1::QueryResultType.lookup( + Temporal::Api::Enums::V1::QueryResultType::QUERY_RESULT_TYPE_ANSWERED) + ) + expect(request.query_results['1'].answer).to eq(TestDeserializer.to_query_payloads(42)) + + expect(request.query_results['2']).to be_a(Temporal::Api::Query::V1::WorkflowQueryResult) + expect(request.query_results['2'].result_type).to eq(Temporal::Api::Enums::V1::QueryResultType.lookup( + Temporal::Api::Enums::V1::QueryResultType::QUERY_RESULT_TYPE_FAILED) + ) + expect(request.query_results['2'].error_message).to eq('Test query failure') + end + end + end + end end diff --git a/spec/unit/lib/temporal/metadata/workflow_task_spec.rb b/spec/unit/lib/temporal/metadata/workflow_task_spec.rb index 7259c1f9..99f95a84 100644 --- a/spec/unit/lib/temporal/metadata/workflow_task_spec.rb +++ b/spec/unit/lib/temporal/metadata/workflow_task_spec.rb @@ -32,7 +32,7 @@ 'namespace' => subject.namespace, 'workflow_id' => subject.workflow_id, 'workflow_name' => subject.workflow_name, - 'workflow_run_id' => subject.workflow_run_id, + 'workflow_run_id' => subject.workflow_run_id }) end end diff --git a/spec/unit/lib/temporal/workflow/context_spec.rb b/spec/unit/lib/temporal/workflow/context_spec.rb index 24768d82..a6dd2921 100644 --- a/spec/unit/lib/temporal/workflow/context_spec.rb +++ b/spec/unit/lib/temporal/workflow/context_spec.rb @@ -7,10 +7,32 @@ class MyTestWorkflow < Temporal::Workflow; end describe Temporal::Workflow::Context do let(:state_manager) { instance_double('Temporal::Workflow::StateManager') } let(:dispatcher) { instance_double('Temporal::Workflow::Dispatcher') } + let(:query_registry) { instance_double('Temporal::Workflow::QueryRegistry') } let(:metadata) { instance_double('Temporal::Metadata::Workflow') } - let(:workflow_context) { - Temporal::Workflow::Context.new(state_manager, dispatcher, MyTestWorkflow, metadata, Temporal.configuration) - } + let(:workflow_context) do + Temporal::Workflow::Context.new( + state_manager, + dispatcher, + MyTestWorkflow, + metadata, + Temporal.configuration, + query_registry + ) + end + + describe '#on_query' do + let(:handler) { Proc.new {} } + + before { allow(query_registry).to receive(:register) } + + it 'registers a query with the query registry' do + workflow_context.on_query('test-query', &handler) + + expect(query_registry).to have_received(:register).with('test-query') do |&block| + expect(block).to eq(handler) + end + end + end describe '#upsert_search_attributes' do it 'does not accept nil' do diff --git a/spec/unit/lib/temporal/workflow/executor_spec.rb b/spec/unit/lib/temporal/workflow/executor_spec.rb index bc9828da..b93105ac 100644 --- a/spec/unit/lib/temporal/workflow/executor_spec.rb +++ b/spec/unit/lib/temporal/workflow/executor_spec.rb @@ -1,6 +1,8 @@ require 'temporal/workflow/executor' require 'temporal/workflow/history' require 'temporal/workflow' +require 'temporal/workflow/task_processor' +require 'temporal/workflow/query_registry' describe Temporal::Workflow::Executor do subject { described_class.new(workflow, history, workflow_metadata, config) } @@ -80,4 +82,36 @@ def execute ) end end -end \ No newline at end of file + + describe '#process_queries' do + let(:query_registry) { Temporal::Workflow::QueryRegistry.new } + let(:query_1_result) { 42 } + let(:query_2_error) { StandardError.new('Test query failure') } + let(:queries) do + { + '1' => Temporal::Workflow::TaskProcessor::Query.new(Fabricate(:api_workflow_query, query_type: 'success')), + '2' => Temporal::Workflow::TaskProcessor::Query.new(Fabricate(:api_workflow_query, query_type: 'failure')), + '3' => Temporal::Workflow::TaskProcessor::Query.new(Fabricate(:api_workflow_query, query_type: 'unknown')), + } + end + + before do + allow(Temporal::Workflow::QueryRegistry).to receive(:new).and_return(query_registry) + query_registry.register('success') { query_1_result } + query_registry.register('failure') { raise query_2_error } + end + + it 'returns query results' do + results = subject.process_queries(queries) + + expect(results.length).to eq(3) + expect(results['1']).to be_a(Temporal::Workflow::QueryResult::Answer) + expect(results['1'].result).to eq(query_1_result) + expect(results['2']).to be_a(Temporal::Workflow::QueryResult::Failure) + expect(results['2'].error).to eq(query_2_error) + expect(results['3']).to be_a(Temporal::Workflow::QueryResult::Failure) + expect(results['3'].error).to be_a(Temporal::QueryFailed) + expect(results['3'].error.message).to eq('Workflow did not register a handler for unknown') + end + end +end diff --git a/spec/unit/lib/temporal/workflow/query_registry_spec.rb b/spec/unit/lib/temporal/workflow/query_registry_spec.rb new file mode 100644 index 00000000..3c5ced14 --- /dev/null +++ b/spec/unit/lib/temporal/workflow/query_registry_spec.rb @@ -0,0 +1,67 @@ +require 'temporal/workflow/query_registry' + +describe Temporal::Workflow::QueryRegistry do + subject { described_class.new } + + describe '#register' do + let(:handler) { Proc.new {} } + + it 'registers a query handler' do + subject.register('test-query', &handler) + + expect(subject.send(:handlers)['test-query']).to eq(handler) + end + + context 'when query handler is already registered' do + let(:handler_2) { Proc.new {} } + + before { subject.register('test-query', &handler) } + + it 'warns' do + allow(subject).to receive(:warn) + + subject.register('test-query', &handler_2) + + expect(subject) + .to have_received(:warn) + .with('[NOTICE] Overwriting a query handler for test-query') + end + + it 're-registers a query handler' do + subject.register('test-query', &handler_2) + + expect(subject.send(:handlers)['test-query']).to eq(handler_2) + end + end + end + + describe '#handle' do + context 'when a query handler has been registered' do + let(:handler) { Proc.new { 42 } } + + before { subject.register('test-query', &handler) } + + it 'runs the handler and returns the result' do + expect(subject.handle('test-query')).to eq(42) + end + end + + context 'when a query handler has been registered with args' do + let(:handler) { Proc.new { |arg_1, arg_2| arg_1 + arg_2 } } + + before { subject.register('test-query', &handler) } + + it 'runs the handler and returns the result' do + expect(subject.handle('test-query', [3, 5])).to eq(8) + end + end + + context 'when a query handler has not been registered' do + it 'raises' do + expect do + subject.handle('test-query') + end.to raise_error(Temporal::QueryFailed, 'Workflow did not register a handler for test-query') + end + end + end +end diff --git a/spec/unit/lib/temporal/workflow/query_result_spec.rb b/spec/unit/lib/temporal/workflow/query_result_spec.rb new file mode 100644 index 00000000..222446a8 --- /dev/null +++ b/spec/unit/lib/temporal/workflow/query_result_spec.rb @@ -0,0 +1,25 @@ +require 'temporal/workflow/query_result' + +describe Temporal::Workflow::QueryResult do + describe '.answer' do + it 'returns an anwer query result' do + result = described_class.answer(42) + + expect(result).to be_a(Temporal::Workflow::QueryResult::Answer) + expect(result).to be_frozen + expect(result.result).to eq(42) + end + end + + describe '.failure' do + let(:error) { StandardError.new('Test query failure') } + + it 'returns a failure query result' do + result = described_class.failure(error) + + expect(result).to be_a(Temporal::Workflow::QueryResult::Failure) + expect(result).to be_frozen + expect(result.error).to eq(error) + end + end +end diff --git a/spec/unit/lib/temporal/workflow/task_processor_spec.rb b/spec/unit/lib/temporal/workflow/task_processor_spec.rb index 188b3a90..5a02569e 100644 --- a/spec/unit/lib/temporal/workflow/task_processor_spec.rb +++ b/spec/unit/lib/temporal/workflow/task_processor_spec.rb @@ -7,7 +7,9 @@ let(:namespace) { 'test-namespace' } let(:lookup) { instance_double('Temporal::ExecutableLookup', find: nil) } - let(:task) { Fabricate(:api_workflow_task, workflow_type: api_workflow_type) } + let(:query) { nil } + let(:queries) { nil } + let(:task) { Fabricate(:api_workflow_task, { workflow_type: api_workflow_type, query: query, queries: queries }.compact) } let(:api_workflow_type) { Fabricate(:api_workflow_type, name: workflow_name) } let(:workflow_name) { 'TestWorkflow' } let(:connection) { instance_double('Temporal::Connection::GRPC') } @@ -24,6 +26,7 @@ .with(config.for_connection) .and_return(connection) allow(connection).to receive(:respond_workflow_task_completed) + allow(connection).to receive(:respond_query_task_completed) allow(connection).to receive(:respond_workflow_task_failed) allow(middleware_chain).to receive(:invoke).and_call_original @@ -65,6 +68,7 @@ allow(lookup).to receive(:find).with(workflow_name).and_return(workflow_class) allow(Temporal::Workflow::Executor).to receive(:new).and_return(executor) allow(executor).to receive(:run) { workflow_class.execute_in_context(context, input); commands } + allow(executor).to receive(:process_queries) end context 'when workflow task completes' do @@ -84,20 +88,78 @@ ) end - it 'completes the workflow task' do - subject.process + context 'when workflow task queries are included' do + let(:query_id) { SecureRandom.uuid } + let(:query_result) { Temporal::Workflow::QueryResult.answer(42) } + let(:queries) do + Google::Protobuf::Map.new(:string, :message, Temporal::Api::Query::V1::WorkflowQuery).tap do |map| + map[query_id] = Fabricate(:api_workflow_query) + end + end - expect(connection) - .to have_received(:respond_workflow_task_completed) - .with(namespace: namespace, task_token: task.task_token, commands: commands) + before do + allow(executor).to receive(:process_queries).and_return(query_id => query_result) + end + + it 'completes the workflow task with query results' do + subject.process + + expect(executor) + .to have_received(:process_queries) + .with(query_id => an_instance_of(Temporal::Workflow::TaskProcessor::Query)) + expect(connection) + .to have_received(:respond_workflow_task_completed) + .with( + namespace: namespace, + task_token: task.task_token, + commands: commands, + query_results: { query_id => query_result } + ) + end end - it 'ignores connection exception' do - allow(connection) - .to receive(:respond_workflow_task_completed) - .and_raise(StandardError) + context 'when deprecated task query is present' do + let(:query) { Fabricate(:api_workflow_query) } + let(:result) { Temporal::Workflow::QueryResult.answer(42) } - subject.process + before do + allow(executor).to receive(:process_queries).and_return(legacy_query: result) + end + + it 'completes the workflow query task with the result' do + subject.process + + expect(executor).to have_received(:process_queries).with( + legacy_query: an_instance_of(Temporal::Workflow::TaskProcessor::Query) + ) + expect(connection).to_not have_received(:respond_workflow_task_completed) + expect(connection) + .to have_received(:respond_query_task_completed) + .with( + task_token: task.task_token, + namespace: namespace, + query_result: result + ) + end + end + + context 'when deprecated task query is not present' do + it 'completes the workflow task' do + subject.process + + expect(connection).to_not have_received(:respond_query_task_completed) + expect(connection) + .to have_received(:respond_workflow_task_completed) + .with(namespace: namespace, task_token: task.task_token, commands: commands, query_results: nil) + end + + it 'ignores connection exception' do + allow(connection) + .to receive(:respond_workflow_task_completed) + .and_raise(StandardError) + + subject.process + end end it 'sends queue_time metric' do @@ -122,48 +184,67 @@ before { allow(workflow_class).to receive(:execute_in_context).and_raise(exception) } - it 'fails the workflow task' do - subject.process + context 'when deprecated task query is present' do + let(:query) { Fabricate(:api_workflow_query) } - expect(connection) - .to have_received(:respond_workflow_task_failed) - .with( - namespace: namespace, - task_token: task.task_token, - cause: Temporal::Api::Enums::V1::WorkflowTaskFailedCause::WORKFLOW_TASK_FAILED_CAUSE_WORKFLOW_WORKER_UNHANDLED_FAILURE, - exception: exception - ) + it 'fails the workflow task' do + subject.process + + expect(connection) + .to have_received(:respond_workflow_task_failed) + .with( + namespace: namespace, + task_token: task.task_token, + cause: Temporal::Api::Enums::V1::WorkflowTaskFailedCause::WORKFLOW_TASK_FAILED_CAUSE_WORKFLOW_WORKER_UNHANDLED_FAILURE, + exception: exception + ) + end end - it 'does not fail the task beyond the first attempt' do - task.attempt = 2 - subject.process + context 'when deprecated task query is not present' do + it 'fails the workflow task' do + subject.process - expect(connection) - .not_to have_received(:respond_workflow_task_failed) - end + expect(connection) + .to have_received(:respond_workflow_task_failed) + .with( + namespace: namespace, + task_token: task.task_token, + cause: Temporal::Api::Enums::V1::WorkflowTaskFailedCause::WORKFLOW_TASK_FAILED_CAUSE_WORKFLOW_WORKER_UNHANDLED_FAILURE, + exception: exception + ) + end - it 'ignores connection exception' do - allow(connection) - .to receive(:respond_workflow_task_failed) - .and_raise(StandardError) + it 'does not fail the task beyond the first attempt' do + task.attempt = 2 + subject.process - subject.process - end + expect(connection) + .not_to have_received(:respond_workflow_task_failed) + end - it 'calls error_handlers' do - reported_error = nil - reported_metadata = nil + it 'ignores connection exception' do + allow(connection) + .to receive(:respond_workflow_task_failed) + .and_raise(StandardError) - config.on_error do |error, metadata: nil| - reported_error = error - reported_metadata = metadata + subject.process end - subject.process + it 'calls error_handlers' do + reported_error = nil + reported_metadata = nil + + config.on_error do |error, metadata: nil| + reported_error = error + reported_metadata = metadata + end + + subject.process - expect(reported_error).to be_an_instance_of(StandardError) - expect(reported_metadata).to be_an_instance_of(Temporal::Metadata::WorkflowTask) + expect(reported_error).to be_an_instance_of(StandardError) + expect(reported_metadata).to be_an_instance_of(Temporal::Metadata::WorkflowTask) + end end it 'sends queue_time metric' do @@ -183,6 +264,30 @@ end end + context 'when legacy query fails' do + let(:query) { Fabricate(:api_workflow_query) } + let(:exception) { StandardError.new('workflow task failed') } + let(:query_failure) { Temporal::Workflow::QueryResult.failure(exception) } + + before do + allow(executor) + .to receive(:process_queries) + .and_return(legacy_query: query_failure) + end + + it 'fails the workflow task' do + subject.process + + expect(connection) + .to have_received(:respond_query_task_completed) + .with( + namespace: namespace, + task_token: task.task_token, + query_result: query_failure + ) + end + end + context 'when history is paginated' do let(:task) { Fabricate(:api_paginated_workflow_task, workflow_type: api_workflow_type) } let(:event) { Fabricate(:api_workflow_execution_started_event) }