Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -178,7 +178,7 @@ Besides calling activities workflows can:
- Use timers
- Receive signals
- Execute other (child) workflows
- Respond to queries [not yet implemented]
- Respond to queries


## Activities
Expand Down
14 changes: 14 additions & 0 deletions examples/bin/query
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
#!/usr/bin/env ruby
require_relative '../init'

Dir[File.expand_path('../workflows/*.rb', __dir__)].each { |f| require f }

workflow_class_name, workflow_id, run_id, query, args = ARGV
workflow_class = Object.const_get(workflow_class_name)

if ![workflow_class, workflow_id, run_id, query].all?
fail 'Wrong arguments, use `bin/query WORKFLOW WORKFLOW_ID RUN_ID QUERY [ARGS]`'
end

result = Temporal.query_workflow(workflow_class, query, workflow_id, run_id, args)
puts result.inspect
1 change: 1 addition & 0 deletions examples/bin/worker
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ worker.register_workflow(MetadataWorkflow)
worker.register_workflow(ParentCloseWorkflow)
worker.register_workflow(ParentWorkflow)
worker.register_workflow(ProcessFileWorkflow)
worker.register_workflow(QueryWorkflow)
worker.register_workflow(QuickTimeoutWorkflow)
worker.register_workflow(RandomlyFailingWorkflow)
worker.register_workflow(ReleaseWorkflow)
Expand Down
54 changes: 54 additions & 0 deletions examples/spec/integration/query_workflow_spec.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
require 'workflows/query_workflow'
require 'temporal/errors'

describe QueryWorkflow, :integration do
subject { described_class }

it 'returns the correct result for the queries' do
workflow_id, run_id = run_workflow(described_class)

# Query with nil workflow class
expect(Temporal.query_workflow(nil, 'state', workflow_id, run_id))
.to eq 'started'

# Query with arbitrary args
expect(Temporal.query_workflow(described_class, 'state', workflow_id, run_id,
'upcase', 'ignored', 'reverse'))
.to eq 'DETRATS'

# Query with no args
expect(Temporal.query_workflow(described_class, 'signal_count', workflow_id, run_id))
.to eq 0

# Query with unregistered handler
expect { Temporal.query_workflow(described_class, 'unknown_query', workflow_id, run_id) }
.to raise_error(Temporal::QueryFailed, 'Workflow did not register a handler for unknown_query')

Temporal.signal_workflow(described_class, 'make_progress', workflow_id, run_id)

# Query for updated signal_count with an unsatisfied reject condition
expect(Temporal.query_workflow(described_class, 'signal_count', workflow_id, run_id, query_reject_condition: :not_open))
.to eq 1

Temporal.signal_workflow(described_class, 'finish', workflow_id, run_id)
wait_for_workflow_completion(workflow_id, run_id)

# Repeating original query scenarios above, expecting updated state and signal results
expect(Temporal.query_workflow(nil, 'state', workflow_id, run_id))
.to eq 'finished'

expect(Temporal.query_workflow(described_class, 'state', workflow_id, run_id,
'upcase', 'ignored', 'reverse'))
.to eq 'DEHSINIF'

expect(Temporal.query_workflow(described_class, 'signal_count', workflow_id, run_id))
.to eq 2

expect { Temporal.query_workflow(described_class, 'unknown_query', workflow_id, run_id) }
.to raise_error(Temporal::QueryFailed, 'Workflow did not register a handler for unknown_query')

# Now that the workflow is completed, test a query with a reject condition satisfied
expect { Temporal.query_workflow(described_class, 'state', workflow_id, run_id, query_reject_condition: :not_open) }
.to raise_error(Temporal::QueryFailed, 'Query rejected: status WORKFLOW_EXECUTION_STATUS_COMPLETED')
end
end
36 changes: 36 additions & 0 deletions examples/workflows/query_workflow.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
class QueryWorkflow < Temporal::Workflow
attr_reader :state, :signal_count, :last_signal_received

def execute
@state = "started"
@signal_count = 0
@last_signal_received = nil

workflow.on_query("state") { |*args| apply_transforms(state, args) }
workflow.on_query("signal_count") { signal_count }

workflow.on_signal do |signal|
@signal_count += 1
@last_signal_received = signal
end

workflow.wait_for { last_signal_received == "finish" }
@state = "finished"

{
signal_count: signal_count,
last_signal_received: last_signal_received,
final_state: state
}
end

private

def apply_transforms(value, transforms)
return value if value.nil? || transforms.empty?
transforms.inject(value) do |memo, input|
next memo unless memo.respond_to?(input)
memo.public_send(input)
end
end
end
5 changes: 3 additions & 2 deletions lib/temporal.rb
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ module Temporal
:describe_namespace,
:list_namespaces,
:signal_workflow,
:query_workflow,
:await_workflow_result,
:reset_workflow,
:terminate_workflow,
Expand Down Expand Up @@ -48,11 +49,11 @@ def metrics
end

