From 5fd9bf3a04b34cda46f79447a90f8688ead97f02 Mon Sep 17 00:00:00 2001 From: Ondrej Sedlacek Date: Fri, 26 Jul 2024 15:53:59 +0200 Subject: [PATCH 01/17] Database: Reworked schema of snapshot collection. --- dp3/api/routers/entity.py | 2 - dp3/database/database.py | 372 ++++++++++++++++++---- dp3/history_management/history_manager.py | 48 ++- 3 files changed, 363 insertions(+), 59 deletions(-) diff --git a/dp3/api/routers/entity.py b/dp3/api/routers/entity.py index 9d33ce88..e77ff908 100644 --- a/dp3/api/routers/entity.py +++ b/dp3/api/routers/entity.py @@ -61,8 +61,6 @@ def get_eid_snapshots_handler( ): """Handler for getting snapshots of EID""" snapshots = list(DB.get_snapshots(etype, eid, t1=date_from, t2=date_to)) - for s in snapshots: - del s["_id"] return snapshots diff --git a/dp3/database/database.py b/dp3/database/database.py index 61ffde27..02f17320 100644 --- a/dp3/database/database.py +++ b/dp3/database/database.py @@ -8,13 +8,14 @@ from datetime import datetime from typing import Any, Callable, Literal, Optional, Union +import bson import pymongo from event_count_logger import DummyEventGroup from pydantic import BaseModel, Field, field_validator from pymongo import ReplaceOne, UpdateMany, UpdateOne, WriteConcern from pymongo.command_cursor import CommandCursor from pymongo.cursor import Cursor -from pymongo.errors import OperationFailure +from pymongo.errors import BulkWriteError, DocumentTooLarge, OperationFailure, WriteError from pymongo.results import DeleteResult from dp3.common.attrspec import AttrType, timeseries_types @@ -25,6 +26,8 @@ from dp3.database.schema_cleaner import SchemaCleaner from dp3.task_processing.task_queue import HASH +BSON_OBJECT_TOO_LARGE = 10334 + class DatabaseError(Exception): pass @@ -207,6 +210,11 @@ def _snapshots_col_name(entity: str) -> str: """Returns name of snapshots collection for `entity`.""" return f"{entity}#snapshots" + @staticmethod + def _oversized_snapshots_col_name(entity: str) -> str: + """Returns name of oversized snapshots collection for `entity`.""" + return f"{entity}#snapshots_oversized" + @staticmethod def _raw_col_name(entity: str) -> str: """Returns name of raw data collection for `entity`.""" @@ -522,6 +530,7 @@ def delete_eids(self, etype: str, eids: list[str]): """Delete master record and all snapshots of `etype`:`eids`.""" master_col = self._master_col_name(etype) snapshot_col = self._snapshots_col_name(etype) + os_snapshot_col = self._oversized_snapshots_col_name(etype) try: res = self._db[master_col].delete_many({"_id": {"$in": eids}}) self.log.debug( @@ -531,8 +540,12 @@ def delete_eids(self, etype: str, eids: list[str]): except Exception as e: raise DatabaseError(f"Delete of master record failed: {e}\n{eids}") from e try: - res = self._db[snapshot_col].delete_many({"eid": {"$in": eids}}) + res = self._db[snapshot_col].delete_many({"_id": {"$in": eids}}) self.log.debug("Deleted %s snapshots of %s (%s).", res.deleted_count, etype, len(eids)) + res = self._db[os_snapshot_col].delete_many({"eid": {"$in": eids}}) + self.log.debug( + "Deleted %s oversized snapshots of %s (%s).", res.deleted_count, etype, len(eids) + ) except Exception as e: raise DatabaseError(f"Delete of snapshots failed: {e}\n{eids}") from e for f in self._on_entity_delete_many: @@ -545,6 +558,7 @@ def delete_eid(self, etype: str, eid: str): """Delete master record and all snapshots of `etype`:`eid`.""" master_col = self._master_col_name(etype) snapshot_col = self._snapshots_col_name(etype) + os_snapshot_col = 
self._oversized_snapshots_col_name(etype) try: self._db[master_col].delete_one({"_id": eid}) self.log.debug("Deleted master record of %s/%s.", etype, eid) @@ -552,8 +566,12 @@ def delete_eid(self, etype: str, eid: str): except Exception as e: raise DatabaseError(f"Delete of master record failed: {e}\n{eid}") from e try: - res = self._db[snapshot_col].delete_many({"eid": eid}) + res = self._db[snapshot_col].delete_many({"_id": eid}) self.log.debug("deleted %s snapshots of %s/%s.", res.deleted_count, etype, eid) + res = self._db[os_snapshot_col].delete_many({"eid": eid}) + self.log.debug( + "Deleted %s oversized snapshots of %s/%s.", res.deleted_count, etype, eid + ) except Exception as e: raise DatabaseError(f"Delete of snapshots failed: {e}\n{eid}") from e for f in self._on_entity_delete_one: @@ -673,7 +691,9 @@ def get_latest_snapshot(self, etype: str, eid: str) -> dict: self._assert_etype_exists(etype) snapshot_col = self._snapshots_col_name(etype) - return self._db[snapshot_col].find_one({"eid": eid}, sort=[("_id", -1)]) or {} + return ( + self._db[snapshot_col].find_one({"eid": eid}, {"last": 1}, sort=[("_id", -1)]) or {} + ).get("last", {}) def _get_latest_snapshots_date(self) -> Optional[datetime]: """Get date of newest snapshot set. @@ -735,15 +755,6 @@ def get_latest_snapshots( snapshot_col = self._snapshots_col_name(etype) - # Find newest fully completed snapshot date - latest_snapshot_date = self._get_latest_snapshots_date() - - # There are no fully completed snapshots sets - return all currently existing snapshots - if latest_snapshot_date is None: - return self._db[snapshot_col].find().sort([("eid", pymongo.ASCENDING)]), self._db[ - snapshot_col - ].count_documents({}) - if not fulltext_filters: fulltext_filters = {} @@ -752,7 +763,6 @@ def get_latest_snapshots( # Create base of query query = generic_filter - query["_time_created"] = latest_snapshot_date # Process fulltext filters for attr in fulltext_filters: @@ -760,7 +770,7 @@ def get_latest_snapshots( # EID filter if attr == "eid": - query[attr] = fulltext_filter + query["_id"] = fulltext_filter continue # Check if attribute exists @@ -771,20 +781,20 @@ def get_latest_snapshots( # Correctly handle link<...> data type if attr_spec.t in AttrType.PLAIN | AttrType.OBSERVATIONS and attr_spec.is_relation: - query[attr + ".eid"] = fulltext_filter + query["last." + attr + ".eid"] = fulltext_filter else: - query[attr] = fulltext_filter + query["last." + attr] = fulltext_filter try: - return self._db[snapshot_col].find(query).sort([("eid", pymongo.ASCENDING)]), self._db[ - snapshot_col - ].count_documents(query) + return self._db[snapshot_col].find(query, {"last": 1}).sort( + [("_id", pymongo.ASCENDING)] + ), self._db[snapshot_col].count_documents(query) except OperationFailure as e: raise DatabaseError("Invalid query") from e def get_snapshots( self, etype: str, eid: str, t1: Optional[datetime] = None, t2: Optional[datetime] = None - ) -> Cursor: + ) -> Union[Cursor, CommandCursor]: """Get all (or filtered) snapshots of given `eid`. This method is useful for displaying `eid`'s history on web. 
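# Note (illustrative sketch): snapshots now live under the "last" field of a
# per-entity bucket document, so fulltext filters are rewritten from
# {"<attr>": <filter>} to {"last.<attr>": <filter>}, the eid filter matches
# "_id", and results are projected to {"last": 1} before being returned.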
@@ -799,6 +809,37 @@ def get_snapshots( self._assert_etype_exists(etype) snapshot_col = self._snapshots_col_name(etype) + + # Find out if the snapshot is oversized + doc = self._db[snapshot_col].find_one({"_id": eid}, {"oversized": 1}) + if doc and doc.get("oversized", False): + return self._get_snapshots_oversized(etype, eid, t1, t2) + + query = {"_time_created": {}} + pipeline = [ + {"$match": {"_id": eid}}, + {"$set": {"history": {"$concatArrays": [["$last"], "$history"]}}}, + {"$unwind": "$history"}, + {"$replaceRoot": {"newRoot": "$history"}}, + ] + + # Filter by date + if t1: + query["_time_created"]["$gte"] = t1 + if t2: + query["_time_created"]["$lte"] = t2 + + # Unset if empty + if query["_time_created"]: + pipeline.append({"$match": query}) + pipeline.append({"$sort": {"_time_created": pymongo.ASCENDING}}) + return self._db[snapshot_col].aggregate(pipeline) + + def _get_snapshots_oversized( + self, etype: str, eid: str, t1: Optional[datetime] = None, t2: Optional[datetime] = None + ) -> Cursor: + """Get all (or filtered) snapshots of given `eid` from oversized snapshots collection.""" + snapshot_col = self._oversized_snapshots_col_name(etype) query = {"eid": eid, "_time_created": {}} # Filter by date @@ -861,25 +902,95 @@ def estimate_count_eids(self, etype: str) -> int: master_col = self._master_col_name(etype) return self._db[master_col].estimated_document_count({}) + def _migrate_to_oversized_snapshot(self, etype: str, eid: str, snapshot: dict): + snapshot_col = self._snapshots_col_name(etype) + os_col = self._oversized_snapshots_col_name(etype) + + try: + doc = self._db[snapshot_col].find_one_and_update( + {"_id": eid}, + { + "$set": {"oversized": True, "last": snapshot, "count": 0}, + "$unset": {"history": ""}, + }, + ) + inserts = list(doc.get("history", [])) + inserts.insert(0, doc.get("last", {})) + inserts.insert(0, snapshot) + self._db[os_col].insert_many(inserts) + except Exception as e: + raise DatabaseError(f"Update of snapshot {eid} failed: {e}, {snapshot}") from e + def save_snapshot(self, etype: str, snapshot: dict, time: datetime): - """Saves snapshot to specified entity of current master document.""" + """Saves snapshot to specified entity of current master document. + + Will move snapshot to oversized snapshots if the maintained bucket is too large. 
+ """ # Check `etype` self._assert_etype_exists(etype) snapshot["_time_created"] = time + if "eid" not in snapshot: + self.log.error("Snapshot is missing 'eid' field: %s", snapshot) + return + eid = snapshot["eid"] snapshot_col = self._snapshots_col_name(etype) + os_col = self._oversized_snapshots_col_name(etype) + + # Find out if the snapshot is oversized + doc = self._db[snapshot_col].find_one({"_id": eid}, {"oversized": 1}) + if doc is None: + # First snapshot of entity + self._db[snapshot_col].insert_one( + {"_id": eid, "last": snapshot, "history": [], "oversized": False, "count": 0} + ) + self.log.debug(f"Inserted snapshot of {eid}") + return + elif doc.get("oversized", False): + # Snapshot is already marked as oversized + self._db[snapshot_col].update_one({"_id": eid}, {"$set": {"last": snapshot}}) + self._db[os_col].insert_one(snapshot) + return + try: - self._db[snapshot_col].insert_one(snapshot) - self.log.debug(f"Inserted snapshot: {snapshot}") + # Update a normal snapshot bucket + res = self._db[snapshot_col].update_one( + {"_id": eid}, + [ + { + "$set": { + "history": { + "$concatArrays": [ + ["$last"], + "$history", + ] + }, + "count": {"$sum": ["$count", 1]}, + } + }, + {"$set": {"last": {"$literal": snapshot}}}, + ], + ) + if res.modified_count == 0: + self.log.error(f"Snapshot of {eid} was not updated, {res.raw_result}") + except (WriteError, OperationFailure) as e: + if e.code != BSON_OBJECT_TOO_LARGE: + raise e + # The snapshot is too large, move it to oversized snapshots + self.log.info(f"Snapshot of {eid} is too large: {e}, marking as oversized.") + self._migrate_to_oversized_snapshot(etype, eid, snapshot) except Exception as e: - raise DatabaseError(f"Insert of snapshot failed: {e}\n{snapshot}") from e + raise DatabaseError(f"Insert of snapshot {eid} failed: {e}, {snapshot}") from e def save_snapshots(self, etype: str, snapshots: list[dict], time: datetime): """ Saves a list of snapshots of current master documents. All snapshots must belong to same entity type. + + Will move snapshots to oversized snapshots if the maintained bucket is too large. + For better understanding, see `save_snapshot()`. 
""" # Check `etype` self._assert_etype_exists(etype) @@ -888,18 +999,151 @@ def save_snapshots(self, etype: str, snapshots: list[dict], time: datetime): snapshot["_time_created"] = time snapshot_col = self._snapshots_col_name(etype) - try: - res = self._db[snapshot_col].insert_many(snapshots, ordered=False) - if len(res.inserted_ids) != len(snapshots): - self.log.error( - "Inserted only %s snapshots when trying to insert %s", - len(res.inserted_ids), - len(snapshots), + os_col = self._oversized_snapshots_col_name(etype) + + snapshots_by_eid = defaultdict(list) + for snapshot in snapshots: + if "eid" not in snapshot: + continue + if "_id" in snapshot: + del snapshot["_id"] + snapshots_by_eid[snapshot["eid"]].append(snapshot) + + # Find out if any of the snapshots are oversized + docs = list( + self._db[snapshot_col].find( + {"_id": {"$in": list(snapshots_by_eid.keys())}}, {"oversized": 1, "eid": 1} + ) + ) + + updates = [] + update_originals = [] + oversized_inserts = [] + oversized_updates = [] + + for doc in docs: + eid = doc["_id"] + if not doc.get("oversized", False): + # A normal snapshot, shift the last snapshot to history and update last + updates.append( + UpdateOne( + {"_id": eid}, + [ + { + "$set": { + "history": { + "$concatArrays": [ + snapshots_by_eid[eid][:-1], + ["$last"], + "$history", + ] + }, + "count": {"$sum": [len(snapshots_by_eid[eid]), "$count"]}, + } + }, + {"$set": {"last": {"$literal": snapshots_by_eid[eid][-1]}}}, + ], + ) ) + update_originals.append(snapshots_by_eid[eid]) else: - self.log.debug(f"Inserted snapshots: {snapshots}") - except Exception as e: - raise DatabaseError(f"Insert of snapshots failed: {e}\n{snapshots}") from e + # Snapshot is already marked as oversized + oversized_inserts.extend(snapshots_by_eid[eid]) + oversized_updates.append( + UpdateOne({"_id": eid}, {"$set": {"last": snapshots_by_eid[eid][-1]}}) + ) + del snapshots_by_eid[eid] + + # The remaining snapshots are new + inserts = [ + { + "_id": eid, + "last": eid_snapshots[-1], + "history": eid_snapshots[:-1], + "oversized": False, + "count": len(eid_snapshots) - 1, + } + for eid, eid_snapshots in snapshots_by_eid.items() + ] + + if updates: + try: + res = self._db[snapshot_col].bulk_write(updates, ordered=False) + if res.modified_count != len(updates): + self.log.warning( + "Some snapshots were not updated, %s != %s", + res.modified_count, + len(snapshots_by_eid), + ) + except (BulkWriteError, OperationFailure) as e: + self.log.info("Update of snapshots failed, will retry with oversize.") + failed_indexes = [ + err["index"] + for err in e.details["writeErrors"] + if err["code"] == BSON_OBJECT_TOO_LARGE + ] + failed_snapshots = (update_originals[i] for i in failed_indexes) + for eid_snapshots in failed_snapshots: + eid = eid_snapshots[0]["eid"] + failed_snapshots = sorted( + eid_snapshots, key=lambda s: s["_time_created"], reverse=True + ) + self._migrate_to_oversized_snapshot(etype, eid, failed_snapshots[0]) + oversized_inserts.extend(failed_snapshots[1:]) + + if any(err["code"] != BSON_OBJECT_TOO_LARGE for err in e.details["writeErrors"]): + # Some other error occurred + raise e + except Exception as e: + raise DatabaseError(f"Update of snapshots failed: {str(e)[:2048]}") from e + + if inserts: + try: + # Insert new snapshots + res = self._db[snapshot_col].insert_many(inserts, ordered=False) + if len(res.inserted_ids) != len(snapshots_by_eid): + self.log.warning( + "Some snapshots were not inserted, %s != %s", + len(res.inserted_ids), + len(snapshots_by_eid), + ) + except 
(DocumentTooLarge, OperationFailure) as e: + self.log.info(f"Inserted snapshot is too large, will retry with oversize. {e}") + checked_inserts = [] + oversized_inserts = [] + + # Filter out the oversized snapshots + for insert_doc in inserts: + bsize = len(bson.BSON.encode(insert_doc)) + if bsize < 16 * 1024 * 1024: + checked_inserts.append(insert_doc) + else: + eid = insert_doc["_id"] + checked_inserts.append( + { + "_id": eid, + "last": insert_doc["last"], + "oversized": True, + "history": [], + "count": 0, + } + ) + oversized_inserts.extend(insert_doc["history"] + [insert_doc["last"]]) + try: + self._db[snapshot_col].insert_many(checked_inserts, ordered=False) + except Exception as e: + raise DatabaseError(f"Insert of snapshots failed: {e}") from e + except Exception as e: + raise DatabaseError(f"Insert of snapshot failed: {e}") from e + + # Update the oversized snapshots + if oversized_inserts: + try: + if oversized_updates: + self._db[snapshot_col].bulk_write(oversized_updates) + self._db[os_col].insert_many(oversized_inserts) + except Exception as e: + raise DatabaseError(f"Insert of snapshots failed: {str(e)[:2048]}") from e def _get_metadata_id(self, module: str, time: datetime, worker_id: Optional[int] = None) -> str: """Generates unique metadata id based on `module`, `time` and the worker index.""" @@ -1098,6 +1342,7 @@ def get_distinct_val_count(self, etype: str, attr: str) -> dict[Any, int]: self._assert_etype_exists(etype) snapshot_col = self._snapshots_col_name(etype) + os_snapshot_col = self._oversized_snapshots_col_name(etype) # Get attribute specification try: @@ -1120,37 +1365,48 @@ def get_distinct_val_count(self, etype: str, attr: str) -> dict[Any, int]: ) # Build aggregation query - agg_query = [ - {"$match": {"_time_created": latest_snapshot_date}}, - ] + attr_path = "$last." 
+ attr + os_attr_path = "$" + attr + unwinding = [] + os_unwinding = [] # Unwind array-like and multi value attributes # If attribute is multi value array, unwind twice if "array" in attr_spec.data_type.root or "set" in attr_spec.data_type.root: - agg_query.append({"$unwind": "$" + attr}) + unwinding.append({"$unwind": attr_path}) + os_unwinding.append({"$unwind": os_attr_path}) if attr_spec.t == AttrType.OBSERVATIONS and attr_spec.multi_value: - agg_query.append({"$unwind": "$" + attr}) + unwinding.append({"$unwind": attr_path}) + os_unwinding.append({"$unwind": os_attr_path}) # Group - agg_query_group_id = "$" + attr + agg_query_group_id = attr_path + os_agg_query_group_id = os_attr_path if "link" in attr_spec.data_type.root: agg_query_group_id += ".eid" - agg_query.append( - { - "$group": { - "_id": agg_query_group_id, - "count": {"$sum": 1}, - } - } - ) - - # Sort - agg_query.append({"$sort": {"_id": 1, "count": -1}}) + os_agg_query_group_id += ".eid" + agg_query = [ + *unwinding, + {"$group": {"_id": agg_query_group_id, "count": {"$sum": 1}}}, + {"$sort": {"_id": 1, "count": -1}}, + ] # Run aggregation distinct_counts_cur = self._db[snapshot_col].aggregate(agg_query) distinct_counts = {x["_id"]: x["count"] for x in distinct_counts_cur} + + # Run aggregation for oversized snapshots and sum the counts + agg_query = [ + {"$match": {"_time_created": latest_snapshot_date}}, + *os_unwinding, + {"$group": {"_id": os_agg_query_group_id, "count": {"$sum": 1}}}, + {"$sort": {"_id": 1, "count": -1}}, + ] + os_distinct_counts_cur = self._db[os_snapshot_col].aggregate(agg_query) + for x in os_distinct_counts_cur: + distinct_counts[x["_id"]] = distinct_counts.get(x["_id"], 0) + x["count"] + if None in distinct_counts: del distinct_counts[None] @@ -1240,16 +1496,24 @@ def drop_empty_archives(self, etype: str) -> int: raise DatabaseError(f"Drop of empty archive failed: {e}") from e return dropped_count - def delete_old_snapshots(self, etype: str, t_old: datetime): + def delete_old_snapshots(self, etype: str, t_old: datetime, n_old: int) -> int: """Delete old snapshots. Periodically called for all `etype`s from HistoryManager. """ snapshot_col_name = self._snapshots_col_name(etype) + os_snapshot_col_name = self._oversized_snapshots_col_name(etype) + deleted = 0 try: - return self._db[snapshot_col_name].delete_many({"_time_created": {"$lt": t_old}}) + res = self._db[snapshot_col_name].update_many( + {"count": {"$gt": n_old}}, {"$pop": {"history": 1}, "$inc": {"count": -1}} + ) + deleted += res.modified_count + res = self._db[os_snapshot_col_name].delete_many({"_time_created": {"$lt": t_old}}) + deleted += res.deleted_count except Exception as e: raise DatabaseError(f"Delete of olds snapshots failed: {e}") from e + return deleted def get_module_cache(self, override_called_id: Optional[str] = None): """Return a persistent cache collection for given module name. 
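# Illustrative sketch of the reworked snapshot schema (shape inferred from the
# code above): a bucket document in "<etype>#snapshots" now looks like
#
#     {
#         "_id": eid,               # entity id doubles as the bucket id
#         "last": {...},            # the most recent snapshot
#         "history": [{...}, ...],  # older snapshots, newest first
#         "oversized": False,       # set once an update hits the 16 MiB BSON limit
#         "count": 3,               # number of snapshots kept in "history"
#     }
#
# Once marked oversized, the bucket keeps only "last"; the individual snapshots
# are written flat to "<etype>#snapshots_oversized", keyed by "eid" as before.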
diff --git a/dp3/history_management/history_manager.py b/dp3/history_management/history_manager.py index eff9eff2..7d67338d 100644 --- a/dp3/history_management/history_manager.py +++ b/dp3/history_management/history_manager.py @@ -111,6 +111,8 @@ def __init__( registrar.scheduler_register( self.delete_old_snapshots, **snapshot_cleaning_schedule.model_dump() ) + self.keep_snapshot_count = self._get_snapshot_count_to_keep(platform_config.config) + self.log.info("Keeping %s snapshots", self.keep_snapshot_count) # Schedule datapoint archivation archive_config = self.config.datapoint_archivation @@ -121,6 +123,45 @@ def __init__( self.log_dir = None registrar.scheduler_register(self.archive_old_dps, **archive_config.schedule.model_dump()) + def _get_snapshot_count_to_keep(self, config) -> int: + """Returns how many snapshots should be kept based on configuration. + + This depends on the frequency of snapshot creation and the max snapshot age. + """ + max_age_days = self.keep_snapshot_delta.total_seconds() / 3600 / 24 + creation_rate = config.get("snapshots.creation_rate", {"minute": "*/30"}) + if len(creation_rate) > 1: + raise ValueError("Only one snapshot creation rate is supported.") + + for key, value in creation_rate.items(): + if value.startswith("*/"): + snapshot_interval = int(value[2:]) + if key == "second": + snapshots_per_day = 60 * 60 * 24 / snapshot_interval + elif key == "minute": + snapshots_per_day = 60 * 24 / snapshot_interval + elif key == "hour": + snapshots_per_day = 24 / snapshot_interval + elif key == "day": + snapshots_per_day = 1 / snapshot_interval + else: + raise ValueError(f"Unsupported snapshot creation rate: {creation_rate}") + else: + count = len(value.split(",")) + if key == "second": + snapshots_per_day = 60 * 24 * count + elif key == "minute": + snapshots_per_day = 24 * count + elif key == "hour": + snapshots_per_day = count + else: + raise ValueError(f"Unsupported snapshot creation rate: {creation_rate}") + break + else: + raise ValueError(f"Unsupported snapshot creation rate: {creation_rate}") + + return int(max_age_days * snapshots_per_day) + def delete_old_dps(self): """Deletes old data points from master collection.""" self.log.debug("Deleting old records ...") @@ -147,13 +188,14 @@ def delete_old_dps(self): def delete_old_snapshots(self): """Deletes old snapshots.""" t_old = datetime.now() - self.keep_snapshot_delta - self.log.debug("Deleting all snapshots before %s", t_old) + n_old = self.keep_snapshot_count + self.log.debug("Deleting all snapshots before %s and over %s in total", t_old, n_old) deleted_total = 0 for etype in self.model_spec.entities: try: - result = self.db.delete_old_snapshots(etype, t_old) - deleted_total += result.deleted_count + result = self.db.delete_old_snapshots(etype, t_old, n_old) + deleted_total += result except DatabaseError as e: self.log.exception(e) self.log.debug("Deleted %s snapshots in total.", deleted_total) From 40f25e7ca9c0df2db3de247ac1b27e9acb35c5c9 Mon Sep 17 00:00:00 2001 From: Ondrej Sedlacek Date: Mon, 29 Jul 2024 10:27:29 +0200 Subject: [PATCH 02/17] Database: Cache snapshot state. 
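Each save previously issued a find() on the snapshot collection just to learn
whether an entity's bucket is already oversized. Workers now keep two per-etype
sets of already-classified eids and only query the database for eids not seen
yet. A rough sketch of the lookup this introduces (names as in the patch):

    normal = self._normal_snapshot_eids[etype] & eids
    oversized = self._oversized_snapshot_eids[etype] & eids
    unknown = eids - normal - oversized
    # only first-seen eids reach the database
    new_normal, new_oversized = set(), set()
    for doc in self._db[snapshot_col].find(
        {"_id": {"$in": list(unknown)}}, {"oversized": 1}
    ):
        (new_oversized if doc.get("oversized") else new_normal).add(doc["_id"])
    # merge into the cache; anything still unknown is a brand-new entity
    self._normal_snapshot_eids[etype] |= new_normal
    self._oversized_snapshot_eids[etype] |= new_oversized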
--- dp3/database/database.py | 145 +++++++++++++++++++++++++-------------- 1 file changed, 94 insertions(+), 51 deletions(-) diff --git a/dp3/database/database.py b/dp3/database/database.py index 02f17320..a3cf60a1 100644 --- a/dp3/database/database.py +++ b/dp3/database/database.py @@ -157,6 +157,9 @@ def __init__( self._sched.register(self._push_raw, second=seconds, misfire_grace_time=5) self._sched.register(self._push_master, second=seconds, misfire_grace_time=5) + self._normal_snapshot_eids = defaultdict(set) + self._oversized_snapshot_eids = defaultdict(set) + self.log.info("Database successfully initialized!") @staticmethod @@ -939,15 +942,16 @@ def save_snapshot(self, etype: str, snapshot: dict, time: datetime): os_col = self._oversized_snapshots_col_name(etype) # Find out if the snapshot is oversized - doc = self._db[snapshot_col].find_one({"_id": eid}, {"oversized": 1}) - if doc is None: + normal, oversized, new = self._get_snapshot_state(etype, {eid}) + if new: # First snapshot of entity self._db[snapshot_col].insert_one( {"_id": eid, "last": snapshot, "history": [], "oversized": False, "count": 0} ) self.log.debug(f"Inserted snapshot of {eid}") + self._cache_snapshot_state(etype, new, oversized) return - elif doc.get("oversized", False): + elif oversized: # Snapshot is already marked as oversized self._db[snapshot_col].update_one({"_id": eid}, {"$set": {"last": snapshot}}) self._db[os_col].insert_one(snapshot) @@ -980,9 +984,42 @@ def save_snapshot(self, etype: str, snapshot: dict, time: datetime): # The snapshot is too large, move it to oversized snapshots self.log.info(f"Snapshot of {eid} is too large: {e}, marking as oversized.") self._migrate_to_oversized_snapshot(etype, eid, snapshot) + self._cache_snapshot_state(etype, set(), normal) except Exception as e: raise DatabaseError(f"Insert of snapshot {eid} failed: {e}, {snapshot}") from e + def _get_snapshot_state(self, etype: str, eids: set[str]) -> tuple[set, set, set]: + """Get current state of snapshot of given `eid`.""" + unknown = eids + normal = self._normal_snapshot_eids[etype] & unknown + oversized = self._oversized_snapshot_eids[etype] & unknown + unknown = unknown - normal - oversized + + if not unknown: + return normal, oversized, unknown + + snapshot_col = self._snapshots_col_name(etype) + new_normal = set() + new_oversized = set() + for doc in self._db[snapshot_col].find({"_id": {"$in": list(unknown)}}, {"oversized": 1}): + if doc.get("oversized", False): + new_oversized.add(doc["_id"]) + else: + new_normal.add(doc["_id"]) + + self._normal_snapshot_eids[etype] |= new_normal + self._oversized_snapshot_eids[etype] |= new_oversized + unknown = unknown - new_normal - new_oversized + + return normal | new_normal, oversized | new_oversized, unknown + + def _cache_snapshot_state(self, etype: str, normal: set, oversized: set): + """Cache snapshot state for given `etype`.""" + self._normal_snapshot_eids[etype] |= normal + + self._normal_snapshot_eids[etype] -= oversized + self._oversized_snapshot_eids[etype] |= oversized + def save_snapshots(self, etype: str, snapshots: list[dict], time: datetime): """ Saves a list of snapshots of current master documents. 
@@ -1010,61 +1047,60 @@ def save_snapshots(self, etype: str, snapshots: list[dict], time: datetime): snapshots_by_eid[snapshot["eid"]].append(snapshot) # Find out if any of the snapshots are oversized - docs = list( - self._db[snapshot_col].find( - {"_id": {"$in": list(snapshots_by_eid.keys())}}, {"oversized": 1, "eid": 1} - ) - ) + normal, oversized, new = self._get_snapshot_state(etype, set(snapshots_by_eid.keys())) + inserts = [] updates = [] update_originals = [] oversized_inserts = [] oversized_updates = [] - for doc in docs: - eid = doc["_id"] - if not doc.get("oversized", False): - # A normal snapshot, shift the last snapshot to history and update last - updates.append( - UpdateOne( - {"_id": eid}, - [ - { - "$set": { - "history": { - "$concatArrays": [ - snapshots_by_eid[eid][:-1], - ["$last"], - "$history", - ] - }, - "count": {"$sum": [len(snapshots_by_eid[eid]), "$count"]}, - } - }, - {"$set": {"last": {"$literal": snapshots_by_eid[eid][-1]}}}, - ], - ) - ) - update_originals.append(snapshots_by_eid[eid]) - else: - # Snapshot is already marked as oversized - oversized_inserts.extend(snapshots_by_eid[eid]) - oversized_updates.append( - UpdateOne({"_id": eid}, {"$set": {"last": snapshots_by_eid[eid][-1]}}) + # A normal snapshot, shift the last snapshot to history and update last + for eid in normal: + updates.append( + UpdateOne( + {"_id": eid}, + [ + { + "$set": { + "history": { + "$concatArrays": [ + snapshots_by_eid[eid][:-1], + ["$last"], + "$history", + ] + }, + "count": {"$sum": [len(snapshots_by_eid[eid]), "$count"]}, + } + }, + {"$set": {"last": {"$literal": snapshots_by_eid[eid][-1]}}}, + ], ) - del snapshots_by_eid[eid] + ) + update_originals.append(snapshots_by_eid[eid]) + + # Snapshot is already marked as oversized + for eid in oversized: + oversized_inserts.extend(snapshots_by_eid[eid]) + oversized_updates.append( + UpdateOne({"_id": eid}, {"$set": {"last": snapshots_by_eid[eid][-1]}}) + ) # The remaining snapshots are new - inserts = [ - { - "_id": eid, - "last": eid_snapshots[-1], - "history": eid_snapshots[:-1], - "oversized": False, - "count": len(eid_snapshots) - 1, - } - for eid, eid_snapshots in snapshots_by_eid.items() - ] + for eid in new: + eid_snapshots = snapshots_by_eid[eid] + inserts.append( + { + "_id": eid, + "last": eid_snapshots[-1], + "history": eid_snapshots[:-1], + "oversized": False, + "count": len(eid_snapshots) - 1, + } + ) + + new_oversized = set() + new_normal = set() if updates: try: @@ -1073,7 +1109,7 @@ def save_snapshots(self, etype: str, snapshots: list[dict], time: datetime): self.log.warning( "Some snapshots were not updated, %s != %s", res.modified_count, - len(snapshots_by_eid), + len(updates), ) except (BulkWriteError, OperationFailure) as e: self.log.info("Update of snapshots failed, will retry with oversize.") @@ -1090,6 +1126,7 @@ def save_snapshots(self, etype: str, snapshots: list[dict], time: datetime): ) self._migrate_to_oversized_snapshot(etype, eid, failed_snapshots[0]) oversized_inserts.extend(failed_snapshots[1:]) + new_oversized.add(eid) if any(err["code"] != BSON_OBJECT_TOO_LARGE for err in e.details["writeErrors"]): # Some other error occurred @@ -1101,6 +1138,7 @@ def save_snapshots(self, etype: str, snapshots: list[dict], time: datetime): try: # Insert new snapshots res = self._db[snapshot_col].insert_many(inserts, ordered=False) + new_normal.update(res.inserted_ids) if len(res.inserted_ids) != len(snapshots_by_eid): self.log.warning( "Some snapshots were not inserted, %s != %s", @@ -1129,8 +1167,10 @@ def 
save_snapshots(self, etype: str, snapshots: list[dict], time: datetime): } ) oversized_inserts.extend(insert_doc["history"] + [insert_doc["last"]]) + new_oversized.add(eid) try: - self._db[snapshot_col].insert_many(checked_inserts, ordered=False) + res = self._db[snapshot_col].insert_many(checked_inserts, ordered=False) + new_normal.update(res.inserted_ids) except Exception as e: raise DatabaseError(f"Insert of snapshots failed: {e}") from e except Exception as e: @@ -1145,6 +1185,9 @@ def save_snapshots(self, etype: str, snapshots: list[dict], time: datetime): except Exception as e: raise DatabaseError(f"Insert of snapshots failed: {str(e)[:2048]}") from e + # Cache the new state + self._cache_snapshot_state(etype, new_normal, new_oversized) + def _get_metadata_id(self, module: str, time: datetime, worker_id: Optional[int] = None) -> str: """Generates unique metadata id based on `module`, `time` and the worker index.""" worker_id = self._process_index if worker_id is None else worker_id From e1c22e9a194056d92e742e54a491688b3a400ccc Mon Sep 17 00:00:00 2001 From: Ondrej Sedlacek Date: Mon, 29 Jul 2024 10:38:02 +0200 Subject: [PATCH 03/17] Database: Add snapshot migration script. --- dp3/scripts/migrate_snapshots.py | 307 +++++++++++++++++++++++++++++++ 1 file changed, 307 insertions(+) create mode 100755 dp3/scripts/migrate_snapshots.py diff --git a/dp3/scripts/migrate_snapshots.py b/dp3/scripts/migrate_snapshots.py new file mode 100755 index 00000000..fff5e836 --- /dev/null +++ b/dp3/scripts/migrate_snapshots.py @@ -0,0 +1,307 @@ +#!/usr/bin/env python3 +"""Simple script to migrate snapshots schema from flat single-snapshot documents to +a nested history-last schema. + +Intended to be run only once to upgrade a database existing before 08-2024. +""" + +import argparse +from collections import defaultdict + +import bson +from pymongo import UpdateOne +from pymongo.errors import BulkWriteError, DocumentTooLarge, OperationFailure, WriteError + +from dp3.common.config import ModelSpec, read_config_dir +from dp3.database.database import DatabaseError, EntityDatabase, MongoConfig + +# Arguments parser +parser = argparse.ArgumentParser( + description="Add hashes to master records to allow for easier parallelization in ADiCT." 
+) +parser.add_argument( + "--config", + default="/etc/adict/config", + help="DP3 config directory (default: /etc/adict/config)", +) +parser.add_argument("--dry-run", action="store_true", help="Do not write to database") +parser.add_argument("-n", default=100, type=int, help="Number of updates to send per request") +args = parser.parse_args() + +# Load DP3 configuration +config = read_config_dir(args.config, recursive=True) +model_spec = ModelSpec(config.get("db_entities")) + +# Connect to database +connection_conf = MongoConfig.model_validate(config.get("database", {})) +client = EntityDatabase.connect(connection_conf) +print(client) +client.admin.command("ping") + +db = client[connection_conf.db_name] + +BSON_OBJECT_TOO_LARGE = 10334 + + +def _migrate_to_oversized_snapshot(etype: str, eid: str, snapshot: dict): + snapshot_col = f"{etype}#snapshots" + os_col = f"{etype}#snapshots_oversized" + + if "_id" in snapshot: + print(f"Removing _id {snapshot['_id']} from snapshot of {eid}") + del snapshot["_id"] + + try: + doc = db[snapshot_col].find_one_and_update( + {"_id": eid}, + { + "$set": {"oversized": True, "last": snapshot, "count": 0}, + "$unset": {"history": ""}, + }, + ) + inserts = list(doc.get("history", [])) + inserts.insert(0, doc.get("last", {})) + inserts.insert(0, snapshot) + db[os_col].insert_many(inserts) + except Exception as e: + raise DatabaseError(f"Update of snapshot {eid} failed: {e}, {snapshot}") from e + + +def save_snapshot(etype: str, snapshot: dict): + """Saves snapshot to specified entity of current master document. + + Will move snapshot to oversized snapshots if the maintained bucket is too large. + """ + if "eid" not in snapshot: + return + eid = snapshot["eid"] + + if "_id" in snapshot: + print(f"Removing _id {snapshot['_id']} from snapshot of {eid}") + del snapshot["_id"] + + snapshot_col = f"{etype}#snapshots" + os_col = f"{etype}#snapshots_oversized" + + # Find out if the snapshot is oversized + doc = db[snapshot_col].find_one({"_id": eid}, {"oversized": 1}) + if doc is None: + # First snapshot of entity + db[snapshot_col].insert_one( + {"_id": eid, "last": snapshot, "history": [], "oversized": False, "count": 0} + ) + print(f"Inserted snapshot of {eid}") + return + elif doc.get("oversized", False): + # Snapshot is already marked as oversized + db[snapshot_col].update_one({"_id": eid}, {"$set": {"last": snapshot}}) + db[os_col].insert_one(snapshot) + return + + try: + # Update a normal snapshot bucket + res = db[snapshot_col].update_one( + {"_id": eid}, + [ + { + "$set": { + "history": { + "$concatArrays": [ + ["$last"], + "$history", + ] + }, + "count": {"$sum": ["$count", 1]}, + } + }, + {"$set": {"last": {"$literal": snapshot}}}, + ], + ) + if res.modified_count == 0: + print(f"Snapshot of {eid} was not updated, {res.raw_result}") + except (WriteError, OperationFailure) as e: + if e.code != BSON_OBJECT_TOO_LARGE: + raise e + # The snapshot is too large, move it to oversized snapshots + print(f"Snapshot of {eid} is too large: {e}, marking as oversized.") + _migrate_to_oversized_snapshot(etype, eid, snapshot) + except Exception as e: + raise DatabaseError(f"Insert of snapshot {eid} failed: {e}, {snapshot}") from e + + +def save_snapshots(etype: str, snapshots: list[dict]): + """ + Saves a list of snapshots of current master documents. + + All snapshots must belong to same entity type. + + Will move snapshots to oversized snapshots if the maintained bucket is too large. + For better understanding, see `save_snapshot()`. 
+ + """ + snapshots_by_eid = defaultdict(list) + for snapshot in snapshots: + if "eid" not in snapshot: + continue + if "_id" in snapshot: + del snapshot["_id"] + snapshots_by_eid[snapshot["eid"]].append(snapshot) + print(f"Saving {len(snapshots)} snapshots of {len(snapshots_by_eid)} entities of {etype}") + + snapshot_col = f"{etype}#snapshots" + os_col = f"{etype}#snapshots_oversized" + + # Find out if any of the snapshots are oversized + docs = list( + db[snapshot_col].find( + {"_id": {"$in": list(snapshots_by_eid.keys())}}, {"oversized": 1, "eid": 1} + ) + ) + + updates = [] + update_originals = [] + oversized_inserts = [] + oversized_updates = [] + + for doc in docs: + eid = doc["_id"] + if not doc.get("oversized", False): + # A normal snapshot, shift the last snapshot to history and update last + updates.append( + UpdateOne( + {"_id": eid}, + [ + { + "$set": { + "history": { + "$concatArrays": [ + snapshots_by_eid[eid][:-1], + ["$last"], + "$history", + ] + }, + "count": {"$sum": [len(snapshots_by_eid[eid]), "$count"]}, + } + }, + {"$set": {"last": {"$literal": snapshots_by_eid[eid][-1]}}}, + ], + ) + ) + update_originals.append(snapshots_by_eid[eid]) + else: + # Snapshot is already marked as oversized + oversized_inserts.extend(snapshots_by_eid[eid]) + oversized_updates.append( + UpdateOne({"_id": eid}, {"$set": {"last": snapshots_by_eid[eid][-1]}}) + ) + del snapshots_by_eid[eid] + + # The remaining snapshots are new + inserts = [ + { + "_id": eid, + "last": eid_snapshots[-1], + "history": eid_snapshots[:-1], + "oversized": False, + "count": len(eid_snapshots) - 1, + } + for eid, eid_snapshots in snapshots_by_eid.items() + ] + + if updates: + try: + res = db[snapshot_col].bulk_write(updates, ordered=False) + if res.modified_count != len(updates): + print( + f"Some snapshots were not updated, " + f"{res.modified_count} != {len(snapshots_by_eid)}" + ) + except (BulkWriteError, OperationFailure) as e: + print("Update of snapshots failed, will retry with oversize.") + failed_indexes = [ + err["index"] + for err in e.details["writeErrors"] + if err["code"] == BSON_OBJECT_TOO_LARGE + ] + failed_snapshots = (update_originals[i] for i in failed_indexes) + for eid_snapshots in failed_snapshots: + eid = eid_snapshots[0]["eid"] + failed_snapshots = sorted( + eid_snapshots, key=lambda s: s["_time_created"], reverse=True + ) + _migrate_to_oversized_snapshot(etype, eid, failed_snapshots[0]) + oversized_inserts.extend(failed_snapshots[1:]) + + if any(err["code"] != BSON_OBJECT_TOO_LARGE for err in e.details["writeErrors"]): + # Some other error occurred + raise e + except Exception as e: + raise DatabaseError(f"Update of snapshots failed: {str(e)[:2048]}") from e + + if inserts: + try: + # Insert new snapshots + res = db[snapshot_col].insert_many(inserts, ordered=False) + if len(res.inserted_ids) != len(snapshots_by_eid): + print( + f"Some snapshots were not inserted, " + f"{len(res.inserted_ids)} != {len(snapshots_by_eid)}" + ) + except (DocumentTooLarge, OperationFailure) as e: + print(f"Snapshot too large: {e}") + checked_inserts = [] + oversized_inserts = [] + + # Filter out the oversized snapshots + for insert_doc in inserts: + bsize = len(bson.BSON.encode(insert_doc)) + if bsize < 16 * 1024 * 1024: + checked_inserts.append(insert_doc) + else: + eid = insert_doc["_id"] + checked_inserts.append( + { + "_id": eid, + "last": insert_doc["last"], + "oversized": True, + "history": [], + "count": 0, + } + ) + oversized_inserts.extend(insert_doc["history"] + [insert_doc["last"]]) + try: + 
db[snapshot_col].insert_many(checked_inserts, ordered=False) + except Exception as e: + raise DatabaseError(f"Insert of snapshots failed: {e}") from e + except Exception as e: + raise DatabaseError(f"Insert of snapshot failed: {e}") from e + + # Update the oversized snapshots + if oversized_inserts: + try: + if oversized_updates: + db[snapshot_col].bulk_write(oversized_updates) + db[os_col].insert_many(oversized_inserts) + except Exception as e: + raise DatabaseError(f"Insert of snapshots failed: {str(e)[:2048]}") from e + + +for entity in model_spec.entities: + if len(list(db.list_collections(filter={"name": f"{entity}#snapshots_old"}))) == 0: + db[f"{entity}#snapshots"].rename(f"{entity}#snapshots_old") + print(entity) + + snapshots = [] + for record in db[f"{entity}#snapshots_old"].find({}, sort=[("_time_created", 1)]): + del record["_id"] + snapshots.append(record) + if len(snapshots) >= args.n: + if not args.dry_run: + save_snapshots(entity, snapshots) + else: + print(f"Would save {len(snapshots)} snapshots") + snapshots.clear() + if snapshots and not args.dry_run: + save_snapshots(entity, snapshots) + elif snapshots: + print(f"Would save {len(snapshots)} snapshots") From a9328c3c90c80ffb62059c9e4f60c5e67ecee811 Mon Sep 17 00:00:00 2001 From: Ondrej Sedlacek Date: Mon, 29 Jul 2024 15:03:37 +0200 Subject: [PATCH 04/17] Database: Index oversized snapshots. --- dp3/database/database.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/dp3/database/database.py b/dp3/database/database.py index a3cf60a1..fe653c2a 100644 --- a/dp3/database/database.py +++ b/dp3/database/database.py @@ -243,7 +243,7 @@ def _init_database_schema(self, db_name) -> None: """ for etype in self._db_schema_config.entities: # Snapshots index on `eid` and `_time_created` fields - snapshot_col = self._snapshots_col_name(etype) + snapshot_col = self._oversized_snapshots_col_name(etype) self._db[snapshot_col].create_index("eid", background=True) self._db[snapshot_col].create_index("_time_created", background=True) From 3d44f76d13c2c3843f5e76dbb74f680ff5493fee Mon Sep 17 00:00:00 2001 From: Ondrej Sedlacek Date: Tue, 30 Jul 2024 15:06:07 +0200 Subject: [PATCH 05/17] Database: Fix lock usage. --- dp3/database/database.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/dp3/database/database.py b/dp3/database/database.py index fe653c2a..cc000b44 100644 --- a/dp3/database/database.py +++ b/dp3/database/database.py @@ -425,7 +425,7 @@ def _push_raw(self): def _push_master(self): """Push master changes to database.""" - for etype, lock in self._raw_buffer_locks.items(): + for etype, lock in self._master_buffer_locks.items(): master_col = self._db.get_collection( self._master_col_name(etype), write_concern=WriteConcern(w=1) ) From 791aca66d24e09ab45340fba7f74b5a29124075e Mon Sep 17 00:00:00 2001 From: Ondrej Sedlacek Date: Wed, 31 Jul 2024 10:38:56 +0200 Subject: [PATCH 06/17] Database: Drop snapshots based on timestamps. 
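Rather than popping a single oldest entry per cleaning run, history buckets are
now trimmed by timestamp: a pipeline update keeps only entries newer than
`t_old` and recomputes `count` from the filtered array, roughly:

    self._db[snapshot_col_name].update_many(
        {"count": {"$gte": n_old}},
        [
            {"$set": {"history": {"$filter": {
                "input": "$history",
                "cond": {"$gte": ["$$this._time_created", t_old]},
            }}}},
            {"$set": {"count": {"$size": "$history"}}},
        ],
    )

Oversized snapshots are still deleted directly by their `_time_created`.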
--- dp3/database/database.py | 15 ++++++++++++++- 1 file changed, 14 insertions(+), 1 deletion(-) diff --git a/dp3/database/database.py b/dp3/database/database.py index cc000b44..0ca8fbda 100644 --- a/dp3/database/database.py +++ b/dp3/database/database.py @@ -1549,7 +1549,20 @@ def delete_old_snapshots(self, etype: str, t_old: datetime, n_old: int) -> int: deleted = 0 try: res = self._db[snapshot_col_name].update_many( - {"count": {"$gt": n_old}}, {"$pop": {"history": 1}, "$inc": {"count": -1}} + {"count": {"$gte": n_old}}, + [ + { + "$set": { + "history": { + "$filter": { + "input": "$history", + "cond": {"$gte": ["$$this._time_created", t_old]}, + } + } + } + }, + {"$set": {"count": {"$size": "$history"}}}, + ], ) deleted += res.modified_count res = self._db[os_snapshot_col_name].delete_many({"_time_created": {"$lt": t_old}}) From 0c774b21b281d3e1a2b16b768a9edb8325a8b15a Mon Sep 17 00:00:00 2001 From: Ondrej Sedlacek Date: Wed, 31 Jul 2024 16:23:56 +0200 Subject: [PATCH 07/17] Database: Reduce size of `.t2` index. --- dp3/database/database.py | 93 +++++++++++++++++++++-- dp3/history_management/history_manager.py | 25 ++++++ dp3/scripts/add_min_t2s.py | 79 +++++++++++++++++++ 3 files changed, 190 insertions(+), 7 deletions(-) create mode 100755 dp3/scripts/add_min_t2s.py diff --git a/dp3/database/database.py b/dp3/database/database.py index 0ca8fbda..1d0dbd30 100644 --- a/dp3/database/database.py +++ b/dp3/database/database.py @@ -16,7 +16,7 @@ from pymongo.command_cursor import CommandCursor from pymongo.cursor import Cursor from pymongo.errors import BulkWriteError, DocumentTooLarge, OperationFailure, WriteError -from pymongo.results import DeleteResult +from pymongo.results import DeleteResult, UpdateResult from dp3.common.attrspec import AttrType, timeseries_types from dp3.common.config import HierarchicalDict, ModelSpec @@ -282,9 +282,12 @@ def _init_database_schema(self, db_name) -> None: if "$**" in attr_names: if index.get("wildcardProjection") is not None: covered_attrs = { - k.rsplit(".", maxsplit=1)[0] for k in index["wildcardProjection"] + it + for k in index["wildcardProjection"] + for it in k.rsplit(".", maxsplit=1) } - if not attrs - covered_attrs: + covered_attrs -= {"ts_last_update", "#min_t2s"} + if "t2" not in covered_attrs and not attrs - covered_attrs: create_wildcard = False continue # Index already covers all history attributes self.log.info("Dropping wildcard index %s on %s", index["name"], etype) @@ -302,7 +305,7 @@ def _init_database_schema(self, db_name) -> None: master_col.create_index( [("$**", 1)], name=index_name, - wildcardProjection={f"{attr}.t2": 1 for attr in history_attrs} + wildcardProjection={f"#min_t2s.{attr}": 1 for attr in history_attrs} | {f"{attr}.ts_last_update": 1 for attr in plain_attrs}, ) self.log.info("Created wildcard index %s on %s", index_name, etype) @@ -583,15 +586,91 @@ def delete_eid(self, etype: str, eid: str): except Exception as e: self.log.exception("Error in on_entity_delete_one callback %s: %s", f, e) - def delete_old_dps(self, etype: str, attr_name: str, t_old: datetime) -> None: + def mark_all_entity_dps_t2(self, etype: str, attrs: list[str]) -> UpdateResult: + """ + Updates the `min_t2s` of the master records of `etype` for all records. + + Periodically called for all `etype`s from HistoryManager. 
+ """ + master_col = self._master_col_name(etype) + try: + return self._db[master_col].update_many( + {}, + [ + { + "$set": { + attr_name: { + "$cond": { + "if": { + "$eq": [ + {"$size": {"$ifNull": [f"${attr_name}", []]}}, + 0, + ] + }, + "then": "$$REMOVE", + "else": f"${attr_name}", + } + } + for attr_name in attrs + } + | { + f"#min_t2s.{attr_name}": { + "$cond": { + "if": { + "$eq": [ + {"$size": {"$ifNull": [f"${attr_name}", []]}}, + 0, + ] + }, + "then": "$$REMOVE", + "else": {"$min": f"${attr_name}.t2"}, + } + } + for attr_name in attrs + } + } + ], + ) + except Exception as e: + raise DatabaseError(f"Update of min_t2s failed: {e}") from e + + def delete_old_dps(self, etype: str, attr_name: str, t_old: datetime) -> UpdateResult: """Delete old datapoints from master collection. Periodically called for all `etype`s from HistoryManager. """ master_col = self._master_col_name(etype) try: - self._db[master_col].update_many( - {f"{attr_name}.t2": {"$lt": t_old}}, {"$pull": {attr_name: {"t2": {"$lt": t_old}}}} + return self._db[master_col].update_many( + {f"#min_t2s.{attr_name}": {"$lt": t_old}}, + [ + { + "$set": { + attr_name: { + "$filter": { + "input": f"${attr_name}", + "cond": {"$gte": ["$$this.t2", t_old]}, + } + } + } + }, + { + "$set": { + f"#min_t2s.{attr_name}": { + "$cond": { + "if": { + "$eq": [ + {"$size": {"$ifNull": [f"${attr_name}", []]}}, + 0, + ] + }, + "then": "$$REMOVE", + "else": {"$min": f"${attr_name}.t2"}, + } + } + }, + }, + ], ) except Exception as e: raise DatabaseError(f"Delete of old datapoints failed: {e}") from e diff --git a/dp3/history_management/history_manager.py b/dp3/history_management/history_manager.py index 7d67338d..126eb884 100644 --- a/dp3/history_management/history_manager.py +++ b/dp3/history_management/history_manager.py @@ -65,12 +65,14 @@ class HistoryManagerConfig(BaseModel, extra=Extra.forbid): Attributes: aggregation_schedule: Schedule for master document aggregation. datapoint_cleaning_schedule: Schedule for datapoint cleaning. + mark_datapoints_schedule: Schedule for marking datapoints in master docs. snapshot_cleaning: Configuration for snapshot cleaning. datapoint_archivation: Configuration for datapoint archivation. 
""" aggregation_schedule: CronExpression datapoint_cleaning_schedule: CronExpression + mark_datapoints_schedule: CronExpression snapshot_cleaning: SnapshotCleaningConfig datapoint_archivation: DPArchivationConfig @@ -101,6 +103,11 @@ def __init__( return # Schedule datapoints cleaning + datapoint_marking_schedule = self.config.mark_datapoints_schedule + registrar.scheduler_register( + self.mark_datapoints_in_master_docs, **datapoint_marking_schedule.model_dump() + ) + datapoint_cleaning_schedule = self.config.datapoint_cleaning_schedule registrar.scheduler_register( self.delete_old_dps, **datapoint_cleaning_schedule.model_dump() @@ -185,6 +192,24 @@ def delete_old_dps(self): except DatabaseError as e: self.log.error(e) + def mark_datapoints_in_master_docs(self): + """Marks the timestamps of all datapoints in master documents.""" + self.log.debug("Marking the datapoint timestamps for all entity records ...") + + for entity, attr_conf in self.model_spec.entity_attributes.items(): + attrs_to_mark = [] + for attr, conf in attr_conf.items(): + if conf.t in AttrType.OBSERVATIONS | AttrType.TIMESERIES: + attrs_to_mark.append(attr) + + if not attrs_to_mark: + continue + try: + res = self.db.mark_all_entity_dps_t2(entity, attrs_to_mark) + self.log.debug("Marked %s records of %s", res.modified_count, entity) + except DatabaseError as e: + self.log.error(e) + def delete_old_snapshots(self): """Deletes old snapshots.""" t_old = datetime.now() - self.keep_snapshot_delta diff --git a/dp3/scripts/add_min_t2s.py b/dp3/scripts/add_min_t2s.py new file mode 100755 index 00000000..81606c25 --- /dev/null +++ b/dp3/scripts/add_min_t2s.py @@ -0,0 +1,79 @@ +#!/usr/bin/env python3 +"""Simple script to add min_t2s to master records to allow for better indexing. + +Intended to be run only once to upgrade a database existing before 08-2024. +""" + +import argparse +import time + +from dp3.common.attrspec import AttrType +from dp3.common.config import ModelSpec, read_config_dir +from dp3.database.database import EntityDatabase, MongoConfig + +# Arguments parser +parser = argparse.ArgumentParser( + description="Add min_t2s to master records to allow for better indexing." 
+) +parser.add_argument( + "--config", + default="/etc/adict/config", + help="DP3 config directory (default: /etc/adict/config)", +) +args = parser.parse_args() + +# Load DP3 configuration +config = read_config_dir(args.config, recursive=True) +model_spec = ModelSpec(config.get("db_entities")) + +# Connect to database +connection_conf = MongoConfig.model_validate(config.get("database", {})) +client = EntityDatabase.connect(connection_conf) +client.admin.command("ping") + +db = client[connection_conf.db_name] + + +for entity, attributes in model_spec.entity_attributes.items(): + t1 = time.time() + res = db[f"{entity}#master"].update_many( + {}, + [ + { + "$set": { + attr_name: { + "$cond": { + "if": { + "$eq": [ + {"$size": {"$ifNull": [f"${attr_name}", []]}}, + 0, + ] + }, + "then": "$$REMOVE", + "else": f"${attr_name}", + } + } + for attr_name, spec in attributes.items() + if spec.t != AttrType.PLAIN + } + | { + f"#min_t2s.{attr_name}": { + "$cond": { + "if": { + "$eq": [ + {"$size": {"$ifNull": [f"${attr_name}", []]}}, + 0, + ] + }, + "then": "$$REMOVE", + "else": {"$min": f"${attr_name}.t2"}, + } + } + for attr_name, spec in attributes.items() + if spec.t != AttrType.PLAIN + } + }, + ], + ) + t2 = time.time() + print(f"Updated {res.modified_count} records for entity {entity} in {t2 - t1:.2f}s") From c742d706581380ae8172230c6cdd115bd7d0f3cb Mon Sep 17 00:00:00 2001 From: Ondrej Sedlacek Date: Wed, 31 Jul 2024 17:26:23 +0200 Subject: [PATCH 08/17] Update HistoryManager configuration. --- config/history_manager.yml | 4 ++++ dp3/template/app/config/history_manager.yml | 4 ++++ tests/test_config/history_manager.yml | 4 ++++ tests/test_example/config/history_manager.yml | 4 ++++ 4 files changed, 16 insertions(+) diff --git a/config/history_manager.yml b/config/history_manager.yml index 6254e722..2a3c5bd8 100644 --- a/config/history_manager.yml +++ b/config/history_manager.yml @@ -9,6 +9,10 @@ aggregation_schedule: minute: "*/10" # Deleting old datapoints from master records +mark_datapoints_schedule: + hour: "7,19" + minute: "45" + datapoint_cleaning_schedule: minute: "*/30" diff --git a/dp3/template/app/config/history_manager.yml b/dp3/template/app/config/history_manager.yml index 6254e722..2a3c5bd8 100644 --- a/dp3/template/app/config/history_manager.yml +++ b/dp3/template/app/config/history_manager.yml @@ -9,6 +9,10 @@ aggregation_schedule: minute: "*/10" # Deleting old datapoints from master records +mark_datapoints_schedule: + hour: "7,19" + minute: "45" + datapoint_cleaning_schedule: minute: "*/30" diff --git a/tests/test_config/history_manager.yml b/tests/test_config/history_manager.yml index c83c141b..55dd0843 100644 --- a/tests/test_config/history_manager.yml +++ b/tests/test_config/history_manager.yml @@ -9,6 +9,10 @@ aggregation_schedule: minute: "*/10" # Deleting old datapoints from master records +mark_datapoints_schedule: + hour: "7,19" + minute: "45" + datapoint_cleaning_schedule: minute: "*/30" diff --git a/tests/test_example/config/history_manager.yml b/tests/test_example/config/history_manager.yml index c83c141b..55dd0843 100644 --- a/tests/test_example/config/history_manager.yml +++ b/tests/test_example/config/history_manager.yml @@ -9,6 +9,10 @@ aggregation_schedule: minute: "*/10" # Deleting old datapoints from master records +mark_datapoints_schedule: + hour: "7,19" + minute: "45" + datapoint_cleaning_schedule: minute: "*/30" From ec656e29dce979d0337e9892a869736d349b5101 Mon Sep 17 00:00:00 2001 From: Ondrej Sedlacek Date: Wed, 31 Jul 2024 17:39:50 +0200 Subject: [PATCH 
09/17] Database: Fix query key. --- dp3/database/database.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/dp3/database/database.py b/dp3/database/database.py index 1d0dbd30..cc904081 100644 --- a/dp3/database/database.py +++ b/dp3/database/database.py @@ -774,7 +774,7 @@ def get_latest_snapshot(self, etype: str, eid: str) -> dict: snapshot_col = self._snapshots_col_name(etype) return ( - self._db[snapshot_col].find_one({"eid": eid}, {"last": 1}, sort=[("_id", -1)]) or {} + self._db[snapshot_col].find_one({"_id": eid}, {"last": 1}, sort=[("_id", -1)]) or {} ).get("last", {}) def _get_latest_snapshots_date(self) -> Optional[datetime]: From 069323630591edba4ba2d47103c13ef7c50762dc Mon Sep 17 00:00:00 2001 From: Ondrej Sedlacek Date: Wed, 31 Jul 2024 22:20:10 +0200 Subject: [PATCH 10/17] Database: Reduce write concern. --- dp3/database/database.py | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/dp3/database/database.py b/dp3/database/database.py index cc904081..aed69db0 100644 --- a/dp3/database/database.py +++ b/dp3/database/database.py @@ -639,9 +639,11 @@ def delete_old_dps(self, etype: str, attr_name: str, t_old: datetime) -> UpdateR Periodically called for all `etype`s from HistoryManager. """ - master_col = self._master_col_name(etype) + master_col = self._db.get_collection( + self._master_col_name(etype), write_concern=WriteConcern(w=1) + ) try: - return self._db[master_col].update_many( + return master_col.update_many( {f"#min_t2s.{attr_name}": {"$lt": t_old}}, [ { From 74f246f283e56a67e92e1344a829aed93463eb3c Mon Sep 17 00:00:00 2001 From: Ondrej Sedlacek Date: Thu, 1 Aug 2024 14:12:02 +0200 Subject: [PATCH 11/17] Scripts: Update description of migrate_snapshots.py. --- dp3/scripts/migrate_snapshots.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/dp3/scripts/migrate_snapshots.py b/dp3/scripts/migrate_snapshots.py index fff5e836..99d48868 100755 --- a/dp3/scripts/migrate_snapshots.py +++ b/dp3/scripts/migrate_snapshots.py @@ -17,7 +17,8 @@ # Arguments parser parser = argparse.ArgumentParser( - description="Add hashes to master records to allow for easier parallelization in ADiCT." + description="Migrate snapshots schema from flat single-snapshot documents " + "to a nested history schema." ) parser.add_argument( "--config", From 7e2970652a9790dd27226b4876449a29e3499682 Mon Sep 17 00:00:00 2001 From: Ondrej Sedlacek Date: Thu, 1 Aug 2024 14:25:19 +0200 Subject: [PATCH 12/17] Docs: Update configuration of HistoryManager. --- docs/configuration/history_manager.md | 28 +++++++++++++++------------ 1 file changed, 16 insertions(+), 12 deletions(-) diff --git a/docs/configuration/history_manager.md b/docs/configuration/history_manager.md index 1760ae23..7c8bcc51 100644 --- a/docs/configuration/history_manager.md +++ b/docs/configuration/history_manager.md @@ -13,25 +13,29 @@ Configuration file `history_manager.yml` is very simple: aggregation_schedule: # (1)! minute: "*/10" -datapoint_cleaning_schedule: # (2)! +mark_datapoints_schedule: # (2)! + hour: "7,19" + minute: "45" +datapoint_cleaning_schedule: # (3)! minute: "*/30" snapshot_cleaning: - schedule: {minute: "15,45"} # (3)! - older_than: 7d # (4)! + schedule: {minute: "15,45"} # (4)! + older_than: 7d # (5)! datapoint_archivation: - schedule: {hour: 2, minute: 0} # (5)! - older_than: 7d # (6)! - archive_dir: "data/datapoints/" # (7)! + schedule: {hour: 2, minute: 0} # (6)! + older_than: 7d # (7)! + archive_dir: "data/datapoints/" # (8)! ``` 1. 
Parameter `aggregation_schedule` sets the interval for DP³ to aggregate observation datapoints in master records. This should be scheduled more often than cleaning of datapoints.
-2. Parameter `datapoint_cleaning_schedule` sets interval when should DP³ check if any data in master record of observations and timeseries attributes isn't too old and if there's something too old, removes it. To control what is considered as "too old", see parameter `max_age` in *Database entities* configuration.
-3. Parameter `snapshot_cleaning.schedule` sets the interval for DP³ to clean the snapshots collection. Optimally should be scheduled outside the snapshot creation window. See *Snapshots* configuration for more.
-4. Parameter `snapshot_cleaning.older_than` sets how old must a snapshot be to be deleted.
-5. Parameter `datapoint_archivation.schedule` sets interval for DP³ to archive datapoints from raw collections.
-6. Parameter `datapoint_archivation.older_than` sets how old must a datapoint be to be archived.
-7. Parameter `datapoint_archivation.archive_dir` sets directory where should be archived old datapoints. If directory doesn't exist, it will be created, but write priviledges must be set correctly. Can be also set to `null` (or not set) to disable archivation and only delete old data.
+2. Parameter `mark_datapoints_schedule` sets the interval at which datapoint timestamps are marked for all entities in a master collection. This should be scheduled very rarely, as it is a very expensive operation.
+3. Parameter `datapoint_cleaning_schedule` sets the interval at which DP³ checks whether any data in master records of observations and timeseries attributes is too old, and removes anything that is. To control what is considered "too old", see parameter `max_age` in *Database entities* configuration.
+4. Parameter `snapshot_cleaning.schedule` sets the interval for DP³ to clean the snapshots collection. Ideally, it should be scheduled outside the snapshot creation window. See *Snapshots* configuration for more.
+5. Parameter `snapshot_cleaning.older_than` sets how old a snapshot must be to be deleted.
+6. Parameter `datapoint_archivation.schedule` sets the interval for DP³ to archive datapoints from raw collections.
+7. Parameter `datapoint_archivation.older_than` sets how old a datapoint must be to be archived.
+8. Parameter `datapoint_archivation.archive_dir` sets the directory where old datapoints are archived. If the directory doesn't exist, it will be created, but write privileges must be set correctly. Can also be set to `null` (or not set) to disable archivation and only delete old data.

The schedule dictionaries are transformed to cron expressions, see [CronExpression docs][dp3.common.config.CronExpression] for details.
\ No newline at end of file

From fb6f6c7cdb45b27cf85c3a0706a74ea02bcdda88 Mon Sep 17 00:00:00 2001
From: Ondrej Sedlacek
Date: Thu, 1 Aug 2024 17:41:38 +0200
Subject: [PATCH 13/17] API: Update reading from snapshot cursor.
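Since the schema rework, get_latest_snapshots() yields bucket documents
projected to {"last": 1}, i.e. rows of the form {"_id": eid, "last": {...}}.
The listing handler therefore unwraps the snapshot body instead of deleting
the Mongo `_id`, roughly:

    result = [r["last"] for r in cursor_page]  # keep only the snapshot body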
--- dp3/api/routers/entity.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/dp3/api/routers/entity.py b/dp3/api/routers/entity.py index e77ff908..b427eaf8 100644 --- a/dp3/api/routers/entity.py +++ b/dp3/api/routers/entity.py @@ -134,11 +134,10 @@ async def list_entity_type_eids( time_created = None # Remove _id field - result = list(cursor_page) + result = [r["last"] for r in cursor_page] for r in result: time_created = r["_time_created"] del r["_time_created"] - del r["_id"] return EntityEidList( time_created=time_created, count=len(result), total_count=total_count, data=result From d923eacc4c6cea2bdbe6924b28d7185a5914415f Mon Sep 17 00:00:00 2001 From: Ondrej Sedlacek Date: Thu, 1 Aug 2024 17:44:57 +0200 Subject: [PATCH 14/17] Database: Custom serializer for IP types. --- dp3/database/database.py | 13 ++++++++++--- 1 file changed, 10 insertions(+), 3 deletions(-) diff --git a/dp3/database/database.py b/dp3/database/database.py index aed69db0..f3e0b4bd 100644 --- a/dp3/database/database.py +++ b/dp3/database/database.py @@ -20,7 +20,7 @@ from dp3.common.attrspec import AttrType, timeseries_types from dp3.common.config import HierarchicalDict, ModelSpec -from dp3.common.datapoint import DataPointBase +from dp3.common.datapoint import DataPointBase, to_json_friendly from dp3.common.scheduler import Scheduler from dp3.common.types import EventGroupType from dp3.database.schema_cleaner import SchemaCleaner @@ -135,6 +135,9 @@ def __init__( # Init and switch to correct database self._db = self._db[config.db_name] + type_registry = bson.codec_options.TypeRegistry(fallback_encoder=to_json_friendly) + self._codec_opts = bson.codec_options.CodecOptions(type_registry=type_registry) + if process_index == 0: self._init_database_schema(config.db_name) @@ -412,7 +415,9 @@ def _push_raw(self): continue try: self._db.get_collection( - self._raw_col_name(etype), write_concern=WriteConcern(w=0) + self._raw_col_name(etype), + write_concern=WriteConcern(w=0), + codec_options=self._codec_opts, ).insert_many(dps, ordered=False) end = time.time() self.log.debug( @@ -430,7 +435,9 @@ def _push_master(self): """Push master changes to database.""" for etype, lock in self._master_buffer_locks.items(): master_col = self._db.get_collection( - self._master_col_name(etype), write_concern=WriteConcern(w=1) + self._master_col_name(etype), + write_concern=WriteConcern(w=1), + codec_options=self._codec_opts, ) begin = time.time() with lock: From 5bd2dc560689f45c58f3917f30de2aeceef4a7b9 Mon Sep 17 00:00:00 2001 From: Ondrej Sedlacek Date: Thu, 1 Aug 2024 17:45:38 +0200 Subject: [PATCH 15/17] Database: Fixed logging. 
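With unordered inserts, a partial failure should name the documents that
were rejected, not just report mismatched counts. A standalone sketch of
the pattern (logger, collection and document shapes are illustrative; note
that in pymongo, per-document failures of an unordered `insert_many`
typically surface as `BulkWriteError`, which the surrounding
`OperationFailure` handler already covers):

```python
# Hedged sketch of the fixed logging pattern: compare the result against the
# batch that was actually attempted and report the _ids that are missing.
import logging

from pymongo import MongoClient
from pymongo.errors import BulkWriteError

log = logging.getLogger("db")
col = MongoClient()["dp3"]["A#snapshots"]  # placeholder DB/collection names

inserts = [{"_id": f"A{i}", "last": {}} for i in range(3)]
try:
    res = col.insert_many(inserts, ordered=False)
    if len(res.inserted_ids) != len(inserts):
        log.warning(
            "Some snapshots were not inserted, %s != %s, failed: %s",
            len(res.inserted_ids),
            len(inserts),
            {d["_id"] for d in inserts} - set(res.inserted_ids),
        )
except BulkWriteError as e:
    # ordered=False keeps attempting the remaining documents; the failed
    # ones are listed per-operation in e.details["writeErrors"].
    failed = {err["op"]["_id"] for err in e.details["writeErrors"]}
    log.warning("Insert failed for: %s", failed)
```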
--- dp3/database/database.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/dp3/database/database.py b/dp3/database/database.py index f3e0b4bd..4a0ca07d 100644 --- a/dp3/database/database.py +++ b/dp3/database/database.py @@ -1227,11 +1227,12 @@ def save_snapshots(self, etype: str, snapshots: list[dict], time: datetime): # Insert new snapshots res = self._db[snapshot_col].insert_many(inserts, ordered=False) new_normal.update(res.inserted_ids) - if len(res.inserted_ids) != len(snapshots_by_eid): + if len(res.inserted_ids) != len(inserts): self.log.warning( - "Some snapshots were not inserted, %s != %s", + "Some snapshots were not inserted, %s != %s, failed: %s", len(res.inserted_ids), len(snapshots_by_eid), + {s["_id"] for s in inserts} - set(res.inserted_ids), ) except (DocumentTooLarge, OperationFailure) as e: self.log.info(f"Inserted snapshot is too large, will retry with oversize. {e}") From 13fce0059db218969a231506017f47971ea79912 Mon Sep 17 00:00:00 2001 From: Ondrej Sedlacek Date: Thu, 1 Aug 2024 17:47:57 +0200 Subject: [PATCH 16/17] API Tests: Updated to match changes. Added pauses for batched writes to happen. --- tests/test_api/test_get_attr_value.py | 7 +++++++ tests/test_api/test_get_distinct_attr_values.py | 3 ++- tests/test_api/test_get_entity_eids.py | 7 ++++--- 3 files changed, 13 insertions(+), 4 deletions(-) diff --git a/tests/test_api/test_get_attr_value.py b/tests/test_api/test_get_attr_value.py index 2dd8ba49..971af5d8 100644 --- a/tests/test_api/test_get_attr_value.py +++ b/tests/test_api/test_get_attr_value.py @@ -1,3 +1,5 @@ +from time import sleep + import common from common import ACCEPTED_ERROR_CODES @@ -13,6 +15,11 @@ class GetEidAttrValue(common.APITest): + @classmethod + def setUpClass(cls) -> None: + super().setUpClass() + sleep(10) + def test_unknown_entity_type(self): response = self.get_request( TESTED_PATH.format(entity="xyz", eid="test_entity_id", attr="test_attr_int") diff --git a/tests/test_api/test_get_distinct_attr_values.py b/tests/test_api/test_get_distinct_attr_values.py index 6e0cee87..916134de 100644 --- a/tests/test_api/test_get_distinct_attr_values.py +++ b/tests/test_api/test_get_distinct_attr_values.py @@ -34,10 +34,11 @@ def setUpClass(cls) -> None: ] ) print(res.content.decode("utf-8"), file=sys.stderr) + sleep(9) # Make snapshots cls.get_request("control/make_snapshots") - sleep(3) + sleep(6) def test_unknown_entity_type(self): response = self.get_request(TESTED_PATH.format(entity="xyz", attr="test_attr_int")) diff --git a/tests/test_api/test_get_entity_eids.py b/tests/test_api/test_get_entity_eids.py index e4d70483..a1cadf0e 100644 --- a/tests/test_api/test_get_entity_eids.py +++ b/tests/test_api/test_get_entity_eids.py @@ -17,8 +17,9 @@ def setUpClass(cls) -> None: [{**dp_base, "id": f"A{i}", "v": f"v{i}"} for i in range(i, i + 20)] ) print(res.content.decode("utf-8"), file=sys.stderr) + sleep(8) cls.get_request("control/make_snapshots") - sleep(3) + sleep(6) def test_get_entity_eids(self): eids = self.get_entity_data("entity/A", EntityEidList) @@ -30,7 +31,7 @@ def test_get_entity_eids_pagination(self): for i in range(0, 100, 10): eids = self.get_entity_data("entity/A", EntityEidList, skip=i, limit=10) - self.assertEqual(10, len(eids.data)) + self.assertEqual(10, len(eids.data), f"Failed at {i}") received_eids.update(x["eid"] for x in eids.data) eids = self.get_entity_data("entity/A", EntityEidList, skip=101, limit=20) @@ -39,7 +40,7 @@ def test_get_entity_eids_pagination(self): def 
test_get_entity_eids_generic_filter(self): eids = self.get_entity_data( - "entity/A", EntityEidList, generic_filter=json.dumps({"eid": "A0"}) + "entity/A", EntityEidList, generic_filter=json.dumps({"_id": "A0"}) ) self.assertEqual(1, len(eids.data)) self.assertEqual("A0", eids.data[0]["eid"]) From cec58e97a1d20d9c4d58c59e8ccbd1fe1a425808 Mon Sep 17 00:00:00 2001 From: Ondrej Sedlacek Date: Thu, 1 Aug 2024 17:48:42 +0200 Subject: [PATCH 17/17] API Tests: Added pause for stack startup. To reduce test flakiness. --- .github/workflows/tests.yml | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/.github/workflows/tests.yml b/.github/workflows/tests.yml index 4e39c802..083132da 100644 --- a/.github/workflows/tests.yml +++ b/.github/workflows/tests.yml @@ -15,12 +15,18 @@ jobs: - name: Setup running platform stack run: docker compose up --build -d + - name: Pause for stack to start + run: sleep 10 + - name: Integration tests - API run: docker run --env CONF_DIR=/dp3/tests/test_config --network container:dp3_api dp3_interpreter python -m unittest discover -s tests/test_api -v + - name: Check worker errors + run: docker compose logs worker | grep "WARNING\|ERROR\|exception" | grep -v "RabbitMQ\|it's\ OK\ now,\ we're\ successfully\ connected" || true + - name: Teardown platform stack run: docker compose down
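Taken together, the series also changes how API clients filter on entity
ids: snapshot documents are keyed by `_id`, so a generic filter that
previously matched on `eid` now matches on `_id`, and each returned item is
the unwrapped `last` snapshot. A hedged end-to-end sketch; the host, port
and response shape are assumptions based on the test suite, not pinned down
by the patches:

```python
# Illustrative client-side query using the new filter key.
import json

import requests

API = "http://localhost:5000"  # placeholder API address

resp = requests.get(
    f"{API}/entity/A",
    params={"generic_filter": json.dumps({"_id": "A0"})},  # formerly {"eid": "A0"}
)
resp.raise_for_status()
data = resp.json()["data"]  # items are the unwrapped "last" snapshots
```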