Skip to content
This repository was archived by the owner on Dec 5, 2025. It is now read-only.
Merged
8 changes: 8 additions & 0 deletions pycti/api/opencti_api_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,13 @@

from pycti import __version__
from pycti.api.opencti_api_connector import OpenCTIApiConnector
from pycti.api.opencti_api_draft import OpenCTIApiDraft
from pycti.api.opencti_api_pir import OpenCTIApiPir
from pycti.api.opencti_api_playbook import OpenCTIApiPlaybook
from pycti.api.opencti_api_public_dashboard import OpenCTIApiPublicDashboard
from pycti.api.opencti_api_trash import OpenCTIApiTrash
from pycti.api.opencti_api_work import OpenCTIApiWork
from pycti.api.opencti_api_workspace import OpenCTIApiWorkspace
from pycti.entities.opencti_attack_pattern import AttackPattern
from pycti.entities.opencti_campaign import Campaign
from pycti.entities.opencti_capability import Capability
Expand Down Expand Up @@ -168,6 +172,10 @@ def __init__(
self.session = requests.session()
# Define the dependencies
self.work = OpenCTIApiWork(self)
self.trash = OpenCTIApiTrash(self)
self.draft = OpenCTIApiDraft(self)
self.workspace = OpenCTIApiWorkspace(self)
self.public_dashboard = OpenCTIApiPublicDashboard(self)
self.playbook = OpenCTIApiPlaybook(self)
self.connector = OpenCTIApiConnector(self)
self.stix2 = OpenCTIStix2(self)
Expand Down
19 changes: 19 additions & 0 deletions pycti/api/opencti_api_draft.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
class OpenCTIApiDraft:
"""OpenCTIApiDraft"""

def __init__(self, api):
self.api = api

def delete(self, **kwargs):
id = kwargs.get("id", None)
query = """
mutation DraftWorkspaceDelete($id: ID!) {
draftWorkspaceDelete(id: $id)
}
"""
self.api.query(
query,
{
"id": id,
},
)
20 changes: 20 additions & 0 deletions pycti/api/opencti_api_playbook.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,3 +32,23 @@ def playbook_step_execution(self, playbook: dict, bundle: str):
"bundle": bundle,
},
)

def delete(self, **kwargs):
id = kwargs.get("id", None)
if id is not None:
query = """
mutation PlaybookDelete($id: ID!) {
playbookDelete(id: $id)
}
"""
self.api.query(
query,
{
"id": id,
},
)
else:
self.opencti.app_logger.error(
"[stix_playbook] Cant delete playbook, missing parameters: id"
)
return None
25 changes: 25 additions & 0 deletions pycti/api/opencti_api_public_dashboard.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
class OpenCTIApiPublicDashboard:
"""OpenCTIApiPublicDashboard"""

def __init__(self, api):
self.api = api

def delete(self, **kwargs):
id = kwargs.get("id", None)
if id is not None:
query = """
mutation PublicDashboardDelete($id: ID!) {
publicDashboardDelete(id: $id)
}
"""
self.api.query(
query,
{
"id": id,
},
)
else:
self.opencti.app_logger.error(
"[stix_public_dashboard] Cant delete public dashboard, missing parameters: id"
)
return None
42 changes: 42 additions & 0 deletions pycti/api/opencti_api_trash.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
class OpenCTIApiTrash:
"""OpenCTIApiTrash"""

def __init__(self, api):
self.api = api

def restore(self, operation_id: str):
query = """
mutation DeleteOperationRestore($id: ID!) {
deleteOperationRestore(id: $id)
}
"""
self.api.query(
query,
{
"id": operation_id,
},
)

def delete(self, **kwargs):
"""Delete a trash item given its ID

:param id: ID for the delete operation on the platform.
:type id: str
"""
id = kwargs.get("id", None)
if id is None:
self.api.admin_logger.error(
"[opencti_trash] Cant confirm delete, missing parameter: id"
)
return None
query = """
mutation DeleteOperationConfirm($id: ID!) {
deleteOperationConfirm(id: $id)
}
"""
self.api.query(
query,
{
"id": id,
},
)
19 changes: 19 additions & 0 deletions pycti/api/opencti_api_work.py
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,25 @@ def delete_work(self, work_id: str):
work = self.api.query(query, {"workId": work_id}, True)
return work["data"]

