From 285a4bc2d02e52861defbc9063bc8a7e48649646 Mon Sep 17 00:00:00 2001 From: Julien Richard Date: Mon, 4 Aug 2025 17:59:03 -0700 Subject: [PATCH 1/8] [client] Introduce security assessment (opencti/#11707) --- pycti/api/opencti_api_client.py | 2 + pycti/connector/opencti_connector.py | 6 + pycti/connector/opencti_connector_helper.py | 54 ++++-- pycti/entities/opencti_security_assessment.py | 179 ++++++++++++++++++ pycti/utils/opencti_stix2.py | 1 + 5 files changed, 226 insertions(+), 16 deletions(-) create mode 100644 pycti/entities/opencti_security_assessment.py diff --git a/pycti/api/opencti_api_client.py b/pycti/api/opencti_api_client.py index 1e6b4784b..6760f4775 100644 --- a/pycti/api/opencti_api_client.py +++ b/pycti/api/opencti_api_client.py @@ -52,6 +52,7 @@ from pycti.entities.opencti_opinion import Opinion from pycti.entities.opencti_report import Report from pycti.entities.opencti_role import Role +from pycti.entities.opencti_security_assessment import SecurityAssessment from pycti.entities.opencti_settings import Settings from pycti.entities.opencti_stix import Stix from pycti.entities.opencti_stix_core_object import StixCoreObject @@ -223,6 +224,7 @@ def __init__( self.narrative = Narrative(self) self.language = Language(self) self.vulnerability = Vulnerability(self) + self.security_assessment = SecurityAssessment(self) self.attack_pattern = AttackPattern(self) self.course_of_action = CourseOfAction(self) self.data_component = DataComponent(self) diff --git a/pycti/connector/opencti_connector.py b/pycti/connector/opencti_connector.py index 1601dcb22..e25b4eebf 100644 --- a/pycti/connector/opencti_connector.py +++ b/pycti/connector/opencti_connector.py @@ -43,6 +43,8 @@ def __init__( auto: bool, only_contextual: bool, playbook_compatible: bool, + auto_update: bool, + enrichment_resolution: str, listen_callback_uri=None, ): self.id = connector_id @@ -55,6 +57,8 @@ def __init__( else: self.scope = [] self.auto = auto + self.auto_update = auto_update + self.enrichment_resolution = enrichment_resolution self.only_contextual = only_contextual self.playbook_compatible = playbook_compatible self.listen_callback_uri = listen_callback_uri @@ -72,6 +76,8 @@ def to_input(self) -> dict: "type": self.type.name, "scope": self.scope, "auto": self.auto, + "auto_update": self.auto_update, + "enrichment_resolution": self.enrichment_resolution, "only_contextual": self.only_contextual, "playbook_compatible": self.playbook_compatible, "listen_callback_uri": self.listen_callback_uri, diff --git a/pycti/connector/opencti_connector_helper.py b/pycti/connector/opencti_connector_helper.py index 6d843729f..9d139c8e6 100644 --- a/pycti/connector/opencti_connector_helper.py +++ b/pycti/connector/opencti_connector_helper.py @@ -365,6 +365,16 @@ def _data_handler(self, json_data) -> None: event_data = json_data["event"] entity_id = event_data.get("entity_id") entity_type = event_data.get("entity_type") + stix_entity = ( + json.loads(event_data.get("stix_entity")) + if event_data.get("stix_entity") + else None + ) + stix_objects = ( + json.loads(event_data.get("stix_objects")) + if event_data.get("stix_objects") + else None + ) validation_mode = event_data.get("validation_mode", "workbench") force_validation = event_data.get("force_validation", False) # Set the API headers @@ -430,15 +440,16 @@ def _data_handler(self, json_data) -> None: else: # If not playbook but enrichment, compute object on enrichment_entity opencti_entity = event_data["enrichment_entity"] - stix_objects = self.helper.api.stix2.prepare_export( - entity=self.helper.api.stix2.generate_export( - copy.copy(opencti_entity) + if stix_objects is None: + stix_objects = self.helper.api.stix2.prepare_export( + entity=self.helper.api.stix2.generate_export( + copy.copy(opencti_entity) + ) ) - ) - stix_entity = [ - e - for e in stix_objects - if e["id"] == opencti_entity["standard_id"] + stix_entity = [ + e + for e in stix_objects + if e["id"] == opencti_entity["standard_id"] or e["id"] == "x-opencti-" + opencti_entity["standard_id"] ][0] event_data["stix_objects"] = stix_objects @@ -1116,6 +1127,15 @@ def __init__(self, config: Dict, playbook_compatible=False) -> None: self.connect_auto = get_config_variable( "CONNECTOR_AUTO", ["connector", "auto"], config, default=False ) + self.connect_auto_update = get_config_variable( + "CONNECTOR_AUTO_UPDATE", ["connector", "auto_update"], config, default=False + ) + self.connect_enrichment_resolution = get_config_variable( + "CONNECTOR_ENRICHMENT_RESOLUTION", + ["connector", "enrichment_resolution"], + config, + default="none", + ) self.bundle_send_to_queue = get_config_variable( "CONNECTOR_SEND_TO_QUEUE", ["connector", "send_to_queue"], @@ -1231,14 +1251,16 @@ def __init__(self, config: Dict, playbook_compatible=False) -> None: ) # Register the connector in OpenCTI self.connector = OpenCTIConnector( - self.connect_id, - self.connect_name, - self.connect_type, - self.connect_scope, - self.connect_auto, - self.connect_only_contextual, - playbook_compatible, - ( + connector_id=self.connect_id, + connector_name=self.connect_name, + connector_type=self.connect_type, + scope=self.connect_scope, + auto=self.connect_auto, + only_contextual=self.connect_only_contextual, + playbook_compatible=playbook_compatible, + auto_update=self.connect_auto_update, + enrichment_resolution=self.connect_enrichment_resolution, + listen_callback_uri=( self.listen_protocol_api_uri + self.listen_protocol_api_path if self.listen_protocol == "API" else None diff --git a/pycti/entities/opencti_security_assessment.py b/pycti/entities/opencti_security_assessment.py new file mode 100644 index 000000000..77a418cc1 --- /dev/null +++ b/pycti/entities/opencti_security_assessment.py @@ -0,0 +1,179 @@ +# coding: utf-8 + +import json +import uuid + +from stix2.canonicalization.Canonicalize import canonicalize + + +class SecurityAssessment: + def __init__(self, opencti): + self.opencti = opencti + self.properties = """ + id + standard_id + entity_type + parent_types + spec_version + created_at + updated_at + objectAssess { + id + } + objectMarking { + id + standard_id + entity_type + definition_type + definition + created + modified + x_opencti_order + x_opencti_color + } + """ + + @staticmethod + def generate_id(name): + name = name.lower().strip() + data = {"name": name} + data = canonicalize(data, utf8=False) + id = str(uuid.uuid5(uuid.UUID("00abedb4-aa42-466c-9c01-fed23315a9b7"), data)) + return "securityAssessment--" + id + + @staticmethod + def generate_id_from_data(data): + return SecurityAssessment.generate_id(data["name"]) + + """ + List SecurityAssessment objects + + :param filters: the filters to apply + :param search: the search keyword + :param first: return the first n rows from the after ID (or the beginning if not set) + :param after: ID of the first row for pagination + :return List of SecurityAssessment objects + """ + + def list(self, **kwargs): + filters = kwargs.get("filters", None) + search = kwargs.get("search", None) + first = kwargs.get("first", 100) + after = kwargs.get("after", None) + order_by = kwargs.get("orderBy", None) + order_mode = kwargs.get("orderMode", None) + custom_attributes = kwargs.get("customAttributes", None) + get_all = kwargs.get("getAll", False) + with_pagination = kwargs.get("withPagination", False) + + self.opencti.app_logger.info( + "Listing SecurityAssessment with filters", {"filters": json.dumps(filters)} + ) + query = ( + """ + query SecurityAssessment($filters: FilterGroup, $search: String, $first: Int, $after: ID, $orderBy: SecurityAssessmentOrdering, $orderMode: OrderingMode) { + securityAssessments(filters: $filters, search: $search, first: $first, after: $after, orderBy: $orderBy, orderMode: $orderMode) { + edges { + node { + """ + + (custom_attributes if custom_attributes is not None else self.properties) + + """ + } + } + pageInfo { + startCursor + endCursor + hasNextPage + hasPreviousPage + globalCount + } + } + } + """ + ) + result = self.opencti.query( + query, + { + "filters": filters, + "search": search, + "first": first, + "after": after, + "orderBy": order_by, + "orderMode": order_mode, + }, + ) + + if get_all: + final_data = [] + data = self.opencti.process_multiple(result["data"]["securityAssessments"]) + final_data = final_data + data + while result["data"]["securityAssessments"]["pageInfo"]["hasNextPage"]: + after = result["data"]["securityAssessments"]["pageInfo"]["endCursor"] + self.opencti.app_logger.info( + "Listing SecurityAssessment", {"after": after} + ) + result = self.opencti.query( + query, + { + "filters": filters, + "search": search, + "first": first, + "after": after, + "orderBy": order_by, + "orderMode": order_mode, + }, + ) + data = self.opencti.process_multiple( + result["data"]["securityAssessments"] + ) + final_data = final_data + data + return final_data + else: + return self.opencti.process_multiple( + result["data"]["securityAssessments"], with_pagination + ) + + """ + Read a SecurityAssessment object + + :param id: the id of the SecurityAssessment + :param filters: the filters to apply if no id provided + :return SecurityAssessment object + """ + + def read(self, **kwargs): + id = kwargs.get("id", None) + filters = kwargs.get("filters", None) + custom_attributes = kwargs.get("customAttributes", None) + if id is not None: + self.opencti.app_logger.info("Reading SecurityAssessment", {"id": id}) + query = ( + """ + query SecurityAssessment($id: String!) { + securityAssessment(id: $id) { + """ + + ( + custom_attributes + if custom_attributes is not None + else self.properties + ) + + """ + } + } + """ + ) + result = self.opencti.query(query, {"id": id}) + return self.opencti.process_multiple_fields( + result["data"]["securityAssessment"] + ) + elif filters is not None: + result = self.list(filters=filters) + if len(result) > 0: + return result[0] + else: + return None + else: + self.opencti.app_logger.error( + "[opencti_tool] Missing parameters: id or filters" + ) + return None diff --git a/pycti/utils/opencti_stix2.py b/pycti/utils/opencti_stix2.py index e25ab57aa..a89d0706c 100644 --- a/pycti/utils/opencti_stix2.py +++ b/pycti/utils/opencti_stix2.py @@ -885,6 +885,7 @@ def get_readers(self): "Tool": self.opencti.tool.read, "Vocabulary": self.opencti.vocabulary.read, "Vulnerability": self.opencti.vulnerability.read, + "SecurityAssessment": self.opencti.security_assessment.read, } def get_reader(self, entity_type: str): From 52b81eb895c88a6a30c4eefd51fdd52c60db0388 Mon Sep 17 00:00:00 2001 From: Julien Richard Date: Wed, 8 Oct 2025 15:58:07 +0200 Subject: [PATCH 2/8] [client] Start renaming coverage (#opencti/11707) --- pycti/api/opencti_api_client.py | 4 +- ...ssment.py => opencti_security_coverage.py} | 56 ++++++++++--------- pycti/utils/opencti_stix2.py | 3 +- 3 files changed, 33 insertions(+), 30 deletions(-) rename pycti/entities/{opencti_security_assessment.py => opencti_security_coverage.py} (72%) diff --git a/pycti/api/opencti_api_client.py b/pycti/api/opencti_api_client.py index 6760f4775..5729f6d0e 100644 --- a/pycti/api/opencti_api_client.py +++ b/pycti/api/opencti_api_client.py @@ -52,7 +52,7 @@ from pycti.entities.opencti_opinion import Opinion from pycti.entities.opencti_report import Report from pycti.entities.opencti_role import Role -from pycti.entities.opencti_security_assessment import SecurityAssessment +from pycti.entities.opencti_security_coverage import SecurityCoverage from pycti.entities.opencti_settings import Settings from pycti.entities.opencti_stix import Stix from pycti.entities.opencti_stix_core_object import StixCoreObject @@ -224,7 +224,7 @@ def __init__( self.narrative = Narrative(self) self.language = Language(self) self.vulnerability = Vulnerability(self) - self.security_assessment = SecurityAssessment(self) + self.security_coverage = SecurityCoverage(self) self.attack_pattern = AttackPattern(self) self.course_of_action = CourseOfAction(self) self.data_component = DataComponent(self) diff --git a/pycti/entities/opencti_security_assessment.py b/pycti/entities/opencti_security_coverage.py similarity index 72% rename from pycti/entities/opencti_security_assessment.py rename to pycti/entities/opencti_security_coverage.py index 77a418cc1..7dacf38c5 100644 --- a/pycti/entities/opencti_security_assessment.py +++ b/pycti/entities/opencti_security_coverage.py @@ -6,7 +6,7 @@ from stix2.canonicalization.Canonicalize import canonicalize -class SecurityAssessment: +class SecurityCoverage: def __init__(self, opencti): self.opencti = opencti self.properties = """ @@ -17,8 +17,11 @@ def __init__(self, opencti): spec_version created_at updated_at - objectAssess { - id + objectCovered { + __typename + ... on StixCoreObject { + id + } } objectMarking { id @@ -34,25 +37,24 @@ def __init__(self, opencti): """ @staticmethod - def generate_id(name): - name = name.lower().strip() - data = {"name": name} + def generate_id(covered_ref): + data = {"covered_ref": covered_ref.lower().strip()} data = canonicalize(data, utf8=False) id = str(uuid.uuid5(uuid.UUID("00abedb4-aa42-466c-9c01-fed23315a9b7"), data)) - return "securityAssessment--" + id + return "security-coverage--" + id @staticmethod def generate_id_from_data(data): - return SecurityAssessment.generate_id(data["name"]) + return SecurityCoverage.generate_id(data["covered_ref"]) """ - List SecurityAssessment objects + List securityCoverage objects :param filters: the filters to apply :param search: the search keyword :param first: return the first n rows from the after ID (or the beginning if not set) :param after: ID of the first row for pagination - :return List of SecurityAssessment objects + :return List of SecurityCoverage objects """ def list(self, **kwargs): @@ -67,12 +69,12 @@ def list(self, **kwargs): with_pagination = kwargs.get("withPagination", False) self.opencti.app_logger.info( - "Listing SecurityAssessment with filters", {"filters": json.dumps(filters)} + "Listing SecurityCoverage with filters", {"filters": json.dumps(filters)} ) query = ( """ - query SecurityAssessment($filters: FilterGroup, $search: String, $first: Int, $after: ID, $orderBy: SecurityAssessmentOrdering, $orderMode: OrderingMode) { - securityAssessments(filters: $filters, search: $search, first: $first, after: $after, orderBy: $orderBy, orderMode: $orderMode) { + query SecurityCoverage($filters: FilterGroup, $search: String, $first: Int, $after: ID, $orderBy: SecurityCoverageOrdering, $orderMode: OrderingMode) { + securityCoverages(filters: $filters, search: $search, first: $first, after: $after, orderBy: $orderBy, orderMode: $orderMode) { edges { node { """ @@ -105,12 +107,12 @@ def list(self, **kwargs): if get_all: final_data = [] - data = self.opencti.process_multiple(result["data"]["securityAssessments"]) + data = self.opencti.process_multiple(result["data"]["securityCoverages"]) final_data = final_data + data - while result["data"]["securityAssessments"]["pageInfo"]["hasNextPage"]: - after = result["data"]["securityAssessments"]["pageInfo"]["endCursor"] + while result["data"]["securityCoverages"]["pageInfo"]["hasNextPage"]: + after = result["data"]["securityCoverages"]["pageInfo"]["endCursor"] self.opencti.app_logger.info( - "Listing SecurityAssessment", {"after": after} + "Listing SecurityCoverage", {"after": after} ) result = self.opencti.query( query, @@ -124,21 +126,21 @@ def list(self, **kwargs): }, ) data = self.opencti.process_multiple( - result["data"]["securityAssessments"] + result["data"]["securityCoverages"] ) final_data = final_data + data return final_data else: return self.opencti.process_multiple( - result["data"]["securityAssessments"], with_pagination + result["data"]["securityCoverages"], with_pagination ) """ - Read a SecurityAssessment object + Read a SecurityCoverage object - :param id: the id of the SecurityAssessment + :param id: the id of the SecurityCoverage :param filters: the filters to apply if no id provided - :return SecurityAssessment object + :return SecurityCoverage object """ def read(self, **kwargs): @@ -146,11 +148,11 @@ def read(self, **kwargs): filters = kwargs.get("filters", None) custom_attributes = kwargs.get("customAttributes", None) if id is not None: - self.opencti.app_logger.info("Reading SecurityAssessment", {"id": id}) + self.opencti.app_logger.info("Reading SecurityCoverage", {"id": id}) query = ( """ - query SecurityAssessment($id: String!) { - securityAssessment(id: $id) { + query SecurityCoverage($id: String!) { + securityCoverage(id: $id) { """ + ( custom_attributes @@ -164,7 +166,7 @@ def read(self, **kwargs): ) result = self.opencti.query(query, {"id": id}) return self.opencti.process_multiple_fields( - result["data"]["securityAssessment"] + result["data"]["securityCoverage"] ) elif filters is not None: result = self.list(filters=filters) @@ -174,6 +176,6 @@ def read(self, **kwargs): return None else: self.opencti.app_logger.error( - "[opencti_tool] Missing parameters: id or filters" + "[opencti_security_coverage] Missing parameters: id or filters" ) return None diff --git a/pycti/utils/opencti_stix2.py b/pycti/utils/opencti_stix2.py index a89d0706c..f25fb480a 100644 --- a/pycti/utils/opencti_stix2.py +++ b/pycti/utils/opencti_stix2.py @@ -885,7 +885,7 @@ def get_readers(self): "Tool": self.opencti.tool.read, "Vocabulary": self.opencti.vocabulary.read, "Vulnerability": self.opencti.vulnerability.read, - "SecurityAssessment": self.opencti.security_assessment.read, + "Security-Coverage": self.opencti.security_coverage.read, } def get_reader(self, entity_type: str): @@ -962,6 +962,7 @@ def get_stix_helper(self): "narrative": self.opencti.narrative, "task": self.opencti.task, "x-opencti-task": self.opencti.task, + "Security-Coverage": self.opencti.security_coverage, "vocabulary": self.opencti.vocabulary, # relationships "relationship": self.opencti.stix_core_relationship, From dc5249e6eb39398c442c981e6e5f3ac87f1d8625 Mon Sep 17 00:00:00 2001 From: Julien Richard Date: Fri, 10 Oct 2025 01:55:08 +0200 Subject: [PATCH 3/8] [backend/frontend/worker] Start cleanup + worker (#11707) --- pycti/connector/opencti_connector_helper.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/pycti/connector/opencti_connector_helper.py b/pycti/connector/opencti_connector_helper.py index 9d139c8e6..9b6808104 100644 --- a/pycti/connector/opencti_connector_helper.py +++ b/pycti/connector/opencti_connector_helper.py @@ -450,8 +450,8 @@ def _data_handler(self, json_data) -> None: e for e in stix_objects if e["id"] == opencti_entity["standard_id"] - or e["id"] == "x-opencti-" + opencti_entity["standard_id"] - ][0] + or e["id"] == "x-opencti-" + opencti_entity["standard_id"] + ][0] event_data["stix_objects"] = stix_objects event_data["stix_entity"] = stix_entity # Handle organization propagation From 32cb37140e852b78173d680a1a7c1230b2ce02c3 Mon Sep 17 00:00:00 2001 From: Julien Richard Date: Sun, 12 Oct 2025 15:16:07 +0200 Subject: [PATCH 4/8] [client] Improve security coverage handling (opencti/#11707) --- pycti/entities/opencti_security_coverage.py | 124 ++++++++++++++++++++ pycti/utils/opencti_stix2.py | 2 +- pycti/utils/opencti_stix2_utils.py | 1 + 3 files changed, 126 insertions(+), 1 deletion(-) diff --git a/pycti/entities/opencti_security_coverage.py b/pycti/entities/opencti_security_coverage.py index 7dacf38c5..2c09d1776 100644 --- a/pycti/entities/opencti_security_coverage.py +++ b/pycti/entities/opencti_security_coverage.py @@ -179,3 +179,127 @@ def read(self, **kwargs): "[opencti_security_coverage] Missing parameters: id or filters" ) return None + + """ + Create a Security coverage object + + :return Security Coverage object + """ + + def create(self, **kwargs): + stix_id = kwargs.get("stix_id", None) + name = kwargs.get("name", None) + description = kwargs.get("description", None) + created_by = kwargs.get("createdBy", None) + object_marking = kwargs.get("objectMarking", None) + object_label = kwargs.get("objectLabel", None) + object_covered = kwargs.get("objectCovered", None) + external_references = kwargs.get("externalReferences", None) + coverage_last_result = kwargs.get("coverage_last_result", None) + coverage_valid_from = kwargs.get("coverage_valid_from", None) + coverage_valid_to = kwargs.get("coverage_valid_to", None) + coverage_information = kwargs.get("coverage_information", None) + + if name is not None and object_covered is not None: + self.opencti.app_logger.info("Creating Security Coverage", {"name": name}) + query = """ + mutation SecurityCoverageAdd($input: SecurityCoverageAddInput!) { + securityCoverageAdd(input: $input) { + id + standard_id + entity_type + parent_types + } + } + """ + result = self.opencti.query( + query, + { + "input": { + "stix_id": stix_id, + "name": name, + "description": description, + "createdBy": created_by, + "objectMarking": object_marking, + "objectLabel": object_label, + "objectCovered": object_covered, + "externalReferences": external_references, + "coverage_last_result": coverage_last_result, + "coverage_valid_from": coverage_valid_from, + "coverage_valid_to": coverage_valid_to, + "coverage_information": coverage_information, + } + }, + ) + return self.opencti.process_multiple_fields(result["data"]["securityCoverageAdd"]) + else: + self.opencti.app_logger.error( + "[opencti_security_coverage] " + "Missing parameters: name or object_covered" + ) + + """ + Import a Security coverage from a STIX2 object + + :param stixObject: the Stix-Object Security coverage + :return Security coverage object + """ + + def import_from_stix2(self, **kwargs): + stix_object = kwargs.get("stixObject", None) + extras = kwargs.get("extras", {}) + if stix_object is not None: + # Search in extensions + if "x_opencti_stix_ids" not in stix_object: + stix_object["x_opencti_stix_ids"] = ( + self.opencti.get_attribute_in_extension("stix_ids", stix_object) + ) + if "x_opencti_granted_refs" not in stix_object: + stix_object["x_opencti_granted_refs"] = ( + self.opencti.get_attribute_in_extension("granted_refs", stix_object) + ) + + raw_coverages = stix_object["coverage"] if "coverage" in stix_object else [] + coverage_information = list(map(lambda cov: {"coverage_name":cov["name"],"coverage_score":cov["score"]}, raw_coverages)) + + return self.create( + stix_id=stix_object["id"], + name=stix_object["name"], + coverage_last_result=stix_object["last_result"] if "last_result" in stix_object else None, + coverage_valid_from=stix_object["valid_from"] if "valid_from" in stix_object else None, + coverage_valid_to=stix_object["valid_to"] if "valid_to" in stix_object else None, + coverage_information=coverage_information, + description=( + self.opencti.stix2.convert_markdown(stix_object["description"]) + if "description" in stix_object + else None + ), + createdBy=( + extras["created_by_id"] if "created_by_id" in extras else None + ), + objectMarking=( + extras["object_marking_ids"] + if "object_marking_ids" in extras + else None + ), + objectLabel=( + extras["object_label_ids"] if "object_label_ids" in extras else None + ), + objectCovered=( + stix_object["covered_ref"] if "covered_ref" in stix_object else None + ), + externalReferences=( + extras["external_references_ids"] + if "external_references_ids" in extras + else None + ), + x_opencti_stix_ids=( + stix_object["x_opencti_stix_ids"] + if "x_opencti_stix_ids" in stix_object + else None + ), + ) + else: + self.opencti.app_logger.error( + "[opencti_security_coverage] Missing parameters: stixObject" + ) \ No newline at end of file diff --git a/pycti/utils/opencti_stix2.py b/pycti/utils/opencti_stix2.py index f25fb480a..79c927e7d 100644 --- a/pycti/utils/opencti_stix2.py +++ b/pycti/utils/opencti_stix2.py @@ -962,7 +962,7 @@ def get_stix_helper(self): "narrative": self.opencti.narrative, "task": self.opencti.task, "x-opencti-task": self.opencti.task, - "Security-Coverage": self.opencti.security_coverage, + "security-coverage": self.opencti.security_coverage, "vocabulary": self.opencti.vocabulary, # relationships "relationship": self.opencti.stix_core_relationship, diff --git a/pycti/utils/opencti_stix2_utils.py b/pycti/utils/opencti_stix2_utils.py index 8b8d8100e..1f7d7a876 100644 --- a/pycti/utils/opencti_stix2_utils.py +++ b/pycti/utils/opencti_stix2_utils.py @@ -64,6 +64,7 @@ "threat-actor", "tool", "vulnerability", + "security-coverage", ] SUPPORTED_STIX_ENTITY_OBJECTS = STIX_META_OBJECTS + STIX_CORE_OBJECTS From 2283cdd230c97b59a91107c9ddb0d12644733ed6 Mon Sep 17 00:00:00 2001 From: Julien Richard Date: Mon, 13 Oct 2025 02:14:02 +0200 Subject: [PATCH 5/8] [client] Reformat (opencti/#11707) --- pycti/entities/opencti_security_coverage.py | 28 ++++++++++++++++----- 1 file changed, 22 insertions(+), 6 deletions(-) diff --git a/pycti/entities/opencti_security_coverage.py b/pycti/entities/opencti_security_coverage.py index 2c09d1776..c0b11b35f 100644 --- a/pycti/entities/opencti_security_coverage.py +++ b/pycti/entities/opencti_security_coverage.py @@ -231,7 +231,9 @@ def create(self, **kwargs): } }, ) - return self.opencti.process_multiple_fields(result["data"]["securityCoverageAdd"]) + return self.opencti.process_multiple_fields( + result["data"]["securityCoverageAdd"] + ) else: self.opencti.app_logger.error( "[opencti_security_coverage] " @@ -260,14 +262,28 @@ def import_from_stix2(self, **kwargs): ) raw_coverages = stix_object["coverage"] if "coverage" in stix_object else [] - coverage_information = list(map(lambda cov: {"coverage_name":cov["name"],"coverage_score":cov["score"]}, raw_coverages)) + coverage_information = list( + map( + lambda cov: { + "coverage_name": cov["name"], + "coverage_score": cov["score"], + }, + raw_coverages, + ) + ) return self.create( stix_id=stix_object["id"], name=stix_object["name"], - coverage_last_result=stix_object["last_result"] if "last_result" in stix_object else None, - coverage_valid_from=stix_object["valid_from"] if "valid_from" in stix_object else None, - coverage_valid_to=stix_object["valid_to"] if "valid_to" in stix_object else None, + coverage_last_result=( + stix_object["last_result"] if "last_result" in stix_object else None + ), + coverage_valid_from=( + stix_object["valid_from"] if "valid_from" in stix_object else None + ), + coverage_valid_to=( + stix_object["valid_to"] if "valid_to" in stix_object else None + ), coverage_information=coverage_information, description=( self.opencti.stix2.convert_markdown(stix_object["description"]) @@ -302,4 +318,4 @@ def import_from_stix2(self, **kwargs): else: self.opencti.app_logger.error( "[opencti_security_coverage] Missing parameters: stixObject" - ) \ No newline at end of file + ) From 2af8480242216af904c3123734488bb3cb0b1d85 Mon Sep 17 00:00:00 2001 From: Julien Richard Date: Thu, 23 Oct 2025 00:28:17 +0200 Subject: [PATCH 6/8] [backend] Adapt UI, support relationships (#11707) --- pycti/entities/opencti_stix_core_relationship.py | 16 ++++++++++++++++ 1 file changed, 16 insertions(+) diff --git a/pycti/entities/opencti_stix_core_relationship.py b/pycti/entities/opencti_stix_core_relationship.py index dcd926733..c76837365 100644 --- a/pycti/entities/opencti_stix_core_relationship.py +++ b/pycti/entities/opencti_stix_core_relationship.py @@ -624,6 +624,7 @@ def create(self, **kwargs): granted_refs = kwargs.get("objectOrganization", None) x_opencti_workflow_id = kwargs.get("x_opencti_workflow_id", None) x_opencti_stix_ids = kwargs.get("x_opencti_stix_ids", None) + coverage_information = kwargs.get("coverage_information", None) update = kwargs.get("update", False) self.opencti.app_logger.info( @@ -668,6 +669,7 @@ def create(self, **kwargs): "killChainPhases": kill_chain_phases, "x_opencti_workflow_id": x_opencti_workflow_id, "x_opencti_stix_ids": x_opencti_stix_ids, + "coverage_information": coverage_information, "update": update, } }, @@ -1175,6 +1177,19 @@ def import_from_stix2(self, **kwargs): ) ) + raw_coverages = ( + stix_relation["coverage"] if "coverage" in stix_relation else [] + ) + coverage_information = list( + map( + lambda cov: { + "coverage_name": cov["name"], + "coverage_score": cov["score"], + }, + raw_coverages, + ) + ) + source_ref = stix_relation["source_ref"] target_ref = stix_relation["target_ref"] return self.create( @@ -1197,6 +1212,7 @@ def import_from_stix2(self, **kwargs): if "stop_time" in stix_relation else default_date ), + coverage_information=coverage_information, revoked=( stix_relation["revoked"] if "revoked" in stix_relation else None ), From 6b4d8d86aad018ad01f5a08f0794a595c9077094 Mon Sep 17 00:00:00 2001 From: Julien Richard Date: Thu, 23 Oct 2025 16:14:24 +0200 Subject: [PATCH 7/8] [client] Improve security coverage api --- pycti/entities/opencti_security_coverage.py | 15 +++++++++++++++ 1 file changed, 15 insertions(+) diff --git a/pycti/entities/opencti_security_coverage.py b/pycti/entities/opencti_security_coverage.py index c0b11b35f..e595a9c3b 100644 --- a/pycti/entities/opencti_security_coverage.py +++ b/pycti/entities/opencti_security_coverage.py @@ -17,6 +17,7 @@ def __init__(self, opencti): spec_version created_at updated_at + external_uri objectCovered { __typename ... on StixCoreObject { @@ -195,10 +196,12 @@ def create(self, **kwargs): object_label = kwargs.get("objectLabel", None) object_covered = kwargs.get("objectCovered", None) external_references = kwargs.get("externalReferences", None) + external_uri = kwargs.get("external_uri", None) coverage_last_result = kwargs.get("coverage_last_result", None) coverage_valid_from = kwargs.get("coverage_valid_from", None) coverage_valid_to = kwargs.get("coverage_valid_to", None) coverage_information = kwargs.get("coverage_information", None) + auto_enrichment_disable = kwargs.get("auto_enrichment_disable", None) if name is not None and object_covered is not None: self.opencti.app_logger.info("Creating Security Coverage", {"name": name}) @@ -223,11 +226,13 @@ def create(self, **kwargs): "objectMarking": object_marking, "objectLabel": object_label, "objectCovered": object_covered, + "external_uri": external_uri, "externalReferences": external_references, "coverage_last_result": coverage_last_result, "coverage_valid_from": coverage_valid_from, "coverage_valid_to": coverage_valid_to, "coverage_information": coverage_information, + "auto_enrichment_disable": auto_enrichment_disable, } }, ) @@ -275,6 +280,16 @@ def import_from_stix2(self, **kwargs): return self.create( stix_id=stix_object["id"], name=stix_object["name"], + external_uri=( + stix_object["external_uri"] + if "external_uri" in stix_object + else None + ), + auto_enrichment_disable=( + stix_object["auto_enrichment_disable"] + if "auto_enrichment_disable" in stix_object + else False + ), coverage_last_result=( stix_object["last_result"] if "last_result" in stix_object else None ), From d8c6996b19e2e967de5d3d7ea7563a471ec2e0ba Mon Sep 17 00:00:00 2001 From: Julien Richard Date: Tue, 28 Oct 2025 21:21:32 +0100 Subject: [PATCH 8/8] [client] Remove settings attributes --- pycti/entities/opencti_settings.py | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/pycti/entities/opencti_settings.py b/pycti/entities/opencti_settings.py index e0d950fda..b2aadf126 100644 --- a/pycti/entities/opencti_settings.py +++ b/pycti/entities/opencti_settings.py @@ -60,11 +60,9 @@ def __init__(self, opencti): platform_theme_light_logo_login platform_map_tile_server_dark platform_map_tile_server_light - platform_openbas_url - platform_openbas_disable_display - platform_openerm_url - platform_openmtd_url platform_ai_enabled + platform_openaev_url + platform_opengrc_url platform_ai_type platform_ai_model platform_ai_has_token