diff --git a/contrib/examples/actions/mistral-basic.yaml b/contrib/examples/actions/mistral-basic.yaml index f62c18f404..1292feb2f6 100644 --- a/contrib/examples/actions/mistral-basic.yaml +++ b/contrib/examples/actions/mistral-basic.yaml @@ -8,11 +8,9 @@ workflows: - cmd tasks: run-cmd: - action: st2.action + action: core.local input: - ref: core.local - parameters: - cmd: $.cmd + cmd: $.cmd publish: stdout: $.stdout stderr: $.stderr diff --git a/contrib/examples/actions/mistral-complex.yaml b/contrib/examples/actions/mistral-complex.yaml index 0762cd8429..3f55bd32fb 100644 --- a/contrib/examples/actions/mistral-complex.yaml +++ b/contrib/examples/actions/mistral-complex.yaml @@ -30,11 +30,9 @@ workflows: on-success: - power_on_vm power_on_vm: - action: st2.action + action: core.local input: - ref: core.local - parameters: - cmd: "sleep 2; printf \"running\"" + cmd: "sleep 2; printf 'running'" publish: vm_state: $.stdout @@ -49,11 +47,9 @@ workflows: - fail tasks: create: - action: st2.action + action: core.local input: - ref: core.local - parameters: - cmd: "sleep 3; printf \"vm1234\"" + cmd: "sleep 3; printf 'vm1234'" publish: vm_id: $.stdout @@ -68,14 +64,10 @@ workflows: - fail tasks: add_disk: - action: st2.action + action: core.local input: - ref: core.local - parameters: - cmd: "sleep 1; printf \"{$.vm_id}\"" + cmd: "sleep 1; printf '{$.vm_id}'" edit_cpu_mem: - action: st2.action + action: core.local input: - ref: core.local - parameters: - cmd: "sleep 1; printf \"{\"vm_id\": \"{$.vm_id}\", \"cpu\": {$.cpu_cores}, \"memory\": {$.memory_mb}}\"" + cmd: "sleep 1; printf '{\"vm_id\": \"{$.vm_id}\", \"cpu\": {$.cpu_cores}, \"memory\": {$.memory_mb}}'" diff --git a/docs/source/mistral.rst b/docs/source/mistral.rst index 21593e34b9..eb6a6bb784 100644 --- a/docs/source/mistral.rst +++ b/docs/source/mistral.rst @@ -2,13 +2,9 @@ Mistral ======= `Mistral `_ is an OpenStack project that manages and executes workflows as a service. Mistral is installed as a separate service named "mistral" along with |st2|. A Mistral workflow can be defined as an |st2| action in a Mistral workbook using the `DSL v2 `_. On action execution, |st2| writes the workbook to Mistral and executes the workflow in the workbook. The workflow can invoke other |st2| actions as subtasks. There're custom actions in Mistral responsible for handling calls and context for |st2|. Subtasks in the workflow that are |st2| actions are tracked under the parent workflow in |st2|. |st2| actively polls Mistral for execution results. -Custom Mistral Actions -++++++++++++++++++++++ -|st2| introduces a custom action in Mistral named **st2.action**. This custom action us used for a unit of work or subtask in a workflow. **st2.action** should be used to schedule a st2 action. - Basic Workflow ++++++++++++++ -Let's start with a very basic workflow that calls a |st2| action and notifies |st2| when the workflow is done. The files used in this example is also located under /usr/share/doc/st2/examples if |st2| is already installed. The first task is named **run-cmd** that executes a shell command on the local server where st2 is installed. A st2.action takes two input arguments: ref (or name) of the |st2| action and a list of input parameters for the |st2| action. In this case, the run-cmd task is calling **core.local** and passing the cmd as input. Let's save this as mistral-basic.yaml at /opt/stackstorm/packs/examples/actions/ where |st2| is installed. +Let's start with a very basic workflow that calls a |st2| action and notifies |st2| when the workflow is done. The files used in this example is also located under /usr/share/doc/st2/examples if |st2| is already installed. The first task is named **run-cmd** that executes a shell command on the local server where st2 is installed. The run-cmd task is calling **core.local** and passing the cmd as input. **core.local** is an action that comes installed with |st2|. In the workflow, we can reference |st2| action directly. When the workflow is invoked, |st2| will translate the workflow definition appropriately before sending it to Mistral. Let's save this as mistral-basic.yaml at /opt/stackstorm/packs/examples/actions/ where |st2| is installed. .. literalinclude:: /../../contrib/examples/actions/mistral-basic.yaml @@ -59,64 +55,33 @@ Let's say we need to upgrade and reboot all the members of a MongoDB replica set - duration tasks: replica-set-check-status: - action: st2.action - input: - ref: mongodb.rs-check-status - parameters: - primary: $.primary + action: mongodb.rs-check-status primary={$.primary} on-success: - elect-primary elect-primary: - action: st2.action - input: - ref: mongodb.elect-primary - parameters: - members: $.members + action: mongodb.elect-primary members={$.members} publish: candidate: $.candidate secondary: $.secondary on-success: - update-candidate update-candidate: - action: st2.action - input: - ref: mongodb.run-update-xyz - parameters: - node: $.candidate + action: mongodb.run-update-xyz node={$.candidate} on-success: - freeze-secondary freeze-secondary: - action: st2.action - input: - ref: mongodb.freeze - parameters: - node: $.secondary - duration: $.duration + action: mongodb.freeze node={$.secondary} duration={$.duration} on-success: - step-down-primary step-down-primary: - action: st2.action - input: - ref: mongodb.step-down - parameters: - primary: $.primary - duration: $.duration + action: mongodb.step-down primary={$.primary} duration={$.duration} policies: wait-after: $.duration on-success: - update-primary update-primary: - action: st2.action - input: - ref: mongodb.run-update-xyz - parameters: - node: $.primary + action: mongodb.run-update-xyz node={$.primary} on-success: - update-secondary update-secondary: - action: st2.action - input: - ref: mongodb.run-update-xyz - parameters: - node: $.secondary - + action: mongodb.run-update-xyz node={$.secondary} diff --git a/st2actions/st2actions/runners/mistral/utils.py b/st2actions/st2actions/runners/mistral/utils.py new file mode 100644 index 0000000000..89f7a41a53 --- /dev/null +++ b/st2actions/st2actions/runners/mistral/utils.py @@ -0,0 +1,156 @@ +# Licensed to the StackStorm, Inc ('StackStorm') under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import json +import re + +import six +import yaml + +from st2common import log as logging +from st2common.models.system.common import ResourceReference +from st2common.persistence.action import Action + + +LOG = logging.getLogger(__name__) + +REGEX_ACTION = re.compile("^[\w\.]+[^=\s\"]*") +REGEX_ACTION_PARAMS = re.compile("([\w]+)=(\"[^\"]*\"\s*|'[^']*'\s*|" + "\{[^}]*\}\s*|\[.*\]\s*|[\.,:\w\d\.]+)") + + +def _parse_cmd_and_input(cmd_str): + cmd_matcher = REGEX_ACTION.search(cmd_str) + + if not cmd_matcher: + raise ValueError('Invalid action/workflow task property: %s' % cmd_str) + + cmd = cmd_matcher.group() + + params = {} + for k, v in re.findall(REGEX_ACTION_PARAMS, cmd_str): + v = v.strip() + + try: + v = json.loads(v) + except Exception: + pass + + params[k] = v + + return cmd, params + + +def _merge_dicts(left, right): + if left is None: + return right + + if right is None: + return left + + for k, v in right.iteritems(): + if k not in left: + left[k] = v + else: + left_v = left[k] + + if isinstance(left_v, dict) and isinstance(v, dict): + _merge_dicts(left_v, v) + + return left + + +def _eval_inline_params(spec, action_key, input_key): + action_str = spec.get(action_key) + command, inputs = _parse_cmd_and_input(action_str) + if inputs: + spec[action_key] = command + if input_key not in spec: + spec[input_key] = {} + _merge_dicts(spec[input_key], inputs) + + +def _transform_action(spec, action_key, input_key): + + if action_key not in spec or spec.get(action_key) == 'st2.action': + return + + if spec.get(action_key) == 'st2.callback': + raise Exception('st2.callback is deprecated.') + + # Convert parameters that are inline (i.e. action: some_action var1={$.value1} var2={$.value2}) + # and split it to action name and input dict as illustrated below. + # + # action: some_action + # input: + # var1: $.value1 + # var2: $.value2 + # + # This step to separate the action name and the input parameters is required + # to wrap them with the st2.action proxy. + # + # action: st2.action + # input: + # ref: some_action + # parameters: + # var1: $.value1 + # var2: $.value2 + _eval_inline_params(spec, action_key, input_key) + + action_ref = spec.get(action_key) + + if ResourceReference.is_resource_reference(action_ref): + ref = ResourceReference.from_string_reference(ref=action_ref) + actions = Action.query(name=ref.name, pack=ref.pack) + action = actions.first() if actions else None + else: + action = None + + if action: + spec[action_key] = 'st2.action' + spec[input_key] = { + 'ref': action_ref, + 'parameters': spec[input_key] + } + + +def transform_definition(definition): + spec = yaml.safe_load(definition) + + if 'version' not in spec: + raise Exception('Unknown version. Only version 2.0 is supported.') + + if spec['version'] != '2.0': + raise Exception('Only version 2.0 is supported.') + + # Transform adhoc actions + for action_name, action_spec in six.iteritems(spec.get('actions', {})): + _transform_action(action_spec, 'base', 'base-input') + + # Determine if definition is a workbook or workflow + is_workbook = 'workflows' in spec + + # Transform tasks + if is_workbook: + for workflow_name, workflow_spec in six.iteritems(spec.get('workflows', {})): + for task_name, task_spec in six.iteritems(workflow_spec.get('tasks')): + _transform_action(task_spec, 'action', 'input') + else: + for key, value in six.iteritems(spec): + if 'tasks' in value: + for task_name, task_spec in six.iteritems(value.get('tasks')): + _transform_action(task_spec, 'action', 'input') + + return yaml.safe_dump(spec, default_flow_style=False) diff --git a/st2actions/st2actions/runners/mistral/v2.py b/st2actions/st2actions/runners/mistral/v2.py index c183d73d9f..12efe29d13 100644 --- a/st2actions/st2actions/runners/mistral/v2.py +++ b/st2actions/st2actions/runners/mistral/v2.py @@ -20,6 +20,7 @@ from st2common.constants.action import ACTIONEXEC_STATUS_RUNNING from st2actions.runners import AsyncActionRunner +from st2actions.runners.mistral import utils from st2common import log as logging @@ -48,12 +49,13 @@ def run(self, action_parameters): workbook_name = self.action.pack + '.' + self.action.name with open(self.entry_point, 'r') as wbkfile: definition = wbkfile.read() + transformed_definition = utils.transform_definition(definition) try: wbk = client.workbooks.get(workbook_name) - if wbk.definition != definition: - client.workbooks.update(definition) + if wbk.definition != transformed_definition: + client.workbooks.update(transformed_definition) except: - client.workbooks.create(definition) + client.workbooks.create(transformed_definition) # Setup context for the workflow execution. context = self.runner_parameters.get('context', dict()) diff --git a/st2actions/tests/unit/test_mistral_dsl_transform.py b/st2actions/tests/unit/test_mistral_dsl_transform.py new file mode 100644 index 0000000000..db17f54737 --- /dev/null +++ b/st2actions/tests/unit/test_mistral_dsl_transform.py @@ -0,0 +1,107 @@ +# Licensed to the StackStorm, Inc ('StackStorm') under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import copy + +import yaml + +from st2tests import DbTestCase +from st2tests.fixturesloader import FixturesLoader +import st2actions.bootstrap.runnersregistrar as runners_registrar +from st2actions.runners.mistral import utils +from st2common.models.api.action import ActionAPI +from st2common.persistence.action import Action + + +WB_PRE_XFORM_FILE = 'wb_pre_xform.yaml' +WB_POST_XFORM_FILE = 'wb_post_xform.yaml' +WF_PRE_XFORM_FILE = 'wf_pre_xform.yaml' +WF_POST_XFORM_FILE = 'wf_post_xform.yaml' +TEST_FIXTURES = { + 'workflows': [ + WB_PRE_XFORM_FILE, + WB_POST_XFORM_FILE, + WF_PRE_XFORM_FILE, + WF_POST_XFORM_FILE + ], + 'actions': ['local.json'] +} + +PACK = 'generic' +LOADER = FixturesLoader() +FIXTURES = LOADER.load_fixtures(fixtures_pack=PACK, fixtures_dict=TEST_FIXTURES) +WB_PRE_XFORM_PATH = LOADER.get_fixture_file_path_abs(PACK, 'workflows', WB_PRE_XFORM_FILE) +WB_PRE_XFORM_DEF = FIXTURES['workflows'][WB_PRE_XFORM_FILE] +WB_POST_XFORM_PATH = LOADER.get_fixture_file_path_abs(PACK, 'workflows', WB_POST_XFORM_FILE) +WB_POST_XFORM_DEF = FIXTURES['workflows'][WB_POST_XFORM_FILE] +WF_PRE_XFORM_PATH = LOADER.get_fixture_file_path_abs(PACK, 'workflows', WF_PRE_XFORM_FILE) +WF_PRE_XFORM_DEF = FIXTURES['workflows'][WF_PRE_XFORM_FILE] +WF_POST_XFORM_PATH = LOADER.get_fixture_file_path_abs(PACK, 'workflows', WF_POST_XFORM_FILE) +WF_POST_XFORM_DEF = FIXTURES['workflows'][WF_POST_XFORM_FILE] + + +def _read_file_content(path): + with open(path, 'r') as f: + return f.read() + + +class DSLTransformTestCase(DbTestCase): + + @classmethod + def setUpClass(cls): + super(DSLTransformTestCase, cls).setUpClass() + runners_registrar.register_runner_types() + action_local = ActionAPI(**copy.deepcopy(FIXTURES['actions']['local.json'])) + Action.add_or_update(ActionAPI.to_model(action_local)) + + @staticmethod + def _read_file_content(path): + with open(path, 'r') as f: + return f.read() + + def test_invalid_dsl_version(self): + def_yaml = _read_file_content(WB_PRE_XFORM_PATH) + def_dict = yaml.safe_load(def_yaml) + + # Unsupported version + def_dict['version'] = '1.0' + def_yaml = yaml.safe_dump(def_dict) + self.assertRaises(Exception, utils.transform_definition, def_yaml) + + # Missing version + del def_dict['version'] + def_yaml = yaml.safe_dump(def_dict) + self.assertRaises(Exception, utils.transform_definition, def_yaml) + + def test_transform_workbook_dsl(self): + def_yaml = _read_file_content(WB_PRE_XFORM_PATH) + new_def = utils.transform_definition(def_yaml) + actual = yaml.safe_load(new_def) + expected = copy.deepcopy(WB_POST_XFORM_DEF) + self.assertDictEqual(actual, expected) + + def test_transform_workflow_dsl(self): + def_yaml = _read_file_content(WF_PRE_XFORM_PATH) + new_def = utils.transform_definition(def_yaml) + actual = yaml.safe_load(new_def) + expected = copy.deepcopy(WF_POST_XFORM_DEF) + self.assertDictEqual(actual, expected) + + def test_deprecated_callback_action(self): + def_yaml = _read_file_content(WB_PRE_XFORM_PATH) + def_dict = yaml.safe_load(def_yaml) + def_dict['workflows']['main']['tasks']['callback'] = {'action': 'st2.callback'} + def_yaml = yaml.safe_dump(def_dict) + self.assertRaises(Exception, utils.transform_definition, def_yaml) diff --git a/st2tests/st2tests/fixtures/generic/workflows/wb_post_xform.yaml b/st2tests/st2tests/fixtures/generic/workflows/wb_post_xform.yaml new file mode 100644 index 0000000000..4108ede435 --- /dev/null +++ b/st2tests/st2tests/fixtures/generic/workflows/wb_post_xform.yaml @@ -0,0 +1,81 @@ +version: "2.0" +name: "examples.mistral-complex" + +workflows: + + main: + type: direct + input: + - vm_name + - cpu_cores + - memory_mb + task-defaults: + on-error: + - fail + tasks: + create_vm: + workflow: create_vm + input: + name: $.vm_name + publish: + vm_id: $.vm_id + on-success: + - reconfig_vm + reconfig_vm: + workflow: reconfig_vm + input: + vm_id: $.vm_id + cpu_cores: $.cpu_cores + memory_mb: $.memory_mb + on-success: + - power_on_vm + power_on_vm: + action: st2.action + input: + ref: core.local + parameters: + cmd: "sleep 2; printf 'running'" + publish: + vm_state: $.stdout + + create_vm: + type: direct + input: + - name + output: + vm_id: $.vm_id + task-defaults: + on-error: + - fail + tasks: + create: + action: st2.action + input: + ref: core.local + parameters: + cmd: "sleep 3; printf 'vm1234'" + publish: + vm_id: $.stdout + + reconfig_vm: + type: direct + input: + - vm_id + - cpu_cores + - memory_mb + task-defaults: + on-error: + - fail + tasks: + add_disk: + action: st2.action + input: + ref: core.local + parameters: + cmd: "sleep 1; printf '{$.vm_id}'" + edit_cpu_mem: + action: st2.action + input: + ref: core.local + parameters: + cmd: "sleep 1; printf '{\"vm_id\": \"{$.vm_id}\", \"cpu\": {$.cpu_cores}, \"memory\": {$.memory_mb}}'" diff --git a/st2tests/st2tests/fixtures/generic/workflows/wb_pre_xform.yaml b/st2tests/st2tests/fixtures/generic/workflows/wb_pre_xform.yaml new file mode 100644 index 0000000000..3f55bd32fb --- /dev/null +++ b/st2tests/st2tests/fixtures/generic/workflows/wb_pre_xform.yaml @@ -0,0 +1,73 @@ +version: "2.0" +name: "examples.mistral-complex" + +workflows: + + main: + type: direct + input: + - vm_name + - cpu_cores + - memory_mb + task-defaults: + on-error: + - fail + tasks: + create_vm: + workflow: create_vm + input: + name: $.vm_name + publish: + vm_id: $.vm_id + on-success: + - reconfig_vm + reconfig_vm: + workflow: reconfig_vm + input: + vm_id: $.vm_id + cpu_cores: $.cpu_cores + memory_mb: $.memory_mb + on-success: + - power_on_vm + power_on_vm: + action: core.local + input: + cmd: "sleep 2; printf 'running'" + publish: + vm_state: $.stdout + + create_vm: + type: direct + input: + - name + output: + vm_id: $.vm_id + task-defaults: + on-error: + - fail + tasks: + create: + action: core.local + input: + cmd: "sleep 3; printf 'vm1234'" + publish: + vm_id: $.stdout + + reconfig_vm: + type: direct + input: + - vm_id + - cpu_cores + - memory_mb + task-defaults: + on-error: + - fail + tasks: + add_disk: + action: core.local + input: + cmd: "sleep 1; printf '{$.vm_id}'" + edit_cpu_mem: + action: core.local + input: + cmd: "sleep 1; printf '{\"vm_id\": \"{$.vm_id}\", \"cpu\": {$.cpu_cores}, \"memory\": {$.memory_mb}}'" diff --git a/st2tests/st2tests/fixtures/generic/workflows/wf_post_xform.yaml b/st2tests/st2tests/fixtures/generic/workflows/wf_post_xform.yaml new file mode 100644 index 0000000000..d97ed12a81 --- /dev/null +++ b/st2tests/st2tests/fixtures/generic/workflows/wf_post_xform.yaml @@ -0,0 +1,16 @@ +version: '2.0' + +demo: + type: direct + input: + - cmd + tasks: + run-cmd: + action: st2.action + input: + ref: core.local + parameters: + cmd: $.cmd + publish: + stdout: $.stdout + stderr: $.stderr diff --git a/st2tests/st2tests/fixtures/generic/workflows/wf_pre_xform.yaml b/st2tests/st2tests/fixtures/generic/workflows/wf_pre_xform.yaml new file mode 100644 index 0000000000..18639062bf --- /dev/null +++ b/st2tests/st2tests/fixtures/generic/workflows/wf_pre_xform.yaml @@ -0,0 +1,14 @@ +version: '2.0' + +demo: + type: direct + input: + - cmd + tasks: + run-cmd: + action: core.local + input: + cmd: $.cmd + publish: + stdout: $.stdout + stderr: $.stderr diff --git a/st2tests/st2tests/fixtures/generic/workflows/workflow-v1.yaml b/st2tests/st2tests/fixtures/generic/workflows/workflow-v1.yaml deleted file mode 100644 index e2ebd0eed4..0000000000 --- a/st2tests/st2tests/fixtures/generic/workflows/workflow-v1.yaml +++ /dev/null @@ -1,48 +0,0 @@ -Workflow: - tasks: - say-greeting: - action: st2.action - parameters: - name: "core.hey" - parameters: - cmd: $.count - publish: - greet: $.localhost.stdout - on-success: ["say-friend"] - on-error: ["callback_on_error"] - say-friend: - action: st2.action - parameters: - name: "core.friend" - parameters: - cmd: $.friend - publish: - towhom: $.localhost.stdout - on-success: ["greet-friend"] - on-error: ["callback_on_error"] - greet-friend: - action: st2.action - parameters: - name: "core.local" - parameters: - cmd: "echo \"{$.greet}, {$.towhom}\"" - publish: - message: $.localhost.stdout - on-success: ["callback_on_success"] - on-error: ["callback_on_error"] - callback_on_success: - action: st2.callback - retry: - count: 10 - delay: 60 - parameters: - state: "SUCCESS" - result: $.message - callback_on_error: - action: st2.callback - retry: - count: 10 - delay: 60 - parameters: - state: "ERROR" - result: "Unexpected error occurred." diff --git a/st2tests/st2tests/fixtures/generic/workflows/workflow-v2.yaml b/st2tests/st2tests/fixtures/generic/workflows/workflow-v2.yaml index 3d9533fbf1..5bb93c8f76 100644 --- a/st2tests/st2tests/fixtures/generic/workflows/workflow-v2.yaml +++ b/st2tests/st2tests/fixtures/generic/workflows/workflow-v2.yaml @@ -10,44 +10,16 @@ workflows: - friend tasks: say-greeting: - action: st2.action + action: core.hey input: - name: "core.hey" - parameters: - cmd: $.count + cmd: $.count publish: - greet: $.localhost.stdout - on-error: - - callback_on_error + greet: $.stdout on-success: - say-friend say-friend: - action: st2.action + action: core.friend input: - name: "core.friend" - parameters: - cmd: $.friend + cmd: $.friend publish: - towhom: $.localhost.stdout - on-error: - - callback_on_error - on-success: - - callback_on_success - callback_on_error: - action: st2.callback - input: - state: "ERROR" - result: "Unexpected failure occurred." - policies: - retry: - count: 10 - delay: 60 - callback_on_success: - action: st2.callback - input: - state: "SUCCESS" - result: "{$.greet}, {$.towhom}" - policies: - retry: - count: 10 - delay: 60 + towhom: $.stdout