private

def default_client
@default_client ||= Client.new(config)
end

def config
@config ||= Configuration.new
end
Expand Down
25 changes: 24 additions & 1 deletion lib/temporal/client.rb
Original file line number Diff line number Diff line change
Expand Up @@ -176,6 +176,29 @@ def signal_workflow(workflow, signal, workflow_id, run_id, input = nil, namespac
)
end

# Issue a query against a running workflow
#
# @param workflow [Temporal::Workflow, nil] workflow class or nil
# @param query [String] name of the query to issue
# @param workflow_id [String]
# @param run_id [String]
# @param args [String, Array, nil] optional arguments for the query
# @param namespace [String, nil] if nil, choose the one declared on the workflow class or the
# global default
# @param query_reject_condition [Symbol] check Temporal::Connection::GRPC::QUERY_REJECT_CONDITION
def query_workflow(workflow, query, workflow_id, run_id, *args, namespace: nil, query_reject_condition: nil)
execution_options = ExecutionOptions.new(workflow, {}, config.default_execution_options)

connection.query_workflow(
namespace: namespace || execution_options.namespace,
workflow_id: workflow_id,
run_id: run_id,
query: query,
args: args,
query_reject_condition: query_reject_condition
)
end

# Long polls for a workflow to be completed and returns workflow's return value.
#
# @note This function times out after 30 seconds and throws Temporal::TimeoutError,
Expand Down Expand Up @@ -207,7 +230,7 @@ def await_workflow_result(workflow, workflow_id:, run_id: nil, timeout: nil, nam
timeout: timeout || max_timeout,
)
rescue GRPC::DeadlineExceeded => e
message = if timeout
message = if timeout
"Timed out after your specified limit of timeout: #{timeout} seconds"
else
"Timed out after #{max_timeout} seconds, which is the maximum supported amount."
Expand Down
8 changes: 8 additions & 0 deletions lib/temporal/concerns/payloads.rb
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,10 @@ def from_signal_payloads(payloads)
from_payloads(payloads)&.first
end

def from_query_payloads(payloads)
from_payloads(payloads)&.first
end

def from_payload_map(payload_map)
payload_map.map { |key, value| [key, from_payload(value)] }.to_h
end
Expand All @@ -45,6 +49,10 @@ def to_signal_payloads(data)
to_payloads([data])
end

def to_query_payloads(data)
to_payloads([data])
end

def to_payload_map(data)
data.transform_values(&method(:to_payload))
end
Expand Down
68 changes: 58 additions & 10 deletions lib/temporal/connection/grpc.rb
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,12 @@ class GRPC
close: Temporal::Api::Enums::V1::HistoryEventFilterType::HISTORY_EVENT_FILTER_TYPE_CLOSE_EVENT,
}.freeze

QUERY_REJECT_CONDITION = {
none: Temporal::Api::Enums::V1::QueryRejectCondition::QUERY_REJECT_CONDITION_NONE,
not_open: Temporal::Api::Enums::V1::QueryRejectCondition::QUERY_REJECT_CONDITION_NOT_OPEN,
not_completed_cleanly: Temporal::Api::Enums::V1::QueryRejectCondition::QUERY_REJECT_CONDITION_NOT_COMPLETED_CLEANLY
}.freeze

DEFAULT_OPTIONS = {
max_page_size: 100
}.freeze
Expand Down Expand Up @@ -142,7 +148,7 @@ def get_workflow_execution_history(
event_type: :all,
timeout: nil
)
if wait_for_new_event
if wait_for_new_event
if timeout.nil?
# This is an internal error. Wrappers should enforce this.
raise "You must specify a timeout when wait_for_new_event = true."
Expand Down Expand Up @@ -183,13 +189,28 @@ def poll_workflow_task_queue(namespace:, task_queue:)
poll_request.execute
end

def respond_workflow_task_completed(namespace:, task_token:, commands:)
def respond_query_task_completed(namespace:, task_token:, query_result:)
query_result_proto = Serializer.serialize(query_result)
request = Temporal::Api::WorkflowService::V1::RespondQueryTaskCompletedRequest.new(
task_token: task_token,
namespace: namespace,
completed_type: query_result_proto.result_type,
query_result: query_result_proto.answer,
error_message: query_result_proto.error_message,
)

client.respond_query_task_completed(request)
end

def respond_workflow_task_completed(namespace:, task_token:, commands:, query_results: {})
request = Temporal::Api::WorkflowService::V1::RespondWorkflowTaskCompletedRequest.new(
namespace: namespace,
identity: identity,
task_token: task_token,
commands: Array(commands).map { |(_, command)| Serializer.serialize(command) }
commands: Array(commands).map { |(_, command)| Serializer.serialize(command) },
query_results: query_results.transform_values { |value| Serializer.serialize(value) }
)

