Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 2 additions & 4 deletions contrib/examples/actions/mistral-basic.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,9 @@ workflows:
- cmd
tasks:
run-cmd:
action: st2.action
action: core.local
input:
ref: core.local
parameters:
cmd: $.cmd
cmd: $.cmd
publish:
stdout: $.stdout
stderr: $.stderr
24 changes: 8 additions & 16 deletions contrib/examples/actions/mistral-complex.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -30,11 +30,9 @@ workflows:
on-success:
- power_on_vm
power_on_vm:
action: st2.action
action: core.local
input:
ref: core.local
parameters:
cmd: "sleep 2; printf \"running\""
cmd: "sleep 2; printf 'running'"
publish:
vm_state: $.stdout

Expand All @@ -49,11 +47,9 @@ workflows:
- fail
tasks:
create:
action: st2.action
action: core.local
input:
ref: core.local
parameters:
cmd: "sleep 3; printf \"vm1234\""
cmd: "sleep 3; printf 'vm1234'"
publish:
vm_id: $.stdout

Expand All @@ -68,14 +64,10 @@ workflows:
- fail
tasks:
add_disk:
action: st2.action
action: core.local
input:
ref: core.local
parameters:
cmd: "sleep 1; printf \"{$.vm_id}\""
cmd: "sleep 1; printf '{$.vm_id}'"
edit_cpu_mem:
action: st2.action
action: core.local
input:
ref: core.local
parameters:
cmd: "sleep 1; printf \"{\"vm_id\": \"{$.vm_id}\", \"cpu\": {$.cpu_cores}, \"memory\": {$.memory_mb}}\""
cmd: "sleep 1; printf '{\"vm_id\": \"{$.vm_id}\", \"cpu\": {$.cpu_cores}, \"memory\": {$.memory_mb}}'"
51 changes: 8 additions & 43 deletions docs/source/mistral.rst
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,9 @@ Mistral
=======
`Mistral <https://wiki.openstack.org/wiki/Mistral>`_ is an OpenStack project that manages and executes workflows as a service. Mistral is installed as a separate service named "mistral" along with |st2|. A Mistral workflow can be defined as an |st2| action in a Mistral workbook using the `DSL v2 <https://wiki.openstack.org/wiki/Mistral/DSLv2>`_. On action execution, |st2| writes the workbook to Mistral and executes the workflow in the workbook. The workflow can invoke other |st2| actions as subtasks. There're custom actions in Mistral responsible for handling calls and context for |st2|. Subtasks in the workflow that are |st2| actions are tracked under the parent workflow in |st2|. |st2| actively polls Mistral for execution results.

Custom Mistral Actions
++++++++++++++++++++++
|st2| introduces a custom action in Mistral named **st2.action**. This custom action us used for a unit of work or subtask in a workflow. **st2.action** should be used to schedule a st2 action.

Basic Workflow
++++++++++++++
Let's start with a very basic workflow that calls a |st2| action and notifies |st2| when the workflow is done. The files used in this example is also located under /usr/share/doc/st2/examples if |st2| is already installed. The first task is named **run-cmd** that executes a shell command on the local server where st2 is installed. A st2.action takes two input arguments: ref (or name) of the |st2| action and a list of input parameters for the |st2| action. In this case, the run-cmd task is calling **core.local** and passing the cmd as input. Let's save this as mistral-basic.yaml at /opt/stackstorm/packs/examples/actions/ where |st2| is installed.
Let's start with a very basic workflow that calls a |st2| action and notifies |st2| when the workflow is done. The files used in this example is also located under /usr/share/doc/st2/examples if |st2| is already installed. The first task is named **run-cmd** that executes a shell command on the local server where st2 is installed. The run-cmd task is calling **core.local** and passing the cmd as input. **core.local** is an action that comes installed with |st2|. In the workflow, we can reference |st2| action directly. When the workflow is invoked, |st2| will translate the workflow definition appropriately before sending it to Mistral. Let's save this as mistral-basic.yaml at /opt/stackstorm/packs/examples/actions/ where |st2| is installed.

.. literalinclude:: /../../contrib/examples/actions/mistral-basic.yaml

Expand Down Expand Up @@ -59,64 +55,33 @@ Let's say we need to upgrade and reboot all the members of a MongoDB replica set
- duration
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It would be nice to make this worklfow actually working against 'mock' actions (make them part of example), run this workflow as part of the test, and literanlinclude it in the docs.

tasks:
replica-set-check-status:
action: st2.action
input:
ref: mongodb.rs-check-status
parameters:
primary: $.primary
action: mongodb.rs-check-status primary={$.primary}
on-success:
- elect-primary
elect-primary:
action: st2.action
input:
ref: mongodb.elect-primary
parameters:
members: $.members
action: mongodb.elect-primary members={$.members}
publish:
candidate: $.candidate
secondary: $.secondary
on-success:
- update-candidate
update-candidate:
action: st2.action
input:
ref: mongodb.run-update-xyz
parameters:
node: $.candidate
action: mongodb.run-update-xyz node={$.candidate}
on-success:
- freeze-secondary
freeze-secondary:
action: st2.action
input:
ref: mongodb.freeze
parameters:
node: $.secondary
duration: $.duration
action: mongodb.freeze node={$.secondary} duration={$.duration}
on-success:
- step-down-primary
step-down-primary:
action: st2.action
input:
ref: mongodb.step-down
parameters:
primary: $.primary
duration: $.duration
action: mongodb.step-down primary={$.primary} duration={$.duration}
policies:
wait-after: $.duration
on-success:
- update-primary
update-primary:
action: st2.action
input:
ref: mongodb.run-update-xyz
parameters:
node: $.primary
action: mongodb.run-update-xyz node={$.primary}
on-success:
- update-secondary
update-secondary:
action: st2.action
input:
ref: mongodb.run-update-xyz
parameters:
node: $.secondary

