diff --git a/.github/workflows/tests.yml b/.github/workflows/tests.yml
index 4e39c802..083132da 100644
--- a/.github/workflows/tests.yml
+++ b/.github/workflows/tests.yml
@@ -15,12 +15,18 @@ jobs:
       - name: Setup running platform stack
         run: docker compose up --build -d

+      - name: Pause for stack to start
+        run: sleep 10
+
       - name: Integration tests - API
         run: docker run --env CONF_DIR=/dp3/tests/test_config --network container:dp3_api dp3_interpreter python -m unittest discover -s tests/test_api -v

+      - name: Check worker errors
+        run: docker compose logs worker | grep "WARNING\|ERROR\|exception" | grep -v "RabbitMQ\|it's\ OK\ now,\ we're\ successfully\ connected" || true
+
       - name: Teardown platform stack
         run: docker compose down
diff --git a/config/history_manager.yml b/config/history_manager.yml
index 6254e722..2a3c5bd8 100644
--- a/config/history_manager.yml
+++ b/config/history_manager.yml
@@ -9,6 +9,10 @@ aggregation_schedule:
   minute: "*/10"

 # Deleting old datapoints from master records
+mark_datapoints_schedule:
+  hour: "7,19"
+  minute: "45"
+
 datapoint_cleaning_schedule:
   minute: "*/30"
diff --git a/docs/configuration/history_manager.md b/docs/configuration/history_manager.md
index 1760ae23..7c8bcc51 100644
--- a/docs/configuration/history_manager.md
+++ b/docs/configuration/history_manager.md
@@ -13,25 +13,29 @@ Configuration file `history_manager.yml` is very simple:
 ```yaml
 aggregation_schedule: # (1)!
   minute: "*/10"

-datapoint_cleaning_schedule: # (2)!
+mark_datapoints_schedule: # (2)!
+  hour: "7,19"
+  minute: "45"
+datapoint_cleaning_schedule: # (3)!
   minute: "*/30"

 snapshot_cleaning:
-  schedule: {minute: "15,45"} # (3)!
-  older_than: 7d # (4)!
+  schedule: {minute: "15,45"} # (4)!
+  older_than: 7d # (5)!

 datapoint_archivation:
-  schedule: {hour: 2, minute: 0} # (5)!
-  older_than: 7d # (6)!
-  archive_dir: "data/datapoints/" # (7)!
+  schedule: {hour: 2, minute: 0} # (6)!
+  older_than: 7d # (7)!
+  archive_dir: "data/datapoints/" # (8)!
 ```

 1. Parameter `aggregation_schedule` sets the interval for DP³ to aggregate observation datapoints in master records. This should be scheduled more often than cleaning of datapoints.
-2. Parameter `datapoint_cleaning_schedule` sets interval when should DP³ check if any data in master record of observations and timeseries attributes isn't too old and if there's something too old, removes it. To control what is considered as "too old", see parameter `max_age` in *Database entities* configuration.
-3. Parameter `snapshot_cleaning.schedule` sets the interval for DP³ to clean the snapshots collection. Optimally should be scheduled outside the snapshot creation window. See *Snapshots* configuration for more.
-4. Parameter `snapshot_cleaning.older_than` sets how old must a snapshot be to be deleted.
-5. Parameter `datapoint_archivation.schedule` sets interval for DP³ to archive datapoints from raw collections.
-6. Parameter `datapoint_archivation.older_than` sets how old must a datapoint be to be archived.
-7. Parameter `datapoint_archivation.archive_dir` sets directory where should be archived old datapoints. If directory doesn't exist, it will be created, but write priviledges must be set correctly. Can be also set to `null` (or not set) to disable archivation and only delete old data.
+2. Parameter `mark_datapoints_schedule` sets when DP³ marks the minimum datapoint timestamps for all entities in a master collection. This should be scheduled rarely, as it is a very expensive operation.
+3. Parameter `datapoint_cleaning_schedule` sets the interval at which DP³ checks master records of observation and timeseries attributes for data that is too old and removes it. To control what is considered "too old", see parameter `max_age` in the *Database entities* configuration.
+4. Parameter `snapshot_cleaning.schedule` sets the interval for DP³ to clean the snapshots collection. Ideally, this should be scheduled outside the snapshot creation window. See the *Snapshots* configuration for more.
+5. Parameter `snapshot_cleaning.older_than` sets how old a snapshot must be to be deleted.
+6. Parameter `datapoint_archivation.schedule` sets the interval for DP³ to archive datapoints from raw collections.
+7. Parameter `datapoint_archivation.older_than` sets how old a datapoint must be to be archived.
+8. Parameter `datapoint_archivation.archive_dir` sets the directory where old datapoints should be archived. If the directory doesn't exist, it will be created, but write privileges must be set correctly. Can also be set to `null` (or left unset) to disable archivation and only delete old data.

 The schedule dictionaries are transformed to cron expressions, see [CronExpression docs][dp3.common.config.CronExpression] for details.
\ No newline at end of file
diff --git a/dp3/api/routers/entity.py b/dp3/api/routers/entity.py
index 9d33ce88..b427eaf8 100644
--- a/dp3/api/routers/entity.py
+++ b/dp3/api/routers/entity.py
@@ -61,8 +61,6 @@ def get_eid_snapshots_handler(
 ):
     """Handler for getting snapshots of EID"""
     snapshots = list(DB.get_snapshots(etype, eid, t1=date_from, t2=date_to))
-    for s in snapshots:
-        del s["_id"]

     return snapshots

@@ -136,11 +134,10 @@ async def list_entity_type_eids(
         time_created = None

-    # Remove _id field
-    result = list(cursor_page)
+    # Extract the latest snapshot of each entity
+    result = [r["last"] for r in cursor_page]
     for r in result:
         time_created = r["_time_created"]
         del r["_time_created"]
-        del r["_id"]

     return EntityEidList(
         time_created=time_created, count=len(result), total_count=total_count, data=result
diff --git a/dp3/database/database.py b/dp3/database/database.py
index 61ffde27..4a0ca07d 100644
--- a/dp3/database/database.py
+++ b/dp3/database/database.py
@@ -8,23 +8,26 @@ from datetime import datetime
 from typing import Any, Callable, Literal, Optional, Union

+import bson
 import pymongo
 from event_count_logger import DummyEventGroup
 from pydantic import BaseModel, Field, field_validator
 from pymongo import ReplaceOne, UpdateMany, UpdateOne, WriteConcern
 from pymongo.command_cursor import CommandCursor
 from pymongo.cursor import Cursor
-from pymongo.errors import OperationFailure
-from pymongo.results import DeleteResult
+from pymongo.errors import BulkWriteError, DocumentTooLarge, OperationFailure, WriteError
+from pymongo.results import DeleteResult, UpdateResult

 from dp3.common.attrspec import AttrType, timeseries_types
 from dp3.common.config import HierarchicalDict, ModelSpec
-from dp3.common.datapoint import DataPointBase
+from dp3.common.datapoint import DataPointBase, to_json_friendly
 from dp3.common.scheduler import Scheduler
 from dp3.common.types import EventGroupType
 from dp3.database.schema_cleaner import SchemaCleaner
 from dp3.task_processing.task_queue import HASH

+BSON_OBJECT_TOO_LARGE = 10334
+

 class DatabaseError(Exception):
     pass

@@ -132,6 +135,9 @@ def __init__(
         # Init and switch to correct database
         self._db = self._db[config.db_name]

+        type_registry = bson.codec_options.TypeRegistry(fallback_encoder=to_json_friendly)
+        self._codec_opts = 
bson.codec_options.CodecOptions(type_registry=type_registry) + if process_index == 0: self._init_database_schema(config.db_name) @@ -154,6 +160,9 @@ def __init__( self._sched.register(self._push_raw, second=seconds, misfire_grace_time=5) self._sched.register(self._push_master, second=seconds, misfire_grace_time=5) + self._normal_snapshot_eids = defaultdict(set) + self._oversized_snapshot_eids = defaultdict(set) + self.log.info("Database successfully initialized!") @staticmethod @@ -207,6 +216,11 @@ def _snapshots_col_name(entity: str) -> str: """Returns name of snapshots collection for `entity`.""" return f"{entity}#snapshots" + @staticmethod + def _oversized_snapshots_col_name(entity: str) -> str: + """Returns name of oversized snapshots collection for `entity`.""" + return f"{entity}#snapshots_oversized" + @staticmethod def _raw_col_name(entity: str) -> str: """Returns name of raw data collection for `entity`.""" @@ -232,7 +246,7 @@ def _init_database_schema(self, db_name) -> None: """ for etype in self._db_schema_config.entities: # Snapshots index on `eid` and `_time_created` fields - snapshot_col = self._snapshots_col_name(etype) + snapshot_col = self._oversized_snapshots_col_name(etype) self._db[snapshot_col].create_index("eid", background=True) self._db[snapshot_col].create_index("_time_created", background=True) @@ -271,9 +285,12 @@ def _init_database_schema(self, db_name) -> None: if "$**" in attr_names: if index.get("wildcardProjection") is not None: covered_attrs = { - k.rsplit(".", maxsplit=1)[0] for k in index["wildcardProjection"] + it + for k in index["wildcardProjection"] + for it in k.rsplit(".", maxsplit=1) } - if not attrs - covered_attrs: + covered_attrs -= {"ts_last_update", "#min_t2s"} + if "t2" not in covered_attrs and not attrs - covered_attrs: create_wildcard = False continue # Index already covers all history attributes self.log.info("Dropping wildcard index %s on %s", index["name"], etype) @@ -291,7 +308,7 @@ def _init_database_schema(self, db_name) -> None: master_col.create_index( [("$**", 1)], name=index_name, - wildcardProjection={f"{attr}.t2": 1 for attr in history_attrs} + wildcardProjection={f"#min_t2s.{attr}": 1 for attr in history_attrs} | {f"{attr}.ts_last_update": 1 for attr in plain_attrs}, ) self.log.info("Created wildcard index %s on %s", index_name, etype) @@ -398,7 +415,9 @@ def _push_raw(self): continue try: self._db.get_collection( - self._raw_col_name(etype), write_concern=WriteConcern(w=0) + self._raw_col_name(etype), + write_concern=WriteConcern(w=0), + codec_options=self._codec_opts, ).insert_many(dps, ordered=False) end = time.time() self.log.debug( @@ -414,9 +433,11 @@ def _push_raw(self): def _push_master(self): """Push master changes to database.""" - for etype, lock in self._raw_buffer_locks.items(): + for etype, lock in self._master_buffer_locks.items(): master_col = self._db.get_collection( - self._master_col_name(etype), write_concern=WriteConcern(w=1) + self._master_col_name(etype), + write_concern=WriteConcern(w=1), + codec_options=self._codec_opts, ) begin = time.time() with lock: @@ -522,6 +543,7 @@ def delete_eids(self, etype: str, eids: list[str]): """Delete master record and all snapshots of `etype`:`eids`.""" master_col = self._master_col_name(etype) snapshot_col = self._snapshots_col_name(etype) + os_snapshot_col = self._oversized_snapshots_col_name(etype) try: res = self._db[master_col].delete_many({"_id": {"$in": eids}}) self.log.debug( @@ -531,8 +553,12 @@ def delete_eids(self, etype: str, eids: list[str]): except 
Exception as e: raise DatabaseError(f"Delete of master record failed: {e}\n{eids}") from e try: - res = self._db[snapshot_col].delete_many({"eid": {"$in": eids}}) + res = self._db[snapshot_col].delete_many({"_id": {"$in": eids}}) self.log.debug("Deleted %s snapshots of %s (%s).", res.deleted_count, etype, len(eids)) + res = self._db[os_snapshot_col].delete_many({"eid": {"$in": eids}}) + self.log.debug( + "Deleted %s oversized snapshots of %s (%s).", res.deleted_count, etype, len(eids) + ) except Exception as e: raise DatabaseError(f"Delete of snapshots failed: {e}\n{eids}") from e for f in self._on_entity_delete_many: @@ -545,6 +571,7 @@ def delete_eid(self, etype: str, eid: str): """Delete master record and all snapshots of `etype`:`eid`.""" master_col = self._master_col_name(etype) snapshot_col = self._snapshots_col_name(etype) + os_snapshot_col = self._oversized_snapshots_col_name(etype) try: self._db[master_col].delete_one({"_id": eid}) self.log.debug("Deleted master record of %s/%s.", etype, eid) @@ -552,8 +579,12 @@ def delete_eid(self, etype: str, eid: str): except Exception as e: raise DatabaseError(f"Delete of master record failed: {e}\n{eid}") from e try: - res = self._db[snapshot_col].delete_many({"eid": eid}) + res = self._db[snapshot_col].delete_many({"_id": eid}) self.log.debug("deleted %s snapshots of %s/%s.", res.deleted_count, etype, eid) + res = self._db[os_snapshot_col].delete_many({"eid": eid}) + self.log.debug( + "Deleted %s oversized snapshots of %s/%s.", res.deleted_count, etype, eid + ) except Exception as e: raise DatabaseError(f"Delete of snapshots failed: {e}\n{eid}") from e for f in self._on_entity_delete_one: @@ -562,15 +593,93 @@ def delete_eid(self, etype: str, eid: str): except Exception as e: self.log.exception("Error in on_entity_delete_one callback %s: %s", f, e) - def delete_old_dps(self, etype: str, attr_name: str, t_old: datetime) -> None: - """Delete old datapoints from master collection. + def mark_all_entity_dps_t2(self, etype: str, attrs: list[str]) -> UpdateResult: + """ + Updates the `min_t2s` of the master records of `etype` for all records. Periodically called for all `etype`s from HistoryManager. """ master_col = self._master_col_name(etype) try: - self._db[master_col].update_many( - {f"{attr_name}.t2": {"$lt": t_old}}, {"$pull": {attr_name: {"t2": {"$lt": t_old}}}} + return self._db[master_col].update_many( + {}, + [ + { + "$set": { + attr_name: { + "$cond": { + "if": { + "$eq": [ + {"$size": {"$ifNull": [f"${attr_name}", []]}}, + 0, + ] + }, + "then": "$$REMOVE", + "else": f"${attr_name}", + } + } + for attr_name in attrs + } + | { + f"#min_t2s.{attr_name}": { + "$cond": { + "if": { + "$eq": [ + {"$size": {"$ifNull": [f"${attr_name}", []]}}, + 0, + ] + }, + "then": "$$REMOVE", + "else": {"$min": f"${attr_name}.t2"}, + } + } + for attr_name in attrs + } + } + ], + ) + except Exception as e: + raise DatabaseError(f"Update of min_t2s failed: {e}") from e + + def delete_old_dps(self, etype: str, attr_name: str, t_old: datetime) -> UpdateResult: + """Delete old datapoints from master collection. + + Periodically called for all `etype`s from HistoryManager. 
+ """ + master_col = self._db.get_collection( + self._master_col_name(etype), write_concern=WriteConcern(w=1) + ) + try: + return master_col.update_many( + {f"#min_t2s.{attr_name}": {"$lt": t_old}}, + [ + { + "$set": { + attr_name: { + "$filter": { + "input": f"${attr_name}", + "cond": {"$gte": ["$$this.t2", t_old]}, + } + } + } + }, + { + "$set": { + f"#min_t2s.{attr_name}": { + "$cond": { + "if": { + "$eq": [ + {"$size": {"$ifNull": [f"${attr_name}", []]}}, + 0, + ] + }, + "then": "$$REMOVE", + "else": {"$min": f"${attr_name}.t2"}, + } + } + }, + }, + ], ) except Exception as e: raise DatabaseError(f"Delete of old datapoints failed: {e}") from e @@ -673,7 +782,9 @@ def get_latest_snapshot(self, etype: str, eid: str) -> dict: self._assert_etype_exists(etype) snapshot_col = self._snapshots_col_name(etype) - return self._db[snapshot_col].find_one({"eid": eid}, sort=[("_id", -1)]) or {} + return ( + self._db[snapshot_col].find_one({"_id": eid}, {"last": 1}, sort=[("_id", -1)]) or {} + ).get("last", {}) def _get_latest_snapshots_date(self) -> Optional[datetime]: """Get date of newest snapshot set. @@ -735,15 +846,6 @@ def get_latest_snapshots( snapshot_col = self._snapshots_col_name(etype) - # Find newest fully completed snapshot date - latest_snapshot_date = self._get_latest_snapshots_date() - - # There are no fully completed snapshots sets - return all currently existing snapshots - if latest_snapshot_date is None: - return self._db[snapshot_col].find().sort([("eid", pymongo.ASCENDING)]), self._db[ - snapshot_col - ].count_documents({}) - if not fulltext_filters: fulltext_filters = {} @@ -752,7 +854,6 @@ def get_latest_snapshots( # Create base of query query = generic_filter - query["_time_created"] = latest_snapshot_date # Process fulltext filters for attr in fulltext_filters: @@ -760,7 +861,7 @@ def get_latest_snapshots( # EID filter if attr == "eid": - query[attr] = fulltext_filter + query["_id"] = fulltext_filter continue # Check if attribute exists @@ -771,20 +872,20 @@ def get_latest_snapshots( # Correctly handle link<...> data type if attr_spec.t in AttrType.PLAIN | AttrType.OBSERVATIONS and attr_spec.is_relation: - query[attr + ".eid"] = fulltext_filter + query["last." + attr + ".eid"] = fulltext_filter else: - query[attr] = fulltext_filter + query["last." + attr] = fulltext_filter try: - return self._db[snapshot_col].find(query).sort([("eid", pymongo.ASCENDING)]), self._db[ - snapshot_col - ].count_documents(query) + return self._db[snapshot_col].find(query, {"last": 1}).sort( + [("_id", pymongo.ASCENDING)] + ), self._db[snapshot_col].count_documents(query) except OperationFailure as e: raise DatabaseError("Invalid query") from e def get_snapshots( self, etype: str, eid: str, t1: Optional[datetime] = None, t2: Optional[datetime] = None - ) -> Cursor: + ) -> Union[Cursor, CommandCursor]: """Get all (or filtered) snapshots of given `eid`. This method is useful for displaying `eid`'s history on web. 
@@ -799,6 +900,37 @@ def get_snapshots( self._assert_etype_exists(etype) snapshot_col = self._snapshots_col_name(etype) + + # Find out if the snapshot is oversized + doc = self._db[snapshot_col].find_one({"_id": eid}, {"oversized": 1}) + if doc and doc.get("oversized", False): + return self._get_snapshots_oversized(etype, eid, t1, t2) + + query = {"_time_created": {}} + pipeline = [ + {"$match": {"_id": eid}}, + {"$set": {"history": {"$concatArrays": [["$last"], "$history"]}}}, + {"$unwind": "$history"}, + {"$replaceRoot": {"newRoot": "$history"}}, + ] + + # Filter by date + if t1: + query["_time_created"]["$gte"] = t1 + if t2: + query["_time_created"]["$lte"] = t2 + + # Unset if empty + if query["_time_created"]: + pipeline.append({"$match": query}) + pipeline.append({"$sort": {"_time_created": pymongo.ASCENDING}}) + return self._db[snapshot_col].aggregate(pipeline) + + def _get_snapshots_oversized( + self, etype: str, eid: str, t1: Optional[datetime] = None, t2: Optional[datetime] = None + ) -> Cursor: + """Get all (or filtered) snapshots of given `eid` from oversized snapshots collection.""" + snapshot_col = self._oversized_snapshots_col_name(etype) query = {"eid": eid, "_time_created": {}} # Filter by date @@ -861,25 +993,129 @@ def estimate_count_eids(self, etype: str) -> int: master_col = self._master_col_name(etype) return self._db[master_col].estimated_document_count({}) + def _migrate_to_oversized_snapshot(self, etype: str, eid: str, snapshot: dict): + snapshot_col = self._snapshots_col_name(etype) + os_col = self._oversized_snapshots_col_name(etype) + + try: + doc = self._db[snapshot_col].find_one_and_update( + {"_id": eid}, + { + "$set": {"oversized": True, "last": snapshot, "count": 0}, + "$unset": {"history": ""}, + }, + ) + inserts = list(doc.get("history", [])) + inserts.insert(0, doc.get("last", {})) + inserts.insert(0, snapshot) + self._db[os_col].insert_many(inserts) + except Exception as e: + raise DatabaseError(f"Update of snapshot {eid} failed: {e}, {snapshot}") from e + def save_snapshot(self, etype: str, snapshot: dict, time: datetime): - """Saves snapshot to specified entity of current master document.""" + """Saves snapshot to specified entity of current master document. + + Will move snapshot to oversized snapshots if the maintained bucket is too large. 
+ """ # Check `etype` self._assert_etype_exists(etype) snapshot["_time_created"] = time + if "eid" not in snapshot: + self.log.error("Snapshot is missing 'eid' field: %s", snapshot) + return + eid = snapshot["eid"] snapshot_col = self._snapshots_col_name(etype) + os_col = self._oversized_snapshots_col_name(etype) + + # Find out if the snapshot is oversized + normal, oversized, new = self._get_snapshot_state(etype, {eid}) + if new: + # First snapshot of entity + self._db[snapshot_col].insert_one( + {"_id": eid, "last": snapshot, "history": [], "oversized": False, "count": 0} + ) + self.log.debug(f"Inserted snapshot of {eid}") + self._cache_snapshot_state(etype, new, oversized) + return + elif oversized: + # Snapshot is already marked as oversized + self._db[snapshot_col].update_one({"_id": eid}, {"$set": {"last": snapshot}}) + self._db[os_col].insert_one(snapshot) + return + try: - self._db[snapshot_col].insert_one(snapshot) - self.log.debug(f"Inserted snapshot: {snapshot}") + # Update a normal snapshot bucket + res = self._db[snapshot_col].update_one( + {"_id": eid}, + [ + { + "$set": { + "history": { + "$concatArrays": [ + ["$last"], + "$history", + ] + }, + "count": {"$sum": ["$count", 1]}, + } + }, + {"$set": {"last": {"$literal": snapshot}}}, + ], + ) + if res.modified_count == 0: + self.log.error(f"Snapshot of {eid} was not updated, {res.raw_result}") + except (WriteError, OperationFailure) as e: + if e.code != BSON_OBJECT_TOO_LARGE: + raise e + # The snapshot is too large, move it to oversized snapshots + self.log.info(f"Snapshot of {eid} is too large: {e}, marking as oversized.") + self._migrate_to_oversized_snapshot(etype, eid, snapshot) + self._cache_snapshot_state(etype, set(), normal) except Exception as e: - raise DatabaseError(f"Insert of snapshot failed: {e}\n{snapshot}") from e + raise DatabaseError(f"Insert of snapshot {eid} failed: {e}, {snapshot}") from e + + def _get_snapshot_state(self, etype: str, eids: set[str]) -> tuple[set, set, set]: + """Get current state of snapshot of given `eid`.""" + unknown = eids + normal = self._normal_snapshot_eids[etype] & unknown + oversized = self._oversized_snapshot_eids[etype] & unknown + unknown = unknown - normal - oversized + + if not unknown: + return normal, oversized, unknown + + snapshot_col = self._snapshots_col_name(etype) + new_normal = set() + new_oversized = set() + for doc in self._db[snapshot_col].find({"_id": {"$in": list(unknown)}}, {"oversized": 1}): + if doc.get("oversized", False): + new_oversized.add(doc["_id"]) + else: + new_normal.add(doc["_id"]) + + self._normal_snapshot_eids[etype] |= new_normal + self._oversized_snapshot_eids[etype] |= new_oversized + unknown = unknown - new_normal - new_oversized + + return normal | new_normal, oversized | new_oversized, unknown + + def _cache_snapshot_state(self, etype: str, normal: set, oversized: set): + """Cache snapshot state for given `etype`.""" + self._normal_snapshot_eids[etype] |= normal + + self._normal_snapshot_eids[etype] -= oversized + self._oversized_snapshot_eids[etype] |= oversized def save_snapshots(self, etype: str, snapshots: list[dict], time: datetime): """ Saves a list of snapshots of current master documents. All snapshots must belong to same entity type. + + Will move snapshots to oversized snapshots if the maintained bucket is too large. + For better understanding, see `save_snapshot()`. 
""" # Check `etype` self._assert_etype_exists(etype) @@ -888,18 +1124,158 @@ def save_snapshots(self, etype: str, snapshots: list[dict], time: datetime): snapshot["_time_created"] = time snapshot_col = self._snapshots_col_name(etype) - try: - res = self._db[snapshot_col].insert_many(snapshots, ordered=False) - if len(res.inserted_ids) != len(snapshots): - self.log.error( - "Inserted only %s snapshots when trying to insert %s", - len(res.inserted_ids), - len(snapshots), + os_col = self._oversized_snapshots_col_name(etype) + + snapshots_by_eid = defaultdict(list) + for snapshot in snapshots: + if "eid" not in snapshot: + continue + if "_id" in snapshot: + del snapshot["_id"] + snapshots_by_eid[snapshot["eid"]].append(snapshot) + + # Find out if any of the snapshots are oversized + normal, oversized, new = self._get_snapshot_state(etype, set(snapshots_by_eid.keys())) + + inserts = [] + updates = [] + update_originals = [] + oversized_inserts = [] + oversized_updates = [] + + # A normal snapshot, shift the last snapshot to history and update last + for eid in normal: + updates.append( + UpdateOne( + {"_id": eid}, + [ + { + "$set": { + "history": { + "$concatArrays": [ + snapshots_by_eid[eid][:-1], + ["$last"], + "$history", + ] + }, + "count": {"$sum": [len(snapshots_by_eid[eid]), "$count"]}, + } + }, + {"$set": {"last": {"$literal": snapshots_by_eid[eid][-1]}}}, + ], ) - else: - self.log.debug(f"Inserted snapshots: {snapshots}") - except Exception as e: - raise DatabaseError(f"Insert of snapshots failed: {e}\n{snapshots}") from e + ) + update_originals.append(snapshots_by_eid[eid]) + + # Snapshot is already marked as oversized + for eid in oversized: + oversized_inserts.extend(snapshots_by_eid[eid]) + oversized_updates.append( + UpdateOne({"_id": eid}, {"$set": {"last": snapshots_by_eid[eid][-1]}}) + ) + + # The remaining snapshots are new + for eid in new: + eid_snapshots = snapshots_by_eid[eid] + inserts.append( + { + "_id": eid, + "last": eid_snapshots[-1], + "history": eid_snapshots[:-1], + "oversized": False, + "count": len(eid_snapshots) - 1, + } + ) + + new_oversized = set() + new_normal = set() + + if updates: + try: + res = self._db[snapshot_col].bulk_write(updates, ordered=False) + if res.modified_count != len(updates): + self.log.warning( + "Some snapshots were not updated, %s != %s", + res.modified_count, + len(updates), + ) + except (BulkWriteError, OperationFailure) as e: + self.log.info("Update of snapshots failed, will retry with oversize.") + failed_indexes = [ + err["index"] + for err in e.details["writeErrors"] + if err["code"] == BSON_OBJECT_TOO_LARGE + ] + failed_snapshots = (update_originals[i] for i in failed_indexes) + for eid_snapshots in failed_snapshots: + eid = eid_snapshots[0]["eid"] + failed_snapshots = sorted( + eid_snapshots, key=lambda s: s["_time_created"], reverse=True + ) + self._migrate_to_oversized_snapshot(etype, eid, failed_snapshots[0]) + oversized_inserts.extend(failed_snapshots[1:]) + new_oversized.add(eid) + + if any(err["code"] != BSON_OBJECT_TOO_LARGE for err in e.details["writeErrors"]): + # Some other error occurred + raise e + except Exception as e: + raise DatabaseError(f"Update of snapshots failed: {str(e)[:2048]}") from e + + if inserts: + try: + # Insert new snapshots + res = self._db[snapshot_col].insert_many(inserts, ordered=False) + new_normal.update(res.inserted_ids) + if len(res.inserted_ids) != len(inserts): + self.log.warning( + "Some snapshots were not inserted, %s != %s, failed: %s", + len(res.inserted_ids), + 
len(snapshots_by_eid), + {s["_id"] for s in inserts} - set(res.inserted_ids), + ) + except (DocumentTooLarge, OperationFailure) as e: + self.log.info(f"Inserted snapshot is too large, will retry with oversize. {e}") + checked_inserts = [] + oversized_inserts = [] + + # Filter out the oversized snapshots + for insert_doc in inserts: + bsize = len(bson.BSON.encode(insert_doc)) + if bsize < 16 * 1024 * 1024: + checked_inserts.append(insert_doc) + else: + eid = insert_doc["_id"] + checked_inserts.append( + { + "_id": eid, + "last": insert_doc["last"], + "oversized": True, + "history": [], + "count": 0, + } + ) + oversized_inserts.extend(insert_doc["history"] + [insert_doc["last"]]) + new_oversized.add(eid) + try: + res = self._db[snapshot_col].insert_many(checked_inserts, ordered=False) + new_normal.update(res.inserted_ids) + except Exception as e: + raise DatabaseError(f"Insert of snapshots failed: {e}") from e + except Exception as e: + raise DatabaseError(f"Insert of snapshot failed: {e}") from e + + # Update the oversized snapshots + if oversized_inserts: + try: + if oversized_updates: + self._db[snapshot_col].bulk_write(oversized_updates) + self._db[os_col].insert_many(oversized_inserts) + except Exception as e: + raise DatabaseError(f"Insert of snapshots failed: {str(e)[:2048]}") from e + + # Cache the new state + self._cache_snapshot_state(etype, new_normal, new_oversized) def _get_metadata_id(self, module: str, time: datetime, worker_id: Optional[int] = None) -> str: """Generates unique metadata id based on `module`, `time` and the worker index.""" @@ -1098,6 +1474,7 @@ def get_distinct_val_count(self, etype: str, attr: str) -> dict[Any, int]: self._assert_etype_exists(etype) snapshot_col = self._snapshots_col_name(etype) + os_snapshot_col = self._oversized_snapshots_col_name(etype) # Get attribute specification try: @@ -1120,37 +1497,48 @@ def get_distinct_val_count(self, etype: str, attr: str) -> dict[Any, int]: ) # Build aggregation query - agg_query = [ - {"$match": {"_time_created": latest_snapshot_date}}, - ] + attr_path = "$last." 
+ attr + os_attr_path = "$" + attr + unwinding = [] + os_unwinding = [] # Unwind array-like and multi value attributes # If attribute is multi value array, unwind twice if "array" in attr_spec.data_type.root or "set" in attr_spec.data_type.root: - agg_query.append({"$unwind": "$" + attr}) + unwinding.append({"$unwind": attr_path}) + os_unwinding.append({"$unwind": os_attr_path}) if attr_spec.t == AttrType.OBSERVATIONS and attr_spec.multi_value: - agg_query.append({"$unwind": "$" + attr}) + unwinding.append({"$unwind": attr_path}) + os_unwinding.append({"$unwind": os_attr_path}) # Group - agg_query_group_id = "$" + attr + agg_query_group_id = attr_path + os_agg_query_group_id = os_attr_path if "link" in attr_spec.data_type.root: agg_query_group_id += ".eid" - agg_query.append( - { - "$group": { - "_id": agg_query_group_id, - "count": {"$sum": 1}, - } - } - ) - - # Sort - agg_query.append({"$sort": {"_id": 1, "count": -1}}) + os_agg_query_group_id += ".eid" + agg_query = [ + *unwinding, + {"$group": {"_id": agg_query_group_id, "count": {"$sum": 1}}}, + {"$sort": {"_id": 1, "count": -1}}, + ] # Run aggregation distinct_counts_cur = self._db[snapshot_col].aggregate(agg_query) distinct_counts = {x["_id"]: x["count"] for x in distinct_counts_cur} + + # Run aggregation for oversized snapshots and sum the counts + agg_query = [ + {"$match": {"_time_created": latest_snapshot_date}}, + *os_unwinding, + {"$group": {"_id": os_agg_query_group_id, "count": {"$sum": 1}}}, + {"$sort": {"_id": 1, "count": -1}}, + ] + os_distinct_counts_cur = self._db[os_snapshot_col].aggregate(agg_query) + for x in os_distinct_counts_cur: + distinct_counts[x["_id"]] = distinct_counts.get(x["_id"], 0) + x["count"] + if None in distinct_counts: del distinct_counts[None] @@ -1240,16 +1628,37 @@ def drop_empty_archives(self, etype: str) -> int: raise DatabaseError(f"Drop of empty archive failed: {e}") from e return dropped_count - def delete_old_snapshots(self, etype: str, t_old: datetime): + def delete_old_snapshots(self, etype: str, t_old: datetime, n_old: int) -> int: """Delete old snapshots. Periodically called for all `etype`s from HistoryManager. """ snapshot_col_name = self._snapshots_col_name(etype) + os_snapshot_col_name = self._oversized_snapshots_col_name(etype) + deleted = 0 try: - return self._db[snapshot_col_name].delete_many({"_time_created": {"$lt": t_old}}) + res = self._db[snapshot_col_name].update_many( + {"count": {"$gte": n_old}}, + [ + { + "$set": { + "history": { + "$filter": { + "input": "$history", + "cond": {"$gte": ["$$this._time_created", t_old]}, + } + } + } + }, + {"$set": {"count": {"$size": "$history"}}}, + ], + ) + deleted += res.modified_count + res = self._db[os_snapshot_col_name].delete_many({"_time_created": {"$lt": t_old}}) + deleted += res.deleted_count except Exception as e: raise DatabaseError(f"Delete of olds snapshots failed: {e}") from e + return deleted def get_module_cache(self, override_called_id: Optional[str] = None): """Return a persistent cache collection for given module name. diff --git a/dp3/history_management/history_manager.py b/dp3/history_management/history_manager.py index eff9eff2..126eb884 100644 --- a/dp3/history_management/history_manager.py +++ b/dp3/history_management/history_manager.py @@ -65,12 +65,14 @@ class HistoryManagerConfig(BaseModel, extra=Extra.forbid): Attributes: aggregation_schedule: Schedule for master document aggregation. datapoint_cleaning_schedule: Schedule for datapoint cleaning. 
+ mark_datapoints_schedule: Schedule for marking datapoints in master docs. snapshot_cleaning: Configuration for snapshot cleaning. datapoint_archivation: Configuration for datapoint archivation. """ aggregation_schedule: CronExpression datapoint_cleaning_schedule: CronExpression + mark_datapoints_schedule: CronExpression snapshot_cleaning: SnapshotCleaningConfig datapoint_archivation: DPArchivationConfig @@ -101,6 +103,11 @@ def __init__( return # Schedule datapoints cleaning + datapoint_marking_schedule = self.config.mark_datapoints_schedule + registrar.scheduler_register( + self.mark_datapoints_in_master_docs, **datapoint_marking_schedule.model_dump() + ) + datapoint_cleaning_schedule = self.config.datapoint_cleaning_schedule registrar.scheduler_register( self.delete_old_dps, **datapoint_cleaning_schedule.model_dump() @@ -111,6 +118,8 @@ def __init__( registrar.scheduler_register( self.delete_old_snapshots, **snapshot_cleaning_schedule.model_dump() ) + self.keep_snapshot_count = self._get_snapshot_count_to_keep(platform_config.config) + self.log.info("Keeping %s snapshots", self.keep_snapshot_count) # Schedule datapoint archivation archive_config = self.config.datapoint_archivation @@ -121,6 +130,45 @@ def __init__( self.log_dir = None registrar.scheduler_register(self.archive_old_dps, **archive_config.schedule.model_dump()) + def _get_snapshot_count_to_keep(self, config) -> int: + """Returns how many snapshots should be kept based on configuration. + + This depends on the frequency of snapshot creation and the max snapshot age. + """ + max_age_days = self.keep_snapshot_delta.total_seconds() / 3600 / 24 + creation_rate = config.get("snapshots.creation_rate", {"minute": "*/30"}) + if len(creation_rate) > 1: + raise ValueError("Only one snapshot creation rate is supported.") + + for key, value in creation_rate.items(): + if value.startswith("*/"): + snapshot_interval = int(value[2:]) + if key == "second": + snapshots_per_day = 60 * 60 * 24 / snapshot_interval + elif key == "minute": + snapshots_per_day = 60 * 24 / snapshot_interval + elif key == "hour": + snapshots_per_day = 24 / snapshot_interval + elif key == "day": + snapshots_per_day = 1 / snapshot_interval + else: + raise ValueError(f"Unsupported snapshot creation rate: {creation_rate}") + else: + count = len(value.split(",")) + if key == "second": + snapshots_per_day = 60 * 24 * count + elif key == "minute": + snapshots_per_day = 24 * count + elif key == "hour": + snapshots_per_day = count + else: + raise ValueError(f"Unsupported snapshot creation rate: {creation_rate}") + break + else: + raise ValueError(f"Unsupported snapshot creation rate: {creation_rate}") + + return int(max_age_days * snapshots_per_day) + def delete_old_dps(self): """Deletes old data points from master collection.""" self.log.debug("Deleting old records ...") @@ -144,16 +192,35 @@ def delete_old_dps(self): except DatabaseError as e: self.log.error(e) + def mark_datapoints_in_master_docs(self): + """Marks the timestamps of all datapoints in master documents.""" + self.log.debug("Marking the datapoint timestamps for all entity records ...") + + for entity, attr_conf in self.model_spec.entity_attributes.items(): + attrs_to_mark = [] + for attr, conf in attr_conf.items(): + if conf.t in AttrType.OBSERVATIONS | AttrType.TIMESERIES: + attrs_to_mark.append(attr) + + if not attrs_to_mark: + continue + try: + res = self.db.mark_all_entity_dps_t2(entity, attrs_to_mark) + self.log.debug("Marked %s records of %s", res.modified_count, entity) + except DatabaseError as e: + 
self.log.error(e) + def delete_old_snapshots(self): """Deletes old snapshots.""" t_old = datetime.now() - self.keep_snapshot_delta - self.log.debug("Deleting all snapshots before %s", t_old) + n_old = self.keep_snapshot_count + self.log.debug("Deleting all snapshots before %s and over %s in total", t_old, n_old) deleted_total = 0 for etype in self.model_spec.entities: try: - result = self.db.delete_old_snapshots(etype, t_old) - deleted_total += result.deleted_count + result = self.db.delete_old_snapshots(etype, t_old, n_old) + deleted_total += result except DatabaseError as e: self.log.exception(e) self.log.debug("Deleted %s snapshots in total.", deleted_total) diff --git a/dp3/scripts/add_min_t2s.py b/dp3/scripts/add_min_t2s.py new file mode 100755 index 00000000..81606c25 --- /dev/null +++ b/dp3/scripts/add_min_t2s.py @@ -0,0 +1,79 @@ +#!/usr/bin/env python3 +"""Simple script to add min_t2s to master records to allow for better indexing. + +Intended to be run only once to upgrade a database existing before 08-2024. +""" + +import argparse +import time + +from dp3.common.attrspec import AttrType +from dp3.common.config import ModelSpec, read_config_dir +from dp3.database.database import EntityDatabase, MongoConfig + +# Arguments parser +parser = argparse.ArgumentParser( + description="Add min_t2s to master records to allow for better indexing." +) +parser.add_argument( + "--config", + default="/etc/adict/config", + help="DP3 config directory (default: /etc/adict/config)", +) +args = parser.parse_args() + +# Load DP3 configuration +config = read_config_dir(args.config, recursive=True) +model_spec = ModelSpec(config.get("db_entities")) + +# Connect to database +connection_conf = MongoConfig.model_validate(config.get("database", {})) +client = EntityDatabase.connect(connection_conf) +client.admin.command("ping") + +db = client[connection_conf.db_name] + + +for entity, attributes in model_spec.entity_attributes.items(): + t1 = time.time() + res = db[f"{entity}#master"].update_many( + {}, + [ + { + "$set": { + attr_name: { + "$cond": { + "if": { + "$eq": [ + {"$size": {"$ifNull": [f"${attr_name}", []]}}, + 0, + ] + }, + "then": "$$REMOVE", + "else": f"${attr_name}", + } + } + for attr_name, spec in attributes.items() + if spec.t != AttrType.PLAIN + } + | { + f"#min_t2s.{attr_name}": { + "$cond": { + "if": { + "$eq": [ + {"$size": {"$ifNull": [f"${attr_name}", []]}}, + 0, + ] + }, + "then": "$$REMOVE", + "else": {"$min": f"${attr_name}.t2"}, + } + } + for attr_name, spec in attributes.items() + if spec.t != AttrType.PLAIN + } + }, + ], + ) + t2 = time.time() + print(f"Updated {res.modified_count} records for entity {entity} in {t2 - t1:.2f}s") diff --git a/dp3/scripts/migrate_snapshots.py b/dp3/scripts/migrate_snapshots.py new file mode 100755 index 00000000..99d48868 --- /dev/null +++ b/dp3/scripts/migrate_snapshots.py @@ -0,0 +1,308 @@ +#!/usr/bin/env python3 +"""Simple script to migrate snapshots schema from flat single-snapshot documents to +a nested history-last schema. + +Intended to be run only once to upgrade a database existing before 08-2024. 
+""" + +import argparse +from collections import defaultdict + +import bson +from pymongo import UpdateOne +from pymongo.errors import BulkWriteError, DocumentTooLarge, OperationFailure, WriteError + +from dp3.common.config import ModelSpec, read_config_dir +from dp3.database.database import DatabaseError, EntityDatabase, MongoConfig + +# Arguments parser +parser = argparse.ArgumentParser( + description="Migrate snapshots schema from flat single-snapshot documents " + "to a nested history schema." +) +parser.add_argument( + "--config", + default="/etc/adict/config", + help="DP3 config directory (default: /etc/adict/config)", +) +parser.add_argument("--dry-run", action="store_true", help="Do not write to database") +parser.add_argument("-n", default=100, type=int, help="Number of updates to send per request") +args = parser.parse_args() + +# Load DP3 configuration +config = read_config_dir(args.config, recursive=True) +model_spec = ModelSpec(config.get("db_entities")) + +# Connect to database +connection_conf = MongoConfig.model_validate(config.get("database", {})) +client = EntityDatabase.connect(connection_conf) +print(client) +client.admin.command("ping") + +db = client[connection_conf.db_name] + +BSON_OBJECT_TOO_LARGE = 10334 + + +def _migrate_to_oversized_snapshot(etype: str, eid: str, snapshot: dict): + snapshot_col = f"{etype}#snapshots" + os_col = f"{etype}#snapshots_oversized" + + if "_id" in snapshot: + print(f"Removing _id {snapshot['_id']} from snapshot of {eid}") + del snapshot["_id"] + + try: + doc = db[snapshot_col].find_one_and_update( + {"_id": eid}, + { + "$set": {"oversized": True, "last": snapshot, "count": 0}, + "$unset": {"history": ""}, + }, + ) + inserts = list(doc.get("history", [])) + inserts.insert(0, doc.get("last", {})) + inserts.insert(0, snapshot) + db[os_col].insert_many(inserts) + except Exception as e: + raise DatabaseError(f"Update of snapshot {eid} failed: {e}, {snapshot}") from e + + +def save_snapshot(etype: str, snapshot: dict): + """Saves snapshot to specified entity of current master document. + + Will move snapshot to oversized snapshots if the maintained bucket is too large. 
+ """ + if "eid" not in snapshot: + return + eid = snapshot["eid"] + + if "_id" in snapshot: + print(f"Removing _id {snapshot['_id']} from snapshot of {eid}") + del snapshot["_id"] + + snapshot_col = f"{etype}#snapshots" + os_col = f"{etype}#snapshots_oversized" + + # Find out if the snapshot is oversized + doc = db[snapshot_col].find_one({"_id": eid}, {"oversized": 1}) + if doc is None: + # First snapshot of entity + db[snapshot_col].insert_one( + {"_id": eid, "last": snapshot, "history": [], "oversized": False, "count": 0} + ) + print(f"Inserted snapshot of {eid}") + return + elif doc.get("oversized", False): + # Snapshot is already marked as oversized + db[snapshot_col].update_one({"_id": eid}, {"$set": {"last": snapshot}}) + db[os_col].insert_one(snapshot) + return + + try: + # Update a normal snapshot bucket + res = db[snapshot_col].update_one( + {"_id": eid}, + [ + { + "$set": { + "history": { + "$concatArrays": [ + ["$last"], + "$history", + ] + }, + "count": {"$sum": ["$count", 1]}, + } + }, + {"$set": {"last": {"$literal": snapshot}}}, + ], + ) + if res.modified_count == 0: + print(f"Snapshot of {eid} was not updated, {res.raw_result}") + except (WriteError, OperationFailure) as e: + if e.code != BSON_OBJECT_TOO_LARGE: + raise e + # The snapshot is too large, move it to oversized snapshots + print(f"Snapshot of {eid} is too large: {e}, marking as oversized.") + _migrate_to_oversized_snapshot(etype, eid, snapshot) + except Exception as e: + raise DatabaseError(f"Insert of snapshot {eid} failed: {e}, {snapshot}") from e + + +def save_snapshots(etype: str, snapshots: list[dict]): + """ + Saves a list of snapshots of current master documents. + + All snapshots must belong to same entity type. + + Will move snapshots to oversized snapshots if the maintained bucket is too large. + For better understanding, see `save_snapshot()`. 
+ + """ + snapshots_by_eid = defaultdict(list) + for snapshot in snapshots: + if "eid" not in snapshot: + continue + if "_id" in snapshot: + del snapshot["_id"] + snapshots_by_eid[snapshot["eid"]].append(snapshot) + print(f"Saving {len(snapshots)} snapshots of {len(snapshots_by_eid)} entities of {etype}") + + snapshot_col = f"{etype}#snapshots" + os_col = f"{etype}#snapshots_oversized" + + # Find out if any of the snapshots are oversized + docs = list( + db[snapshot_col].find( + {"_id": {"$in": list(snapshots_by_eid.keys())}}, {"oversized": 1, "eid": 1} + ) + ) + + updates = [] + update_originals = [] + oversized_inserts = [] + oversized_updates = [] + + for doc in docs: + eid = doc["_id"] + if not doc.get("oversized", False): + # A normal snapshot, shift the last snapshot to history and update last + updates.append( + UpdateOne( + {"_id": eid}, + [ + { + "$set": { + "history": { + "$concatArrays": [ + snapshots_by_eid[eid][:-1], + ["$last"], + "$history", + ] + }, + "count": {"$sum": [len(snapshots_by_eid[eid]), "$count"]}, + } + }, + {"$set": {"last": {"$literal": snapshots_by_eid[eid][-1]}}}, + ], + ) + ) + update_originals.append(snapshots_by_eid[eid]) + else: + # Snapshot is already marked as oversized + oversized_inserts.extend(snapshots_by_eid[eid]) + oversized_updates.append( + UpdateOne({"_id": eid}, {"$set": {"last": snapshots_by_eid[eid][-1]}}) + ) + del snapshots_by_eid[eid] + + # The remaining snapshots are new + inserts = [ + { + "_id": eid, + "last": eid_snapshots[-1], + "history": eid_snapshots[:-1], + "oversized": False, + "count": len(eid_snapshots) - 1, + } + for eid, eid_snapshots in snapshots_by_eid.items() + ] + + if updates: + try: + res = db[snapshot_col].bulk_write(updates, ordered=False) + if res.modified_count != len(updates): + print( + f"Some snapshots were not updated, " + f"{res.modified_count} != {len(snapshots_by_eid)}" + ) + except (BulkWriteError, OperationFailure) as e: + print("Update of snapshots failed, will retry with oversize.") + failed_indexes = [ + err["index"] + for err in e.details["writeErrors"] + if err["code"] == BSON_OBJECT_TOO_LARGE + ] + failed_snapshots = (update_originals[i] for i in failed_indexes) + for eid_snapshots in failed_snapshots: + eid = eid_snapshots[0]["eid"] + failed_snapshots = sorted( + eid_snapshots, key=lambda s: s["_time_created"], reverse=True + ) + _migrate_to_oversized_snapshot(etype, eid, failed_snapshots[0]) + oversized_inserts.extend(failed_snapshots[1:]) + + if any(err["code"] != BSON_OBJECT_TOO_LARGE for err in e.details["writeErrors"]): + # Some other error occurred + raise e + except Exception as e: + raise DatabaseError(f"Update of snapshots failed: {str(e)[:2048]}") from e + + if inserts: + try: + # Insert new snapshots + res = db[snapshot_col].insert_many(inserts, ordered=False) + if len(res.inserted_ids) != len(snapshots_by_eid): + print( + f"Some snapshots were not inserted, " + f"{len(res.inserted_ids)} != {len(snapshots_by_eid)}" + ) + except (DocumentTooLarge, OperationFailure) as e: + print(f"Snapshot too large: {e}") + checked_inserts = [] + oversized_inserts = [] + + # Filter out the oversized snapshots + for insert_doc in inserts: + bsize = len(bson.BSON.encode(insert_doc)) + if bsize < 16 * 1024 * 1024: + checked_inserts.append(insert_doc) + else: + eid = insert_doc["_id"] + checked_inserts.append( + { + "_id": eid, + "last": insert_doc["last"], + "oversized": True, + "history": [], + "count": 0, + } + ) + oversized_inserts.extend(insert_doc["history"] + [insert_doc["last"]]) + try: + 
db[snapshot_col].insert_many(checked_inserts, ordered=False) + except Exception as e: + raise DatabaseError(f"Insert of snapshots failed: {e}") from e + except Exception as e: + raise DatabaseError(f"Insert of snapshot failed: {e}") from e + + # Update the oversized snapshots + if oversized_inserts: + try: + if oversized_updates: + db[snapshot_col].bulk_write(oversized_updates) + db[os_col].insert_many(oversized_inserts) + except Exception as e: + raise DatabaseError(f"Insert of snapshots failed: {str(e)[:2048]}") from e + + +for entity in model_spec.entities: + if len(list(db.list_collections(filter={"name": f"{entity}#snapshots_old"}))) == 0: + db[f"{entity}#snapshots"].rename(f"{entity}#snapshots_old") + print(entity) + + snapshots = [] + for record in db[f"{entity}#snapshots_old"].find({}, sort=[("_time_created", 1)]): + del record["_id"] + snapshots.append(record) + if len(snapshots) >= args.n: + if not args.dry_run: + save_snapshots(entity, snapshots) + else: + print(f"Would save {len(snapshots)} snapshots") + snapshots.clear() + if snapshots and not args.dry_run: + save_snapshots(entity, snapshots) + elif snapshots: + print(f"Would save {len(snapshots)} snapshots") diff --git a/dp3/template/app/config/history_manager.yml b/dp3/template/app/config/history_manager.yml index 6254e722..2a3c5bd8 100644 --- a/dp3/template/app/config/history_manager.yml +++ b/dp3/template/app/config/history_manager.yml @@ -9,6 +9,10 @@ aggregation_schedule: minute: "*/10" # Deleting old datapoints from master records +mark_datapoints_schedule: + hour: "7,19" + minute: "45" + datapoint_cleaning_schedule: minute: "*/30" diff --git a/tests/test_api/test_get_attr_value.py b/tests/test_api/test_get_attr_value.py index 2dd8ba49..971af5d8 100644 --- a/tests/test_api/test_get_attr_value.py +++ b/tests/test_api/test_get_attr_value.py @@ -1,3 +1,5 @@ +from time import sleep + import common from common import ACCEPTED_ERROR_CODES @@ -13,6 +15,11 @@ class GetEidAttrValue(common.APITest): + @classmethod + def setUpClass(cls) -> None: + super().setUpClass() + sleep(10) + def test_unknown_entity_type(self): response = self.get_request( TESTED_PATH.format(entity="xyz", eid="test_entity_id", attr="test_attr_int") diff --git a/tests/test_api/test_get_distinct_attr_values.py b/tests/test_api/test_get_distinct_attr_values.py index 6e0cee87..916134de 100644 --- a/tests/test_api/test_get_distinct_attr_values.py +++ b/tests/test_api/test_get_distinct_attr_values.py @@ -34,10 +34,11 @@ def setUpClass(cls) -> None: ] ) print(res.content.decode("utf-8"), file=sys.stderr) + sleep(9) # Make snapshots cls.get_request("control/make_snapshots") - sleep(3) + sleep(6) def test_unknown_entity_type(self): response = self.get_request(TESTED_PATH.format(entity="xyz", attr="test_attr_int")) diff --git a/tests/test_api/test_get_entity_eids.py b/tests/test_api/test_get_entity_eids.py index e4d70483..a1cadf0e 100644 --- a/tests/test_api/test_get_entity_eids.py +++ b/tests/test_api/test_get_entity_eids.py @@ -17,8 +17,9 @@ def setUpClass(cls) -> None: [{**dp_base, "id": f"A{i}", "v": f"v{i}"} for i in range(i, i + 20)] ) print(res.content.decode("utf-8"), file=sys.stderr) + sleep(8) cls.get_request("control/make_snapshots") - sleep(3) + sleep(6) def test_get_entity_eids(self): eids = self.get_entity_data("entity/A", EntityEidList) @@ -30,7 +31,7 @@ def test_get_entity_eids_pagination(self): for i in range(0, 100, 10): eids = self.get_entity_data("entity/A", EntityEidList, skip=i, limit=10) - self.assertEqual(10, len(eids.data)) + 
self.assertEqual(10, len(eids.data), f"Failed at {i}") received_eids.update(x["eid"] for x in eids.data) eids = self.get_entity_data("entity/A", EntityEidList, skip=101, limit=20) @@ -39,7 +40,7 @@ def test_get_entity_eids_pagination(self): def test_get_entity_eids_generic_filter(self): eids = self.get_entity_data( - "entity/A", EntityEidList, generic_filter=json.dumps({"eid": "A0"}) + "entity/A", EntityEidList, generic_filter=json.dumps({"_id": "A0"}) ) self.assertEqual(1, len(eids.data)) self.assertEqual("A0", eids.data[0]["eid"]) diff --git a/tests/test_config/history_manager.yml b/tests/test_config/history_manager.yml index c83c141b..55dd0843 100644 --- a/tests/test_config/history_manager.yml +++ b/tests/test_config/history_manager.yml @@ -9,6 +9,10 @@ aggregation_schedule: minute: "*/10" # Deleting old datapoints from master records +mark_datapoints_schedule: + hour: "7,19" + minute: "45" + datapoint_cleaning_schedule: minute: "*/30" diff --git a/tests/test_example/config/history_manager.yml b/tests/test_example/config/history_manager.yml index c83c141b..55dd0843 100644 --- a/tests/test_example/config/history_manager.yml +++ b/tests/test_example/config/history_manager.yml @@ -9,6 +9,10 @@ aggregation_schedule: minute: "*/10" # Deleting old datapoints from master records +mark_datapoints_schedule: + hour: "7,19" + minute: "45" + datapoint_cleaning_schedule: minute: "*/30"
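
A few illustrative notes on the mechanisms introduced above follow. The sketches are simplified approximations, not code from this PR, and any names not present in the diff are hypothetical.

First, the schedule dictionaries used throughout `history_manager.yml` (including the new `mark_datapoints_schedule`) are converted to cron expressions by `dp3.common.config.CronExpression`. A minimal sketch of that idea, assuming only the five standard cron fields; `schedule_to_cron` and `CRON_FIELDS` are hypothetical helpers, not DP³ API:

```python
# Hypothetical sketch: how a schedule dict such as the new
# mark_datapoints_schedule maps onto a cron expression.
CRON_FIELDS = ["minute", "hour", "day", "month", "day_of_week"]  # standard cron order

def schedule_to_cron(schedule: dict) -> str:
    """Render a schedule dict as a five-field cron string; omitted fields become '*'."""
    return " ".join(str(schedule.get(field, "*")) for field in CRON_FIELDS)

# {"hour": "7,19", "minute": "45"} -> "45 7,19 * * *",
# i.e. the marking job fires at 07:45 and 19:45 every day.
print(schedule_to_cron({"hour": "7,19", "minute": "45"}))
```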
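The new `CodecOptions` attached to raw and master writes use a `TypeRegistry` with `to_json_friendly` as the fallback encoder, so datapoint values that BSON cannot encode natively are converted on insert instead of raising `InvalidDocument`. A self-contained sketch of the mechanism; the stand-in encoder and the connection/collection names are illustrative:

```python
import bson
from pymongo import MongoClient

def to_json_friendly(value):
    # Stand-in for dp3.common.datapoint.to_json_friendly: a last-resort
    # conversion for values the BSON encoder does not know how to handle.
    return str(value)

# The fallback encoder is invoked only for types BSON cannot encode natively.
type_registry = bson.codec_options.TypeRegistry(fallback_encoder=to_json_friendly)
codec_opts = bson.codec_options.CodecOptions(type_registry=type_registry)

# Collection handles created with these options transparently encode exotic
# datapoint values on insert_many()/update calls.
col = MongoClient()["dp3"].get_collection("ip#raw", codec_options=codec_opts)
```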
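The `#min_t2s` fields written by `mark_all_entity_dps_t2` cache the minimum `t2` per history attribute, and these scalars are what the reworked wildcard index now projects. `delete_old_dps` can then select stale records through one indexed comparison instead of scanning every `{attr}.t2` inside the datapoint arrays. A simplified pymongo sketch; the real pipeline additionally `$$REMOVE`s attributes whose arrays become empty, and the entity and attribute names here are hypothetical:

```python
from datetime import datetime, timedelta, timezone

from pymongo import MongoClient

db = MongoClient()["dp3"]                      # connection details illustrative
t_old = datetime.now(timezone.utc) - timedelta(days=7)
attr = "open_ports"                            # hypothetical observations attribute

# The indexed scalar "#min_t2s.<attr>" selects only records that actually hold
# expired datapoints; the update pipeline drops them and refreshes the cache.
db["ip#master"].update_many(
    {f"#min_t2s.{attr}": {"$lt": t_old}},
    [
        {"$set": {attr: {"$filter": {
            "input": f"${attr}",
            "cond": {"$gte": ["$$this.t2", t_old]},
        }}}},
        {"$set": {f"#min_t2s.{attr}": {"$min": f"${attr}.t2"}}},
    ],
)
```

The marking job is scheduled rarely because it rewrites every master record, while the cleaning job stays cheap by touching only records the index flags as stale.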
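Snapshot storage changes from one document per snapshot to one bucket per entity: `_id` is the `eid`, `last` holds the newest snapshot, and `history` keeps the older ones newest-first, alongside a `count` and an `oversized` flag. Reading a full history therefore concatenates `last` with `history`, which is what the aggregation in `get_snapshots` does server-side. A sketch of the shape with illustrative values:

```python
# Shape of one snapshot bucket, as maintained by save_snapshot()/save_snapshots().
bucket = {
    "_id": "1.2.3.4",                                # the entity's eid
    "last": {"eid": "1.2.3.4", "_time_created": "2024-08-02T12:00"},
    "history": [                                     # older snapshots, newest first
        {"eid": "1.2.3.4", "_time_created": "2024-08-02T11:30"},
        {"eid": "1.2.3.4", "_time_created": "2024-08-02T11:00"},
    ],
    "oversized": False,
    "count": 2,                                      # snapshots held in history
}

# Client-side equivalent of the get_snapshots() pipeline
# ($concatArrays of last + history, $unwind, $sort by _time_created):
full_history = sorted(
    [bucket["last"], *bucket["history"]],
    key=lambda snap: snap["_time_created"],
)
```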
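When appending to a bucket would push the document past MongoDB's 16 MB BSON limit, the server rejects the update with error code 10334, and the write path reacts by flagging the bucket and spilling snapshots into `{entity}#snapshots_oversized` as individual documents. A reduced sketch of the pattern; the real `_migrate_to_oversized_snapshot` also moves the bucket's existing history into the side collection:

```python
from pymongo.errors import OperationFailure, WriteError

BSON_OBJECT_TOO_LARGE = 10334  # MongoDB server error code, as used in the diff

def append_snapshot(db, etype: str, eid: str, snapshot: dict) -> None:
    """Append a snapshot to its bucket; spill to the oversized collection on 10334."""
    try:
        db[f"{etype}#snapshots"].update_one(
            {"_id": eid},
            [
                {"$set": {
                    "history": {"$concatArrays": [["$last"], "$history"]},
                    "count": {"$sum": ["$count", 1]},
                }},
                # $literal prevents operator-like keys in the snapshot
                # from being interpreted by the pipeline.
                {"$set": {"last": {"$literal": snapshot}}},
            ],
        )
    except (WriteError, OperationFailure) as exc:
        if exc.code != BSON_OBJECT_TOO_LARGE:
            raise
        # Bucket hit the BSON limit: flag it and store snapshots one per document.
        db[f"{etype}#snapshots"].update_one(
            {"_id": eid},
            {"$set": {"oversized": True, "last": snapshot, "count": 0},
             "$unset": {"history": ""}},
        )
        db[f"{etype}#snapshots_oversized"].insert_one(snapshot)
```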
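To avoid one `find_one` per entity on every snapshot save, the database layer caches which eids have normal versus oversized buckets in two in-memory sets per entity type, and only queries MongoDB for eids it has not classified yet; this is the role of `_get_snapshot_state` and `_cache_snapshot_state`. A reduced sketch of the lookup, with the module-level sets standing in for the instance attributes:

```python
from collections import defaultdict

_normal = defaultdict(set)     # eids known to have a normal bucket
_oversized = defaultdict(set)  # eids known to be oversized

def classify_eids(db, etype: str, eids: set[str]) -> tuple[set, set, set]:
    """Split eids into (normal, oversized, new), hitting the DB only for unknowns."""
    normal = _normal[etype] & eids
    oversized = _oversized[etype] & eids
    unknown = eids - normal - oversized
    for doc in db[f"{etype}#snapshots"].find(
        {"_id": {"$in": list(unknown)}}, {"oversized": 1}
    ):
        target = oversized if doc.get("oversized", False) else normal
        target.add(doc["_id"])
        unknown.discard(doc["_id"])
    _normal[etype] |= normal
    _oversized[etype] |= oversized
    return normal, oversized, unknown  # unknown = eids with no bucket yet
```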
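Finally, `_get_snapshot_count_to_keep` converts the snapshot creation rate and the `snapshot_cleaning.older_than` window into a per-entity snapshot count, which lets `delete_old_snapshots` trim bucket histories by `count` in addition to deleting oversized snapshots by `_time_created`. The arithmetic for the fallback creation rate assumed in the code:

```python
# creation_rate = {"minute": "*/30"}  ->  one snapshot every 30 minutes
snapshots_per_day = 60 * 24 / 30          # 48 snapshots per day

# older_than = 7d  ->  keep roughly a week's worth of snapshots per entity
keep_snapshot_count = int(7 * snapshots_per_day)
print(keep_snapshot_count)                # 336
```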