client.respond_workflow_task_completed(request)
end

Expand Down Expand Up @@ -452,16 +473,43 @@ def get_search_attributes
raise NotImplementedError
end

def respond_query_task_completed
raise NotImplementedError
end

def reset_sticky_task_queue
raise NotImplementedError
end

def query_workflow
raise NotImplementedError
def query_workflow(namespace:, workflow_id:, run_id:, query:, args: nil, query_reject_condition: nil)
request = Temporal::Api::WorkflowService::V1::QueryWorkflowRequest.new(
namespace: namespace,
execution: Temporal::Api::Common::V1::WorkflowExecution.new(
workflow_id: workflow_id,
run_id: run_id
),
query: Temporal::Api::Query::V1::WorkflowQuery.new(
query_type: query,
query_args: to_query_payloads(args)
)
)
if query_reject_condition
condition = QUERY_REJECT_CONDITION[query_reject_condition]
raise Client::ArgumentError, 'Unknown query_reject_condition specified' unless condition

request.query_reject_condition = condition
end

begin
response = client.query_workflow(request)
rescue ::GRPC::InvalidArgument => e
raise Temporal::QueryFailed, e.details
end

if response.query_rejected
rejection_status = response.query_rejected.status || 'not specified by server'
raise Temporal::QueryFailed, "Query rejected: status #{rejection_status}"
elsif !response.query_result
raise Temporal::QueryFailed, 'Invalid response from server'
else
from_query_payloads(response.query_result)
end
end

def describe_workflow_execution(namespace:, workflow_id:, run_id:)
Expand Down Expand Up @@ -534,7 +582,7 @@ def serialize_status_filter(value)

sym = Temporal::Workflow::Status::API_STATUS_MAP.invert[value]
status = Temporal::Api::Enums::V1::WorkflowExecutionStatus.resolve(sym)

Temporal::Api::Filter::V1::StatusFilter.new(status: status)
end
end
Expand Down
5 changes: 5 additions & 0 deletions lib/temporal/connection/serializer.rb
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
require 'temporal/workflow/command'
require 'temporal/workflow/query_result'
require 'temporal/connection/serializer/schedule_activity'
require 'temporal/connection/serializer/start_child_workflow'
require 'temporal/connection/serializer/request_activity_cancellation'
Expand All @@ -10,6 +11,8 @@
require 'temporal/connection/serializer/fail_workflow'
require 'temporal/connection/serializer/signal_external_workflow'
require 'temporal/connection/serializer/upsert_search_attributes'
require 'temporal/connection/serializer/query_answer'
require 'temporal/connection/serializer/query_failure'

module Temporal
module Connection
Expand All @@ -26,6 +29,8 @@ module Serializer
Workflow::Command::FailWorkflow => Serializer::FailWorkflow,
Workflow::Command::SignalExternalWorkflow => Serializer::SignalExternalWorkflow,
Workflow::Command::UpsertSearchAttributes => Serializer::UpsertSearchAttributes,
Workflow::QueryResult::Answer => Serializer::QueryAnswer,
Workflow::QueryResult::Failure => Serializer::QueryFailure,
}.freeze

def self.serialize(object)
Expand Down
19 changes: 19 additions & 0 deletions lib/temporal/connection/serializer/query_answer.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
require 'temporal/connection/serializer/base'
require 'temporal/concerns/payloads'

module Temporal
module Connection
module Serializer
class QueryAnswer < Base
include Concerns::Payloads

def to_proto
Temporal::Api::Query::V1::WorkflowQueryResult.new(
result_type: Temporal::Api::Enums::V1::QueryResultType::QUERY_RESULT_TYPE_ANSWERED,
answer: to_query_payloads(object.result)
)
end
end
end
end
end
16 changes: 16 additions & 0 deletions lib/temporal/connection/serializer/query_failure.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
require 'temporal/connection/serializer/base'

module Temporal
module Connection
module Serializer
class QueryFailure < Base
def to_proto
Temporal::Api::Query::V1::WorkflowQueryResult.new(
result_type: Temporal::Api::Enums::V1::QueryResultType::QUERY_RESULT_TYPE_FAILED,
error_message: object.error.message
)
end
end
end
end
end
2 changes: 1 addition & 1 deletion lib/temporal/errors.rb
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ class ClientVersionNotSupportedFailure < ApiError; end
class FeatureVersionNotSupportedFailure < ApiError; end
class NamespaceAlreadyExistsFailure < ApiError; end
class CancellationAlreadyRequestedFailure < ApiError; end
class QueryFailedFailure < ApiError; end
class QueryFailed < ApiError; end
class UnexpectedResponse < ApiError; end

end
Loading