diff --git a/.github/workflows/ci.yaml b/.github/workflows/ci.yaml index 1433fce714..4e4f6c0b07 100644 --- a/.github/workflows/ci.yaml +++ b/.github/workflows/ci.yaml @@ -41,26 +41,32 @@ jobs: # NOTE: We always want to run job on master since we run some additional checks there (code # coverage, etc) if: ${{ needs.pre_job.outputs.should_skip != 'true' || github.ref == 'refs/heads/master' }} - name: '${{ matrix.name }} - Python ${{ matrix.python-version }}' + name: '${{ matrix.name }} - Python ${{ matrix.python-version-short }}' runs-on: ubuntu-latest strategy: fail-fast: false matrix: # NOTE: To speed the CI run, we split unit and integration tests into multiple jobs where # each job runs subset of tests. + # NOTE: We need to use full Python version as part of Python deps cache key otherwise + # setup virtualenv step will fail. include: - name: 'Lint Checks (black, flake8, etc.)' task: 'ci-checks' - python-version: '3.6' + python-version-short: '3.6' + python-version: '3.6.13' - name: 'Compile (pip deps, pylint, etc.)' task: 'ci-compile' - python-version: '3.6' + python-version-short: '3.6' + python-version: '3.6.13' - name: 'Lint Checks (black, flake8, etc.)' task: 'ci-checks' - python-version: '3.8' + python-version-short: '3.8' + python-version: '3.8.10' - name: 'Compile (pip deps, pylint, etc.)' task: 'ci-compile' - python-version: '3.8' + python-version-short: '3.8' + python-version: '3.8.10' env: TASK: '${{ matrix.task }}' @@ -95,9 +101,9 @@ jobs: with: path: | ~/apt_cache - key: ${{ runner.os }}-apt-v5-${{ hashFiles('scripts/github/apt-packages.txt') }} + key: ${{ runner.os }}-apt-v7-${{ hashFiles('scripts/github/apt-packages.txt') }} restore-keys: | - ${{ runner.os }}-apt-v5- + ${{ runner.os }}-apt-v7- - name: Install APT Depedencies env: CACHE_HIT: ${{steps.cache-apt-deps.outputs.cache-hit}} @@ -130,7 +136,7 @@ jobs: # NOTE: We always want to run job on master since we run some additional checks there (code # coverage, etc) if: ${{ needs.pre_job.outputs.should_skip != 'true' || github.ref == 'refs/heads/master' }} - name: '${{ matrix.name }} - Python ${{ matrix.python-version }}' + name: '${{ matrix.name }} - Python ${{ matrix.python-version-short }}' runs-on: ubuntu-latest strategy: fail-fast: false @@ -142,26 +148,30 @@ jobs: task: 'ci-unit' nosetests_node_total: 2 nosetests_node_index: 0 - python-version: '3.6' + python-version-short: '3.6' + python-version: '3.6.13' - name: 'Unit Tests (chunk 2)' task: 'ci-unit' nosetests_node_total: 2 nosetests_node_index: 1 - python-version: '3.6' + python-version-short: '3.6' + python-version: '3.6.13' - name: 'Unit Tests (chunk 1)' task: 'ci-unit' nosetests_node_total: 2 nosetests_node_index: 0 - python-version: '3.8' + python-version-short: '3.8' + python-version: '3.8.10' - name: 'Unit Tests (chunk 2)' task: 'ci-unit' nosetests_node_total: 2 nosetests_node_index: 1 - python-version: '3.8' + python-version-short: '3.8' + python-version: '3.8.10' # This job is slow so we only run in on a daily basis # - name: 'Micro Benchmarks' # task: 'micro-benchmarks' - # python-version: '3.6' + # python-version: '3.6.13' # nosetests_node_total: 1 # nosetests_node_ index: 0 services: @@ -295,7 +305,7 @@ jobs: # NOTE: We always want to run job on master since we run some additional checks there (code # coverage, etc) if: ${{ needs.pre_job.outputs.should_skip != 'true' || github.ref == 'refs/heads/master' }} - name: '${{ matrix.name }} - Python ${{ matrix.python-version }}' + name: '${{ matrix.name }} - Python ${{ matrix.python-version-short }}' runs-on: 
ubuntu-latest strategy: fail-fast: false @@ -309,32 +319,38 @@ jobs: task: 'ci-packs-tests' nosetests_node_total: 1 nosetests_node_index: 0 - python-version: '3.6' + python-version-short: '3.6' + python-version: '3.6.13' - name: 'Integration Tests (chunk 1)' task: 'ci-integration' nosetests_node_total: 2 nosetests_node_index: 0 - python-version: '3.6' + python-version-short: '3.6' + python-version: '3.6.13' - name: 'Integration Tests (chunk 2)' task: 'ci-integration' nosetests_node_total: 2 nosetests_node_index: 1 - python-version: '3.6' + python-version-short: '3.6' + python-version: '3.6.13' - name: 'Pack Tests' task: 'ci-packs-tests' nosetests_node_total: 1 nosetests_node_index: 0 - python-version: '3.8' + python-version-short: '3.8' + python-version: '3.8.10' - name: 'Integration Tests (chunk 1)' task: 'ci-integration' nosetests_node_total: 2 nosetests_node_index: 0 - python-version: '3.8' + python-version-short: '3.8' + python-version: '3.8.10' - name: 'Integration Tests (chunk 2)' task: 'ci-integration' nosetests_node_total: 2 nosetests_node_index: 1 - python-version: '3.8' + python-version-short: '3.8' + python-version: '3.8.10' services: mongo: image: mongo:4.4 diff --git a/.github/workflows/microbenchmarks.yaml b/.github/workflows/microbenchmarks.yaml index 72d050d38b..7480c13b3a 100644 --- a/.github/workflows/microbenchmarks.yaml +++ b/.github/workflows/microbenchmarks.yaml @@ -26,22 +26,26 @@ jobs: # NOTE: We always want to run job on master since we run some additional checks there (code # coverage, etc) if: ${{ needs.pre_job.outputs.should_skip != 'true' || github.ref == 'refs/heads/master' }} - name: '${{ matrix.name }} - Python ${{ matrix.python-version }}' + name: '${{ matrix.name }} - Python ${{ matrix.python-version-short }}' runs-on: ubuntu-latest strategy: fail-fast: false matrix: + # NOTE: We need to use full Python version as part of Python deps cache key otherwise + # setup virtualenv step will fail. include: - name: 'Microbenchmarks' task: 'micro-benchmarks' nosetests_node_total: 1 nosetests_node_index: 0 - python-version: '3.6' + python-version-short: '3.6' + python-version: '3.6.13' - name: 'Microbenchmarks' task: 'micro-benchmarks' nosetests_node_total: 1 nosetests_node_index: 0 - python-version: '3.8' + python-version-short: '3.8' + python-version: '3.8.10' services: mongo: image: mongo:4.4 @@ -91,9 +95,9 @@ jobs: with: path: | ~/apt_cache - key: ${{ runner.os }}-apt-v5-${{ hashFiles('scripts/github/apt-packages.txt') }} + key: ${{ runner.os }}-apt-v7-${{ hashFiles('scripts/github/apt-packages.txt') }} restore-keys: | - ${{ runner.os }}-apt-v5- + ${{ runner.os }}-apt-v7- - name: Install APT Dependencies env: CACHE_HIT: ${{steps.cache-apt-deps.outputs.cache-hit}} diff --git a/.github/workflows/orquesta-integration-tests.yaml b/.github/workflows/orquesta-integration-tests.yaml index 36782b2199..98159afe8f 100644 --- a/.github/workflows/orquesta-integration-tests.yaml +++ b/.github/workflows/orquesta-integration-tests.yaml @@ -44,22 +44,26 @@ jobs: # NOTE: We always want to run job on master since we run some additional checks there (code # coverage, etc) if: ${{ needs.pre_job.outputs.should_skip != 'true' || github.ref == 'refs/heads/master' }} - name: '${{ matrix.name }} - Python ${{ matrix.python-version }}' + name: '${{ matrix.name }} - Python ${{ matrix.python-version-short }}' runs-on: ubuntu-latest strategy: fail-fast: false matrix: + # NOTE: We need to use full Python version as part of Python deps cache key otherwise + # setup virtualenv step will fail. 
include: - name: 'Integration Tests (Orquesta)' task: 'ci-orquesta' nosetests_node_total: 1 nosetests_node_index: 0 - python-version: '3.6' + python-version: '3.6.13' + python-version-short: '3.6' - name: 'Integration Tests (Orquesta)' task: 'ci-orquesta' nosetests_node_total: 1 nosetests_node_index: 0 - python-version: '3.8' + python-version-short: '3.8' + python-version: '3.8.10' services: mongo: image: mongo:4.4 @@ -144,9 +148,9 @@ jobs: with: path: | ~/apt_cache - key: ${{ runner.os }}-apt-v5-${{ hashFiles('scripts/github/apt-packages.txt') }} + key: ${{ runner.os }}-apt-v7-${{ hashFiles('scripts/github/apt-packages.txt') }} restore-keys: | - ${{ runner.os }}-apt-v5- + ${{ runner.os }}-apt-v7- - name: Install APT Depedencies env: CACHE_HIT: ${{steps.cache-apt-deps.outputs.cache-hit}} diff --git a/CHANGELOG.rst b/CHANGELOG.rst index 4232e28ffc..6b87edfce8 100644 --- a/CHANGELOG.rst +++ b/CHANGELOG.rst @@ -167,7 +167,11 @@ Changed triggers with larger payloads. This should address a long standing issue where StackStorm was reported to be slow and CPU - inefficient with handling large executions. (improvement) #4846 + inefficient with handling large executions. + + If you want to migrate existing database objects to utilize the new type, you can use the + ``st2common/bin/migrations/v3.5/st2-migrate-db-dict-field-values`` migration + script. (improvement) #4846 Contributed by @Kami. diff --git a/scripts/github/install-apt-packages-use-cache.sh b/scripts/github/install-apt-packages-use-cache.sh index 66c7e5cba5..38a77fc84e 100755 --- a/scripts/github/install-apt-packages-use-cache.sh +++ b/scripts/github/install-apt-packages-use-cache.sh @@ -17,6 +17,14 @@ echo "" echo "CACHE_HIT=${CACHE_HIT}" echo "" +# TODO: Recently, using cached dependencies started failing, so I (@Kami) temporarily disabled the cache. +# We should investigate why it's failing and try to fix it. +sudo apt-get -y update +# shellcheck disable=SC2086 +sudo apt-get -f -y --reinstall install ${APT_PACKAGES} +sudo dpkg -l +exit 0 + # Directory where installed package files will be copied - should match directory specified for # cache target in github actions workflow CACHE_DIRECTORY="${HOME}/apt_cache" @@ -38,7 +46,7 @@ if [[ "$CACHE_HIT" != 'true' ]]; then fi # shellcheck disable=SC2086 -sudo apt-get -f -y install ${APT_PACKAGES} +sudo apt-get -f -y --reinstall install ${APT_PACKAGES} ls -la "${APT_STATE_LISTS}" ls -la "${APT_CACHE_ARCHIVES}" diff --git a/st2common/bin/migrations/v3.5/__init__.py b/st2common/bin/migrations/v3.5/__init__.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/st2common/bin/migrations/v3.5/st2-migrate-db-dict-field-values b/st2common/bin/migrations/v3.5/st2-migrate-db-dict-field-values new file mode 100755 index 0000000000..0f82738809 --- /dev/null +++ b/st2common/bin/migrations/v3.5/st2-migrate-db-dict-field-values @@ -0,0 +1,455 @@ +#!/usr/bin/env python +# Copyright 2021 The StackStorm Authors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License.
+ +""" +Migration which migrates data for existing objects in the database which utilize +EscapedDictField or EscapedDynamicField and have been updated to use the new JsonDictField. + +Migration step is idempotent and can be retried on failures / partial runs. + +Keep in mind that running this migration script is optional and it may take a long time if you have +a lot of very large objects in the database (aka executions) - reading a lot of data from the +database using the old field types is slow and CPU intensive. + +The new field type is automatically used for all new objects when upgrading to v3.5, so migration is +optional because in most cases users are viewing recent / new executions and not old ones which may +still utilize the old field type which is slow to read / write. + +Right now the script utilizes no concurrency and performs the migration one object at a time. That's done +for simplicity reasons and also to avoid massive CPU usage spikes when running this script with +large concurrency on large objects. + +Keep in mind that only "completed" objects are processed - this means Executions in "final" states +(succeeded, failed, timeout, etc.). + +We determine if an object should be migrated using a MongoDB $type query (for execution objects we +could also determine that based on the presence of the result_size field). +""" + +import sys +import time +import datetime +import traceback + +from oslo_config import cfg + +from st2common import config +from st2common.service_setup import db_setup +from st2common.service_setup import db_teardown +from st2common.util import isotime +from st2common.models.db.execution import ActionExecutionDB +from st2common.models.db.workflow import WorkflowExecutionDB +from st2common.models.db.workflow import TaskExecutionDB +from st2common.models.db.trigger import TriggerInstanceDB +from st2common.persistence.execution import ActionExecution +from st2common.persistence.liveaction import LiveAction +from st2common.persistence.workflow import WorkflowExecution +from st2common.persistence.workflow import TaskExecution +from st2common.persistence.trigger import TriggerInstance +from st2common.exceptions.db import StackStormDBObjectNotFoundError +from st2common.constants.action import LIVEACTION_COMPLETED_STATES +from st2common.constants.triggers import TRIGGER_INSTANCE_COMPLETED_STATES + +# NOTE: To avoid unnecessary mongoengine object churn when retrieving only object ids (aka to avoid +# instantiating model class with a single field), we use the raw pymongo value which is a dict with a +# single value + + +def migrate_executions(start_dt: datetime.datetime, end_dt: datetime.datetime) -> None: + """ + Perform migrations for execution related objects (ActionExecutionDB, LiveActionDB). + """ + print("Migrating execution objects") + + # NOTE: We first only retrieve the IDs because there could be a lot of objects in the database + # and this could result in massive RAM use. Technically, mongoengine loads querysets lazily, + # but this is not always the case so it's better to first retrieve all the IDs and then retrieve + # objects one by one. + # Keep in mind we need to use ModelClass.objects and not PersistenceClass.query() so .only() + # works correctly - with PersistenceClass.query().only() all the fields will still be retrieved. + + # 1.
Migrate ActionExecutionDB objects + result = ( + ActionExecutionDB.objects( + __raw__={ + "result": { + "$not": { + "$type": "binData", + }, + }, + "status": { + "$in": LIVEACTION_COMPLETED_STATES, + }, + }, + start_timestamp__gte=start_dt, + start_timestamp__lte=end_dt, + ) + .only("id") + .as_pymongo() + ) + execution_ids = set([str(item["_id"]) for item in result]) + objects_count = result.count() + + if not execution_ids: + print("Found no ActionExecutionDB objects to migrate.") + print("") + return None + + print("Will migrate %s ActionExecutionDB objects" % (objects_count)) + print("") + + for index, execution_id in enumerate(execution_ids, 1): + try: + execution_db = ActionExecution.get_by_id(execution_id) + except StackStormDBObjectNotFoundError: + print( + "Skipping ActionExecutionDB with id %s which is missing in the database" + % (execution_id) + ) + continue + + print( + "[%s/%s] Migrating ActionExecutionDB with id %s" + % (index, objects_count, execution_id) + ) + + # This is a bit of a "hack", but it's the easiest way to tell mongoengine that a specific + # field has been updated and should be saved. If we don't do that, nothing will be re-saved on + # the .save() call due to mongoengine only trying to save what has changed to make it more + # efficient instead of always re-saving the whole object. + execution_db._mark_as_changed("result") + execution_db._mark_as_changed("result_size") + + # We need to explicitly set result_size attribute since Document.save() code path doesn't + # populate it (but other code paths we utilize elsewhere do). + # Technically we could do it in the document clean() / validate() method, but we don't want that + # since execution update code in action runner and elsewhere is optimized to make partial + # updates more efficient. + result_size = len( + ActionExecutionDB.result._serialize_field_value(execution_db.result or {}) + ) + execution_db.result_size = result_size + + # NOTE: If you want to view changed fields, you can access execution_db._changed_fields + + execution_db.save() + print("ActionExecutionDB with id %s has been migrated" % (execution_db.id)) + + # Migrate corresponding LiveAction object + liveaction = execution_db.liveaction or {} + liveaction_id = liveaction.get("id", None) + + if not liveaction_id: + continue + + try: + liveaction_db = LiveAction.get_by_id(liveaction_id) + except StackStormDBObjectNotFoundError: + # If liveaction for some reason doesn't exist (would likely represent corrupted data) we + # simply ignore that error since it's not fatal. + print( + "Skipping LiveActionDB with id %s which is missing in the database" + % (liveaction_id) + ) + continue + + liveaction_db._mark_as_changed("result") + + liveaction_db.save() + print("Related LiveActionDB with id %s has been migrated" % (liveaction_db.id)) + print("") + + +def migrate_workflow_objects( + start_dt: datetime.datetime, end_dt: datetime.datetime +) -> None: + print("Migrating workflow objects") + + # 1.
Migrate WorkflowExecutionDB + result = ( + WorkflowExecutionDB.objects( + __raw__={ + "output": { + "$not": { + "$type": "binData", + }, + }, + "status": { + "$in": LIVEACTION_COMPLETED_STATES, + }, + }, + start_timestamp__gte=start_dt, + start_timestamp__lte=end_dt, + ) + .only("id") + .as_pymongo() + ) + workflow_execution_ids = [str(item["_id"]) for item in result] + objects_count = result.count() + + if not workflow_execution_ids: + print("Found no WorkflowExecutionDB objects to migrate.") + print("") + else: + print("Will migrate %s WorkflowExecutionDB objects" % (objects_count)) + print("") + + for index, workflow_execution_id in enumerate(workflow_execution_ids, 1): + try: + workflow_execution_db = WorkflowExecution.get_by_id(workflow_execution_id) + except StackStormDBObjectNotFoundError: + print( + "Skipping WorkflowExecutionDB with id %s which is missing in the database" + % (workflow_execution_id) + ) + continue + + print( + "[%s/%s] Migrating WorkflowExecutionDB with id %s" + % (index, objects_count, workflow_execution_id) + ) + + workflow_execution_db._mark_as_changed("input") + workflow_execution_db._mark_as_changed("context") + workflow_execution_db._mark_as_changed("state") + workflow_execution_db._mark_as_changed("output") + + workflow_execution_db.save() + print( + "WorkflowExecutionDB with id %s has been migrated" + % (workflow_execution_db.id) + ) + print("") + + # 2. Migrate TaskExecutionDB + result = ( + TaskExecutionDB.objects( + __raw__={ + "result": { + "$not": { + "$type": "binData", + }, + }, + "status": { + "$in": LIVEACTION_COMPLETED_STATES, + }, + }, + start_timestamp__gte=start_dt, + start_timestamp__lte=end_dt, + ) + .only("id") + .as_pymongo() + ) + task_execution_ids = [str(item["_id"]) for item in result] + objects_count = result.count() + + if not task_execution_ids: + print("Found no TaskExecutionDB objects to migrate.") + print("") + else: + print("Will migrate %s TaskExecutionDB objects" % (objects_count)) + print("") + + for index, task_execution_id in enumerate(task_execution_ids, 1): + try: + task_execution_db = TaskExecution.get_by_id(task_execution_id) + except StackStormDBObjectNotFoundError: + print( + "Skipping TaskExecutionDB with id %s which is missing in the database" + % (task_execution_id) + ) + continue + + print( + "[%s/%s] Migrating TaskExecutionDB with id %s" + % (index, objects_count, task_execution_id) + ) + + task_execution_db._mark_as_changed("task_spec") + task_execution_db._mark_as_changed("context") + task_execution_db._mark_as_changed("result") + + task_execution_db.save() + print("TaskExecutionDB with id %s has been migrated" % (task_execution_db.id)) + print("") + + +def migrate_triggers(start_dt: datetime.datetime, end_dt: datetime.datetime) -> None: + print("Migrating trigger objects") + + result = ( + TriggerInstanceDB.objects( + __raw__={ + "payload": { + "$not": { + "$type": "binData", + }, + }, + "status": { + "$in": TRIGGER_INSTANCE_COMPLETED_STATES, + }, + }, + occurrence_time__gte=start_dt, + occurrence_time__lte=end_dt, + ) + .only("id") + .as_pymongo() + ) + trigger_instance_ids = [str(item["_id"]) for item in result] + objects_count = result.count() + + if not trigger_instance_ids: + print("Found no TriggerInstanceDB objects to migrate.") + print("") + return None + + print("Will migrate %s TriggerInstanceDB objects" % (objects_count)) + print("") + + for index, trigger_instance_id in enumerate(trigger_instance_ids, 1): + try: + trigger_instance_db = TriggerInstance.get_by_id(trigger_instance_id) + except
StackStormDBObjectNotFoundError: + print( + "Skipping TriggerInstanceDB with id %s which is missing in the database" + % (trigger_instance_id) + ) + continue + + print( + "[%s/%s] Migrating TriggerInstanceDB with id %s" + % (index, objects_count, trigger_instance_id) + ) + + trigger_instance_db._mark_as_changed("payload") + + trigger_instance_db.save() + print( + "TriggerInstanceDB with id %s has been migrated" % (trigger_instance_db.id) + ) + print("") + + +def migrate_objects( + start_dt: datetime.datetime, end_dt: datetime.datetime, display_prompt: bool = True +) -> None: + start_dt_str = start_dt.strftime("%Y-%m-%d %H:%M:%S") + end_dt_str = end_dt.strftime("%Y-%m-%d %H:%M:%S") + + print("StackStorm v3.5 database field data migration script\n") + + if display_prompt: + input( + "Will migrate objects with creation date between %s UTC and %s UTC.\n\n" + "You are strongly recommended to create a database backup before proceeding.\n\n" + "Depending on the number of objects in the database, " + "migration may take multiple hours or more. You are recommended to start the " + "script in a screen session, tmux or similar. \n\n" + "To proceed with the migration, press enter and to cancel it, press CTRL+C.\n" + % (start_dt_str, end_dt_str) + ) + print("") + + print( + "Migrating affected database objects between %s and %s" + % (start_dt_str, end_dt_str) + ) + print("") + + start_ts = int(time.time()) + migrate_executions(start_dt=start_dt, end_dt=end_dt) + migrate_workflow_objects(start_dt=start_dt, end_dt=end_dt) + migrate_triggers(start_dt=start_dt, end_dt=end_dt) + end_ts = int(time.time()) + + duration = end_ts - start_ts + + print( + "SUCCESS: All database objects migrated successfully (duration: %s seconds)." + % (duration) + ) + + +def _register_cli_opts(): + cfg.CONF.register_cli_opt( + cfg.BoolOpt( + "yes", + short="y", + required=False, + default=False, + ) + ) + + # We default to the past 30 days. Keep in mind that using a longer period may take a long time in + # case there are many objects in the database. + now_dt = datetime.datetime.utcnow() + start_dt = now_dt - datetime.timedelta(days=30) + + cfg.CONF.register_cli_opt( + cfg.StrOpt( + "start-dt", + required=False, + help=( + "Start cut off UTC ISO date time string for objects which will be migrated. " + "Defaults to now - 30 days. " + "Example value: 2020-03-13T19:01:27Z" + ), + default=start_dt.strftime("%Y-%m-%dT%H:%M:%SZ"), + ) + ) + cfg.CONF.register_cli_opt( + cfg.StrOpt( + "end-dt", + required=False, + help=( + "End cut off UTC ISO date time string for objects which will be migrated. " + "Defaults to now. "
+ "Example value: 2020-03-13T19:01:27Z" + ), + default=now_dt.strftime("%Y-%m-%dT%H:%M:%SZ"), + ) + ) + + +def main(): + _register_cli_opts() + + config.parse_args() + db_setup() + + start_dt = isotime.parse(cfg.CONF.start_dt) + + if cfg.CONF.end_dt == "now": + end_dt = datetime.datetime.utcnow() + end_dt = end_dt.replace(tzinfo=datetime.timezone.utc) + else: + end_dt = isotime.parse(cfg.CONF.end_dt) + + try: + migrate_objects( + start_dt=start_dt, end_dt=end_dt, display_prompt=not cfg.CONF.yes + ) + exit_code = 0 + except Exception as e: + print("ABORTED: Objects migration aborted on first failure: %s" % (str(e))) + traceback.print_exc() + exit_code = 1 + + db_teardown() + sys.exit(exit_code) + + +if __name__ == "__main__": + main() diff --git a/st2common/bin/migrations/v3.5/st2_migrate_db_dict_field_values.py b/st2common/bin/migrations/v3.5/st2_migrate_db_dict_field_values.py new file mode 120000 index 0000000000..495261f46c --- /dev/null +++ b/st2common/bin/migrations/v3.5/st2_migrate_db_dict_field_values.py @@ -0,0 +1 @@ +st2-migrate-db-dict-field-values \ No newline at end of file diff --git a/st2common/st2common/constants/triggers.py b/st2common/st2common/constants/triggers.py index 14ab861fd5..4751ec372e 100644 --- a/st2common/st2common/constants/triggers.py +++ b/st2common/st2common/constants/triggers.py @@ -41,6 +41,7 @@ "TRIGGER_INSTANCE_PROCESSING", "TRIGGER_INSTANCE_PROCESSED", "TRIGGER_INSTANCE_PROCESSING_FAILED", + "TRIGGER_INSTANCE_COMPLETED_STATES", ] # Action resource triggers @@ -351,3 +352,9 @@ TRIGGER_INSTANCE_PROCESSED, TRIGGER_INSTANCE_PROCESSING_FAILED, ] + + +TRIGGER_INSTANCE_COMPLETED_STATES = [ + TRIGGER_INSTANCE_PROCESSED, + TRIGGER_INSTANCE_PROCESSING_FAILED, +] diff --git a/st2common/tests/unit/migrations/test_v35_migrate_db_dict_field_values.py b/st2common/tests/unit/migrations/test_v35_migrate_db_dict_field_values.py new file mode 100644 index 0000000000..84347fcbab --- /dev/null +++ b/st2common/tests/unit/migrations/test_v35_migrate_db_dict_field_values.py @@ -0,0 +1,607 @@ +# Copyright 2021 The StackStorm Authors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ +import os +import sys + +import datetime + +from st2common.constants import action as action_constants +from st2common.models.db import stormbase +from st2common.models.db.execution import ActionExecutionDB +from st2common.models.db.liveaction import LiveActionDB +from st2common.models.db.workflow import WorkflowExecutionDB +from st2common.models.db.workflow import TaskExecutionDB +from st2common.models.db.trigger import TriggerInstanceDB +from st2common.persistence.execution import ActionExecution +from st2common.persistence.liveaction import LiveAction +from st2common.persistence.workflow import WorkflowExecution +from st2common.persistence.workflow import TaskExecution +from st2common.persistence.trigger import TriggerInstance +from st2common.constants.triggers import TRIGGER_INSTANCE_PROCESSED +from st2common.constants.triggers import TRIGGER_INSTANCE_PENDING + +from st2tests import DbTestCase + +BASE_DIR = os.path.dirname(os.path.abspath(__file__)) +sys.path.append( + os.path.abspath(os.path.join(BASE_DIR, "../../../bin/migrations/v3.5/")) +) + +import st2_migrate_db_dict_field_values as migration_module + + +MOCK_RESULT_1 = { + "foo": "bar1", + "bar": 1, + "baz": None, +} + +MOCK_RESULT_2 = { + "foo": "bar2", + "bar": 2, + "baz": False, +} + +MOCK_PAYLOAD_1 = {"yaaaas": "le payload!"} + +MOCK_PAYLOAD_2 = {"yaaaas": "le payload! 2"} + +# NOTE: We define those classes and set allow_inheritance inside the methods so importing this +# module doesn't have side effects and break / affect other tests + + +class DBFieldsMigrationScriptTestCase(DbTestCase): + @classmethod + def tearDownClass(cls): + ActionExecutionDB._meta["allow_inheritance"] = False + LiveActionDB._meta["allow_inheritance"] = False + WorkflowExecutionDB._meta["allow_inheritance"] = False + TaskExecutionDB._meta["allow_inheritance"] = False + TriggerInstanceDB._meta["allow_inheritance"] = False + + def test_migrate_executions_related_liveaction_doesnt_exist(self): + pass + + def test_migrate_executions(self): + ActionExecutionDB._meta["allow_inheritance"] = True + LiveActionDB._meta["allow_inheritance"] = True + + class ActionExecutionDB_OldFieldType(ActionExecutionDB): + result = stormbase.EscapedDynamicField(default={}) + + class LiveActionDB_OldFieldType(LiveActionDB): + result = stormbase.EscapedDynamicField(default={}) + + execution_dbs = ActionExecution.query( + __raw__={ + "result": { + "$not": { + "$type": "binData", + }, + } + } + ) + self.assertEqual(len(execution_dbs), 0) + execution_dbs = ActionExecution.query( + __raw__={ + "result": { + "$type": "object", + }, + } + ) + self.assertEqual(len(execution_dbs), 0) + + # 1.
Insert data in old format + liveaction_1_db = LiveActionDB_OldFieldType() + liveaction_1_db.action = "foo.bar" + liveaction_1_db.status = action_constants.LIVEACTION_STATUS_FAILED + liveaction_1_db.result = MOCK_RESULT_1 + liveaction_1_db.start_timestamp = datetime.datetime.utcnow().replace( + tzinfo=datetime.timezone.utc + ) + liveaction_1_db = LiveAction.add_or_update(liveaction_1_db, publish=False) + + execution_1_db = ActionExecutionDB_OldFieldType() + execution_1_db.action = {"a": 1} + execution_1_db.runner = {"a": 1} + execution_1_db.liveaction = {"id": liveaction_1_db.id} + execution_1_db.status = action_constants.LIVEACTION_STATUS_FAILED + execution_1_db.result = MOCK_RESULT_1 + execution_1_db.start_timestamp = datetime.datetime.utcnow().replace( + tzinfo=datetime.timezone.utc + ) + + execution_1_db = ActionExecution.add_or_update(execution_1_db, publish=False) + + # This execution is not in a final state yet so it should not be migrated + liveaction_2_db = LiveActionDB_OldFieldType() + liveaction_2_db.action = "foo.bar2" + liveaction_2_db.status = action_constants.LIVEACTION_STATUS_RUNNING + liveaction_2_db.result = MOCK_RESULT_2 + liveaction_2_db.start_timestamp = datetime.datetime.utcnow().replace( + tzinfo=datetime.timezone.utc + ) + + liveaction_2_db = LiveAction.add_or_update(liveaction_2_db, publish=False) + + execution_2_db = ActionExecutionDB_OldFieldType() + execution_2_db.action = {"a": 2} + execution_2_db.runner = {"a": 2} + execution_2_db.liveaction = {"id": liveaction_2_db.id} + execution_2_db.status = action_constants.LIVEACTION_STATUS_RUNNING + execution_2_db.result = MOCK_RESULT_2 + execution_2_db.start_timestamp = datetime.datetime.utcnow().replace( + tzinfo=datetime.timezone.utc + ) + + execution_2_db = ActionExecution.add_or_update(execution_2_db, publish=False) + + # This object is older than the default threshold so it should not be migrated + execution_3_db = ActionExecutionDB_OldFieldType() + execution_3_db.action = {"a": 2} + execution_3_db.runner = {"a": 2} + execution_3_db.liveaction = {"id": liveaction_2_db.id} + execution_3_db.status = action_constants.LIVEACTION_STATUS_SUCCEEDED + execution_3_db.result = MOCK_RESULT_1 + execution_3_db.start_timestamp = datetime.datetime.utcfromtimestamp(0).replace( + tzinfo=datetime.timezone.utc + ) + + execution_3_db = ActionExecution.add_or_update(execution_3_db, publish=False) + + # Verify data has been inserted in old format + execution_dbs = ActionExecution.query( + __raw__={ + "result": { + "$type": "object", + }, + } + ) + self.assertEqual(len(execution_dbs), 3) + execution_dbs = ActionExecution.query( + __raw__={ + "result": { + "$not": { + "$type": "binData", + }, + } + } + ) + self.assertEqual(len(execution_dbs), 3) + execution_dbs = ActionExecution.query( + __raw__={ + "result": { + "$type": "binData", + }, + } + ) + self.assertEqual(len(execution_dbs), 0) + + liveaction_dbs = LiveAction.query( + __raw__={ + "result": { + "$type": "object", + }, + } + ) + self.assertEqual(len(liveaction_dbs), 2) + liveaction_dbs = LiveAction.query( + __raw__={ + "result": { + "$not": { + "$type": "binData", + }, + } + } + ) + self.assertEqual(len(liveaction_dbs), 2) + liveaction_dbs = LiveAction.query( + __raw__={ + "result": { + "$type": "binData", + }, + } + ) + self.assertEqual(len(liveaction_dbs), 0) + + # Update inserted documents and remove special _cls field added by mongoengine. We need to + # do that here due to how mongoengine works with subclasses. 
+ ActionExecution.query( + __raw__={ + "result": { + "$type": "object", + }, + } + ).update(set___cls="ActionExecutionDB") + + LiveAction.query( + __raw__={ + "result": { + "$type": "object", + }, + } + ).update(set___cls="LiveActionDB") + + # 2. Run migration + start_dt = datetime.datetime.utcnow().replace( + tzinfo=datetime.timezone.utc + ) - datetime.timedelta(hours=2) + end_dt = datetime.datetime.utcnow().replace(tzinfo=datetime.timezone.utc) + migration_module.migrate_executions(start_dt=start_dt, end_dt=end_dt) + + # 3. Verify data has been migrated - only 1 item should have been migrated since it's in a + # completed state + execution_dbs = ActionExecution.query( + __raw__={ + "result": { + "$type": "object", + }, + } + ) + self.assertEqual(len(execution_dbs), 2) + execution_dbs = ActionExecution.query( + __raw__={ + "result": { + "$type": "binData", + }, + } + ) + self.assertEqual(len(execution_dbs), 1) + + execution_db_1_retrieved = ActionExecution.get_by_id(execution_1_db.id) + self.assertEqual(execution_db_1_retrieved.result, MOCK_RESULT_1) + + execution_db_2_retrieved = ActionExecution.get_by_id(execution_2_db.id) + self.assertEqual(execution_db_2_retrieved.result, MOCK_RESULT_2) + + liveaction_db_1_retrieved = LiveAction.get_by_id(liveaction_1_db.id) + self.assertEqual(liveaction_db_1_retrieved.result, MOCK_RESULT_1) + + liveaction_db_2_retrieved = LiveAction.get_by_id(liveaction_2_db.id) + self.assertEqual(liveaction_db_2_retrieved.result, MOCK_RESULT_2) + + def test_migrate_workflows(self): + WorkflowExecutionDB._meta["allow_inheritance"] = True + TaskExecutionDB._meta["allow_inheritance"] = True + + class WorkflowExecutionDB_OldFieldType(WorkflowExecutionDB): + input = stormbase.EscapedDictField() + context = stormbase.EscapedDictField() + state = stormbase.EscapedDictField() + output = stormbase.EscapedDictField() + + class TaskExecutionDB_OldFieldType(TaskExecutionDB): + task_spec = stormbase.EscapedDictField() + context = stormbase.EscapedDictField() + result = stormbase.EscapedDictField() + + workflow_execution_dbs = WorkflowExecution.query( + __raw__={ + "output": { + "$not": { + "$type": "binData", + }, + } + } + ) + self.assertEqual(len(workflow_execution_dbs), 0) + workflow_execution_dbs = WorkflowExecution.query( + __raw__={ + "output": { + "$type": "object", + }, + } + ) + self.assertEqual(len(workflow_execution_dbs), 0) + + task_execution_dbs = TaskExecution.query( + __raw__={ + "result": { + "$not": { + "$type": "binData", + }, + } + } + ) + self.assertEqual(len(task_execution_dbs), 0) + task_execution_dbs = TaskExecution.query( + __raw__={ + "result": { + "$type": "object", + }, + } + ) + self.assertEqual(len(task_execution_dbs), 0) + + # 1. 
Insert data in old format + workflow_execution_1_db = WorkflowExecutionDB_OldFieldType() + workflow_execution_1_db.input = MOCK_RESULT_1 + workflow_execution_1_db.context = MOCK_RESULT_1 + workflow_execution_1_db.state = MOCK_RESULT_1 + workflow_execution_1_db.output = MOCK_RESULT_1 + workflow_execution_1_db.status = action_constants.LIVEACTION_STATUS_SUCCEEDED + workflow_execution_1_db.action_execution = "a" + workflow_execution_1_db = WorkflowExecution.add_or_update( + workflow_execution_1_db, publish=False + ) + + task_execution_1_db = TaskExecutionDB_OldFieldType() + task_execution_1_db.task_spec = MOCK_RESULT_1 + task_execution_1_db.context = MOCK_RESULT_1 + task_execution_1_db.result = MOCK_RESULT_1 + task_execution_1_db.status = action_constants.LIVEACTION_STATUS_SUCCEEDED + task_execution_1_db.workflow_execution = "a" + task_execution_1_db.task_name = "a" + task_execution_1_db.task_id = "a" + task_execution_1_db.task_route = 1 + task_execution_1_db = TaskExecution.add_or_update( + task_execution_1_db, publish=False + ) + + workflow_execution_2_db = WorkflowExecutionDB_OldFieldType() + workflow_execution_2_db.input = MOCK_RESULT_2 + workflow_execution_2_db.context = MOCK_RESULT_2 + workflow_execution_2_db.state = MOCK_RESULT_2 + workflow_execution_2_db.output = MOCK_RESULT_2 + workflow_execution_2_db.status = action_constants.LIVEACTION_STATUS_RUNNING + workflow_execution_2_db.action_execution = "b" + workflow_execution_2_db = WorkflowExecution.add_or_update( + workflow_execution_2_db, publish=False + ) + + task_execution_2_db = TaskExecutionDB_OldFieldType() + task_execution_2_db.task_spec = MOCK_RESULT_2 + task_execution_2_db.context = MOCK_RESULT_2 + task_execution_2_db.result = MOCK_RESULT_2 + task_execution_2_db.status = action_constants.LIVEACTION_STATUS_RUNNING + task_execution_2_db.workflow_execution = "b" + task_execution_2_db.task_name = "b" + task_execution_2_db.task_id = "b" + task_execution_2_db.task_route = 2 + task_execution_2_db = TaskExecution.add_or_update( + task_execution_2_db, publish=False + ) + + # This object is older than the default threshold so it should not be migrated + workflow_execution_3_db = WorkflowExecutionDB_OldFieldType() + workflow_execution_3_db.input = MOCK_RESULT_2 + workflow_execution_3_db.context = MOCK_RESULT_2 + workflow_execution_3_db.state = MOCK_RESULT_2 + workflow_execution_3_db.output = MOCK_RESULT_2 + workflow_execution_3_db.status = action_constants.LIVEACTION_STATUS_SUCCEEDED + workflow_execution_3_db.action_execution = "b" + workflow_execution_3_db.start_timestamp = datetime.datetime.utcfromtimestamp( + 0 + ).replace(tzinfo=datetime.timezone.utc) + workflow_execution_3_db = WorkflowExecution.add_or_update( + workflow_execution_3_db, publish=False + ) + + task_execution_3_db = TaskExecutionDB_OldFieldType() + task_execution_3_db.task_spec = MOCK_RESULT_2 + task_execution_3_db.context = MOCK_RESULT_2 + task_execution_3_db.result = MOCK_RESULT_2 + task_execution_3_db.status = action_constants.LIVEACTION_STATUS_SUCCEEDED + task_execution_3_db.workflow_execution = "b" + task_execution_3_db.task_name = "b" + task_execution_3_db.task_id = "b" + task_execution_3_db.task_route = 2 + task_execution_3_db.start_timestamp = datetime.datetime.utcfromtimestamp( + 0 + ).replace(tzinfo=datetime.timezone.utc) + task_execution_3_db = TaskExecution.add_or_update( + task_execution_3_db, publish=False + ) + + # Update inserted documents and remove special _cls field added by mongoengine. 
We need to + # do that here due to how mongoengine works with subclasses. + WorkflowExecution.query( + __raw__={ + "input": { + "$type": "object", + }, + } + ).update(set___cls="WorkflowExecutionDB") + + TaskExecution.query( + __raw__={ + "result": { + "$type": "object", + }, + } + ).update(set___cls="TaskExecutionDB") + + # 2. Run migration + start_dt = datetime.datetime.utcnow().replace( + tzinfo=datetime.timezone.utc + ) - datetime.timedelta(hours=2) + end_dt = datetime.datetime.utcnow().replace(tzinfo=datetime.timezone.utc) + migration_module.migrate_workflow_objects(start_dt=start_dt, end_dt=end_dt) + + # 3. Verify data has been migrated - only 1 item should have been migrated since it's in a + # completed state + workflow_execution_dbs = WorkflowExecution.query( + __raw__={ + "output": { + "$type": "binData", + }, + } + ) + self.assertEqual(len(workflow_execution_dbs), 1) + workflow_execution_dbs = WorkflowExecution.query( + __raw__={ + "output": { + "$type": "object", + }, + } + ) + self.assertEqual(len(workflow_execution_dbs), 2) + + task_execution_dbs = TaskExecution.query( + __raw__={ + "result": { + "$type": "binData", + }, + } + ) + self.assertEqual(len(task_execution_dbs), 1) + task_execution_dbs = TaskExecution.query( + __raw__={ + "result": { + "$type": "object", + }, + } + ) + self.assertEqual(len(task_execution_dbs), 2) + + workflow_execution_1_db_retrieved = WorkflowExecution.get_by_id( + workflow_execution_1_db.id + ) + self.assertEqual(workflow_execution_1_db_retrieved.input, MOCK_RESULT_1) + self.assertEqual(workflow_execution_1_db_retrieved.context, MOCK_RESULT_1) + self.assertEqual(workflow_execution_1_db_retrieved.state, MOCK_RESULT_1) + self.assertEqual(workflow_execution_1_db_retrieved.output, MOCK_RESULT_1) + + workflow_execution_2_db_retrieved = WorkflowExecution.get_by_id( + workflow_execution_2_db.id + ) + self.assertEqual(workflow_execution_2_db_retrieved.input, MOCK_RESULT_2) + self.assertEqual(workflow_execution_2_db_retrieved.context, MOCK_RESULT_2) + self.assertEqual(workflow_execution_2_db_retrieved.state, MOCK_RESULT_2) + self.assertEqual(workflow_execution_2_db_retrieved.output, MOCK_RESULT_2) + + def test_migrate_triggers(self): + TriggerInstanceDB._meta["allow_inheritance"] = True + + class TriggerInstanceDB_OldFieldType(TriggerInstanceDB): + payload = stormbase.EscapedDictField() + + trigger_instance_dbs = TriggerInstance.query( + __raw__={ + "payload": { + "$not": { + "$type": "binData", + }, + } + } + ) + self.assertEqual(len(trigger_instance_dbs), 0) + trigger_instance_dbs = TriggerInstance.query( + __raw__={ + "payload": { + "$type": "object", + }, + } + ) + self.assertEqual(len(trigger_instance_dbs), 0) + + # 1. 
Insert data in old format + trigger_instance_1_db = TriggerInstanceDB_OldFieldType() + trigger_instance_1_db.payload = MOCK_PAYLOAD_1 + trigger_instance_1_db.status = TRIGGER_INSTANCE_PROCESSED + trigger_instance_1_db.occurrence_time = datetime.datetime.utcnow() + + trigger_instance_1_db = TriggerInstance.add_or_update( + trigger_instance_1_db, publish=False + ) + + trigger_instance_2_db = TriggerInstanceDB_OldFieldType() + trigger_instance_2_db.payload = MOCK_PAYLOAD_2 + trigger_instance_2_db.status = TRIGGER_INSTANCE_PENDING + trigger_instance_2_db.occurrence_time = datetime.datetime.utcnow() + + trigger_instance_2_db = TriggerInstance.add_or_update( + trigger_instance_2_db, publish=False + ) + + # This object is older than the default threshold so it should not be migrated + trigger_instance_3_db = TriggerInstanceDB_OldFieldType() + trigger_instance_3_db.payload = MOCK_PAYLOAD_2 + trigger_instance_3_db.status = TRIGGER_INSTANCE_PROCESSED + trigger_instance_3_db.occurrence_time = datetime.datetime.utcfromtimestamp(0) + + trigger_instance_3_db = TriggerInstance.add_or_update( + trigger_instance_3_db, publish=False + ) + + # Verify data has been inserted in old format + trigger_instance_dbs = TriggerInstance.query( + __raw__={ + "payload": { + "$not": { + "$type": "binData", + }, + } + } + ) + self.assertEqual(len(trigger_instance_dbs), 3) + trigger_instance_dbs = TriggerInstance.query( + __raw__={ + "payload": { + "$type": "object", + }, + } + ) + self.assertEqual(len(trigger_instance_dbs), 3) + + # Update inserted documents and remove special _cls field added by mongoengine. We need to + # do that here due to how mongoengine works with subclasses. + TriggerInstance.query( + __raw__={ + "payload": { + "$type": "object", + }, + } + ).update(set___cls="TriggerInstanceDB") + + # 2. Run migration + start_dt = datetime.datetime.utcnow().replace( + tzinfo=datetime.timezone.utc + ) - datetime.timedelta(hours=2) + end_dt = datetime.datetime.utcnow().replace(tzinfo=datetime.timezone.utc) + migration_module.migrate_triggers(start_dt=start_dt, end_dt=end_dt) + + # 3. Verify data has been migrated - only 1 item should have been migrated since it's in a + # completed state + trigger_instance_dbs = TriggerInstance.query( + __raw__={ + "payload": { + "$not": { + "$type": "binData", + }, + } + } + ) + + # TODO: Also verify raw as_pymongo() bin field value + self.assertEqual(len(trigger_instance_dbs), 2) + trigger_instance_dbs = TriggerInstance.query( + __raw__={ + "payload": { + "$type": "object", + }, + } + ) + self.assertEqual(len(trigger_instance_dbs), 2) + + trigger_instance_1_db_retrieved = TriggerInstance.get_by_id( + trigger_instance_1_db.id + ) + self.assertEqual(trigger_instance_1_db_retrieved.payload, MOCK_PAYLOAD_1) + + trigger_instance_2_db_retrieved = TriggerInstance.get_by_id( + trigger_instance_2_db.id + ) + self.assertEqual(trigger_instance_2_db_retrieved.payload, MOCK_PAYLOAD_2)
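A minimal invocation sketch for the migration script added in this diff, for reviewers who want to try it out. Only the --yes, --start-dt and --end-dt flags come from the CLI options registered in _register_cli_opts(); the interpreter and config file paths shown here are illustrative assumptions, not something this change prescribes.

    # Migrate objects created in the given UTC window; --yes skips the interactive confirmation prompt.
    /opt/stackstorm/st2/bin/python st2common/bin/migrations/v3.5/st2-migrate-db-dict-field-values \
        --config-file=/etc/st2/st2.conf \
        --start-dt=2021-03-01T00:00:00Z \
        --end-dt=2021-03-31T00:00:00Z \
        --yes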