action: mongodb.run-update-xyz node={$.secondary}
156 changes: 156 additions & 0 deletions st2actions/st2actions/runners/mistral/utils.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,156 @@
# Licensed to the StackStorm, Inc ('StackStorm') under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import json
import re

import six
import yaml

from st2common import log as logging
from st2common.models.system.common import ResourceReference
from st2common.persistence.action import Action


LOG = logging.getLogger(__name__)

REGEX_ACTION = re.compile("^[\w\.]+[^=\s\"]*")
REGEX_ACTION_PARAMS = re.compile("([\w]+)=(\"[^\"]*\"\s*|'[^']*'\s*|"
"\{[^}]*\}\s*|\[.*\]\s*|[\.,:\w\d\.]+)")


def _parse_cmd_and_input(cmd_str):
cmd_matcher = REGEX_ACTION.search(cmd_str)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am a complete regex n00b so I have no clue what this is doing. Can you add some docs or comments in the method to describe.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I copied this from Mistral @ https://github.com/stackforge/mistral/blob/master/mistral/workbook/base.py#L23. It's used to evaluate inline action parameters (i.e. action: core.local cmd={$.cmd}). The regex could have been written better. But I would fix it at the Mistral side. I'm just trying to keep the evaluation consistent here.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

K.


if not cmd_matcher:
raise ValueError('Invalid action/workflow task property: %s' % cmd_str)

cmd = cmd_matcher.group()

params = {}
for k, v in re.findall(REGEX_ACTION_PARAMS, cmd_str):
v = v.strip()

try:
v = json.loads(v)
except Exception:
pass

params[k] = v

return cmd, params


def _merge_dicts(left, right):
if left is None:
return right

if right is None:
return left

for k, v in right.iteritems():
if k not in left:
left[k] = v
else:
left_v = left[k]

if isinstance(left_v, dict) and isinstance(v, dict):
_merge_dicts(left_v, v)

return left


def _eval_inline_params(spec, action_key, input_key):
action_str = spec.get(action_key)
command, inputs = _parse_cmd_and_input(action_str)
if inputs:
spec[action_key] = command
if input_key not in spec:
spec[input_key] = {}
_merge_dicts(spec[input_key], inputs)


def _transform_action(spec, action_key, input_key):

if action_key not in spec or spec.get(action_key) == 'st2.action':
return

if spec.get(action_key) == 'st2.callback':
raise Exception('st2.callback is deprecated.')

# Convert parameters that are inline (i.e. action: some_action var1={$.value1} var2={$.value2})
# and split it to action name and input dict as illustrated below.
#
# action: some_action
# input:
# var1: $.value1
# var2: $.value2
#
# This step to separate the action name and the input parameters is required
# to wrap them with the st2.action proxy.
#
# action: st2.action
# input:
# ref: some_action
# parameters:
# var1: $.value1
# var2: $.value2
_eval_inline_params(spec, action_key, input_key)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please add a comment describing the need for this evaluation.


action_ref = spec.get(action_key)

if ResourceReference.is_resource_reference(action_ref):
ref = ResourceReference.from_string_reference(ref=action_ref)
actions = Action.query(name=ref.name, pack=ref.pack)
action = actions.first() if actions else None
else:
action = None

if action:
spec[action_key] = 'st2.action'
spec[input_key] = {
'ref': action_ref,
'parameters': spec[input_key]
}


def transform_definition(definition):
spec = yaml.safe_load(definition)

if 'version' not in spec:
raise Exception('Unknown version. Only version 2.0 is supported.')

if spec['version'] != '2.0':
raise Exception('Only version 2.0 is supported.')

# Transform adhoc actions
for action_name, action_spec in six.iteritems(spec.get('actions', {})):
_transform_action(action_spec, 'base', 'base-input')

# Determine if definition is a workbook or workflow
is_workbook = 'workflows' in spec

# Transform tasks
if is_workbook:
for workflow_name, workflow_spec in six.iteritems(spec.get('workflows', {})):
for task_name, task_spec in six.iteritems(workflow_spec.get('tasks')):
_transform_action(task_spec, 'action', 'input')
else:
for key, value in six.iteritems(spec):
if 'tasks' in value:
for task_name, task_spec in six.iteritems(value.get('tasks')):
_transform_action(task_spec, 'action', 'input')

return yaml.safe_dump(spec, default_flow_style=False)
8 changes: 5 additions & 3 deletions st2actions/st2actions/runners/mistral/v2.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

from st2common.constants.action import ACTIONEXEC_STATUS_RUNNING
from st2actions.runners import AsyncActionRunner
from st2actions.runners.mistral import utils
from st2common import log as logging


Expand Down Expand Up @@ -48,12 +49,13 @@ def run(self, action_parameters):
workbook_name = self.action.pack + '.' + self.action.name
with open(self.entry_point, 'r') as wbkfile:
definition = wbkfile.read()
transformed_definition = utils.transform_definition(definition)
try:
wbk = client.workbooks.get(workbook_name)
if wbk.definition != definition:
client.workbooks.update(definition)
if wbk.definition != transformed_definition:
client.workbooks.update(transformed_definition)
except:
client.workbooks.create(definition)
client.workbooks.create(transformed_definition)

# Setup context for the workflow execution.
context = self.runner_parameters.get('context', dict())
Expand Down
Loading