def delete(self, **kwargs):
id = kwargs.get("id", None)
if id is None:
self.opencti.admin_logger.error(
"[opencti_work] Cant delete work, missing parameter: id"
)
return None
query = """
mutation ConnectorWorksMutation($workId: ID!) {
workEdit(id: $workId) {
delete
}
}"""
work = self.api.query(
query,
{"workId": id},
)
return work["data"]

def wait_for_work_to_finish(self, work_id: str):
status = ""
cnt = 0
Expand Down
24 changes: 24 additions & 0 deletions pycti/api/opencti_api_workspace.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
class OpenCTIApiWorkspace:
"""OpenCTIApiWorkspace"""

def __init__(self, api):
self.api = api

def delete(self, **kwargs):
id = kwargs.get("id", None)
if id is None:
self.api.admin_logger.error(
"[opencti_workspace] Cant delete workspace, missing parameter: id"
)
return None
query = """
mutation WorkspaceDelete($id: ID!) {
workspaceDelete(id: $id)
}
"""
self.api.query(
query,
{
"id": id,
},
)
8 changes: 7 additions & 1 deletion pycti/entities/opencti_group.py
Original file line number Diff line number Diff line change
Expand Up @@ -315,12 +315,18 @@ def create(self, **kwargs) -> Optional[Dict]:
)
return self.opencti.process_multiple_fields(result["data"]["groupAdd"])

def delete(self, id: str):
def delete(self, **kwargs):
"""Delete a given group from OpenCTI

:param id: ID of the group to delete.
:type id: str
"""
id = kwargs.get("id", None)
if id is None:
self.opencti.admin_logger.error(
"[opencti_group] Cant delete group, missing parameter: id"
)
return None
self.opencti.admin_logger.info("Deleting group", {"id": id})
query = """
mutation GroupDelete($id: ID!) {
Expand Down
37 changes: 37 additions & 0 deletions pycti/entities/opencti_indicator.py
Original file line number Diff line number Diff line change
Expand Up @@ -301,6 +301,43 @@ def create(self, **kwargs):
"name or pattern or pattern_type or x_opencti_main_observable_type"
)

"""
Update an Indicator object field

:param id: the Indicator id
:param input: the input of the field
"""

def update_field(self, **kwargs):
id = kwargs.get("id", None)
input = kwargs.get("input", None)
if id is not None and input is not None:
self.opencti.app_logger.info("Updating Indicator", {"id": id})
query = """
mutation IndicatorFieldPatch($id: ID!, $input: [EditInput!]!) {
indicatorFieldPatch(id: $id, input: $input) {
id
standard_id
entity_type
}
}
"""
result = self.opencti.query(
query,
{
"id": id,
"input": input,
},
)
return self.opencti.process_multiple_fields(
result["data"]["indicatorFieldPatch"]
)
else:
self.opencti.app_logger.error(
"[opencti_stix_domain_object] Cant update indicator field, missing parameters: id and input"
)
return None

def add_stix_cyber_observable(self, **kwargs):
"""
Add a Stix-Cyber-Observable object to Indicator object (based-on)
Expand Down
7 changes: 4 additions & 3 deletions pycti/entities/opencti_stix.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,16 +11,17 @@ def __init__(self, opencti):

def delete(self, **kwargs):
id = kwargs.get("id", None)
force_delete = kwargs.get("force_delete", True)
if id is not None:
self.opencti.app_logger.info("Deleting Stix element", {"id": id})
query = """
mutation StixEdit($id: ID!) {
mutation StixEdit($id: ID!, $forceDelete: Boolean) {
stixEdit(id: $id) {
delete
delete(forceDelete: $forceDelete)
}
}
"""
self.opencti.query(query, {"id": id})
self.opencti.query(query, {"id": id, "forceDelete": force_delete})
else:
self.opencti.app_logger.error("[opencti_stix] Missing parameters: id")
return None
Expand Down
Loading