From 568da400ee5378f564640a480e442d399d68353d Mon Sep 17 00:00:00 2001 From: devendra-lohar Date: Thu, 10 Jul 2025 14:18:00 +0530 Subject: [PATCH 1/7] add dry run option initial changes --- cognite/extractorutils/unstable/core/base.py | 44 ++++++++++++++++++- .../extractorutils/unstable/core/runtime.py | 18 +++++++- 2 files changed, 59 insertions(+), 3 deletions(-) diff --git a/cognite/extractorutils/unstable/core/base.py b/cognite/extractorutils/unstable/core/base.py index b3ecaa12..890999e9 100644 --- a/cognite/extractorutils/unstable/core/base.py +++ b/cognite/extractorutils/unstable/core/base.py @@ -52,7 +52,7 @@ def my_task_function(self, task_context: TaskContext) -> None: from multiprocessing import Queue from threading import RLock, Thread from types import TracebackType -from typing import Generic, Literal, TypeVar +from typing import Any, Generic, Literal, TypeVar from humps import pascalize from typing_extensions import Self, assert_never @@ -84,6 +84,36 @@ def my_task_function(self, task_context: TaskContext) -> None: _T = TypeVar("_T", bound=ExtractorConfig) +class _NoOpCogniteClient: + """A mock CogniteClient that performs no actions, for use in dry-run mode.""" + + class _MockResponse: + def __init__(self, url: str) -> None: + self._url = url + + def json(self) -> dict: + if "integrations/checkin" in self._url: + return {"lastConfigRevision": None} + return {} + + def __init__(self, config: ConnectionConfig, client_name: str) -> None: + class MockSDKConfig: + def __init__(self, project: str) -> None: + self.project = project + + self.config = MockSDKConfig(config.project) + self._logger = logging.getLogger(__name__) + self._logger.info(f"CogniteClient is in no-op mode (dry-run). Client name: {client_name}") + + def post(self, url: str, json: dict, **kwargs: dict[str, Any]) -> _MockResponse: + self._logger.info(f"[DRY-RUN] SKIPPED POST to {url} with payload: {json}") + return self._MockResponse(url) + + def get(self, url: str, **kwargs: dict[str, Any]) -> _MockResponse: + self._logger.info(f"[DRY-RUN] SKIPPED GET from {url}") + return self._MockResponse(url) + + class FullConfig(Generic[_T]): """ A class that holds the full configuration for an extractor. @@ -98,11 +128,13 @@ def __init__( application_config: _T, current_config_revision: ConfigRevision, log_level_override: str | None = None, + is_dry_run: bool = False, ) -> None: self.connection_config = connection_config self.application_config = application_config self.current_config_revision = current_config_revision self.log_level_override = log_level_override + self.is_dry_run = is_dry_run class Extractor(Generic[ConfigType], CogniteLogger): @@ -124,8 +156,13 @@ class Extractor(Generic[ConfigType], CogniteLogger): CONFIG_TYPE: type[ConfigType] RESTART_POLICY: RestartPolicy = WHEN_CONTINUOUS_TASKS_CRASHES + SUPPORTS_DRY_RUN: bool = False def __init__(self, config: FullConfig[ConfigType]) -> None: + self.is_dry_run = config.is_dry_run + if self.is_dry_run and not self.SUPPORTS_DRY_RUN: + raise NotImplementedError(f"Extractor '{self.NAME}' does not support dry-run mode.") + self._logger = logging.getLogger(f"{self.EXTERNAL_ID}.main") self.cancellation_token = CancellationToken() @@ -136,7 +173,10 @@ def __init__(self, config: FullConfig[ConfigType]) -> None: self.current_config_revision = config.current_config_revision self.log_level_override = config.log_level_override - self.cognite_client = self.connection_config.get_cognite_client(f"{self.EXTERNAL_ID}-{self.VERSION}") + if self.is_dry_run: + self.cognite_client = _NoOpCogniteClient(self.connection_config, f"{self.EXTERNAL_ID}-{self.VERSION}") + else: + self.cognite_client = self.connection_config.get_cognite_client(f"{self.EXTERNAL_ID}-{self.VERSION}") self._checkin_lock = RLock() self._runtime_messages: Queue[RuntimeMessage] | None = None diff --git a/cognite/extractorutils/unstable/core/runtime.py b/cognite/extractorutils/unstable/core/runtime.py index 02e410b3..4de862b6 100644 --- a/cognite/extractorutils/unstable/core/runtime.py +++ b/cognite/extractorutils/unstable/core/runtime.py @@ -134,6 +134,11 @@ def _create_argparser(self) -> ArgumentParser: required=False, help="Set the current working directory for the extractor.", ) + argparser.add_argument( + "--dry-run", + action="store_true", + help="Run without writing to CDF. The extractor must support this feature for this to work.", + ) return argparser @@ -167,6 +172,9 @@ def _inner_run( with extractor: extractor.run() + except NotImplementedError as e: + logging.getLogger(__name__).critical(f"Configuration error: {e}") + except Exception: self.logger.exception("Extractor crashed, will attempt restart") message_queue.put(RuntimeMessage.RESTART) @@ -232,6 +240,13 @@ def _safe_get_application_config( args: Namespace, connection_config: ConnectionConfig, ) -> tuple[ExtractorConfig, ConfigRevision] | None: + if args.dry_run and not args.force_local_config: + self.logger.warning( + "Running in dry-run mode without a local application config file (-f). " + "The extractor will not perform any actions." + ) + return None + prev_error: str | None = None while not self._cancellation_token.is_cancelled: @@ -339,7 +354,7 @@ def run(self) -> None: self.logger.critical("Could not load connection config") sys.exit(1) - if not args.skip_init_checks and not self._verify_connection_config(connection_config): + if not args.dry_run and not args.skip_init_checks and not self._verify_connection_config(connection_config): sys.exit(1) # This has to be Any. We don't know the type of the extractors' config at type checking since the self doesn't @@ -363,6 +378,7 @@ def run(self) -> None: application_config=application_config, current_config_revision=current_config_revision, log_level_override=args.log_level, + is_dry_run=args.dry_run, ) ) process.join() From 2dc960b6e785c78c824d73da1d8ee830e5278751 Mon Sep 17 00:00:00 2001 From: devendra-lohar Date: Mon, 14 Jul 2025 11:35:05 +0530 Subject: [PATCH 2/7] Add testcases --- cognite/extractorutils/unstable/core/base.py | 2 + tests/test_unstable/test_runtime.py | 45 +++++++++++++++++++- 2 files changed, 45 insertions(+), 2 deletions(-) diff --git a/cognite/extractorutils/unstable/core/base.py b/cognite/extractorutils/unstable/core/base.py index 890999e9..184f07f5 100644 --- a/cognite/extractorutils/unstable/core/base.py +++ b/cognite/extractorutils/unstable/core/base.py @@ -57,6 +57,7 @@ def my_task_function(self, task_context: TaskContext) -> None: from humps import pascalize from typing_extensions import Self, assert_never +from cognite.client import CogniteClient from cognite.extractorutils._inner_util import _resolve_log_level from cognite.extractorutils.threading import CancellationToken from cognite.extractorutils.unstable.configuration.models import ( @@ -157,6 +158,7 @@ class Extractor(Generic[ConfigType], CogniteLogger): RESTART_POLICY: RestartPolicy = WHEN_CONTINUOUS_TASKS_CRASHES SUPPORTS_DRY_RUN: bool = False + cognite_client: _NoOpCogniteClient | CogniteClient def __init__(self, config: FullConfig[ConfigType]) -> None: self.is_dry_run = config.is_dry_run diff --git a/tests/test_unstable/test_runtime.py b/tests/test_unstable/test_runtime.py index acf28230..8c92f5c5 100644 --- a/tests/test_unstable/test_runtime.py +++ b/tests/test_unstable/test_runtime.py @@ -9,7 +9,7 @@ import pytest from cognite.extractorutils.unstable.configuration.models import ConnectionConfig -from cognite.extractorutils.unstable.core.base import ConfigRevision +from cognite.extractorutils.unstable.core.base import ConfigRevision, FullConfig from cognite.extractorutils.unstable.core.runtime import Runtime from test_unstable.conftest import TestConfig, TestExtractor @@ -97,7 +97,7 @@ def cancel_after_delay() -> None: start_time = time.time() result: tuple[TestConfig, ConfigRevision] | None = runtime._safe_get_application_config( - args=Namespace(force_local_config=None), + args=Namespace(force_local_config=None, dry_run=False), connection_config=connection_config, ) duration = time.time() - start_time @@ -129,3 +129,44 @@ def test_changing_cwd() -> None: assert os.getcwd() == str(Path(__file__).parent) assert os.getcwd() != original_cwd + + +def test_unsupported_dry_run_crashes(connection_config: ConnectionConfig) -> None: + """ + Tests that an extractor with SUPPORTS_DRY_RUN = False raises a + NotImplementedError if started in dry-run mode. + """ + extractor_class = TestExtractor + + full_config = FullConfig( + connection_config=connection_config, + application_config=TestConfig(parameter_one=1, parameter_two="a"), + current_config_revision=1, + is_dry_run=True, + ) + + with pytest.raises(NotImplementedError, match="does not support dry-run mode"): + extractor_class(full_config) + + +def test_supported_dry_run_uses_noop_client(connection_config: ConnectionConfig) -> None: + """ + Tests that an extractor with SUPPORTS_DRY_RUN = True uses the + _NoOpCogniteClient when in dry-run mode. + """ + + class DryRunSupportedExtractor(TestExtractor): + SUPPORTS_DRY_RUN = True + + full_config = FullConfig( + connection_config=connection_config, + application_config=TestConfig(parameter_one=1, parameter_two="a"), + current_config_revision=1, + is_dry_run=True, + ) + + extractor = DryRunSupportedExtractor(full_config) + + from cognite.extractorutils.unstable.core.base import _NoOpCogniteClient + + assert isinstance(extractor.cognite_client, _NoOpCogniteClient) From 385fdb8553f5790be12fae37a2ab0d4cc20940fa Mon Sep 17 00:00:00 2001 From: devendra-lohar Date: Mon, 14 Jul 2025 12:47:30 +0530 Subject: [PATCH 3/7] make connection-config optional in dry-run mode --- cognite/extractorutils/unstable/core/base.py | 17 ++++-- .../extractorutils/unstable/core/runtime.py | 55 +++++++++++++------ 2 files changed, 50 insertions(+), 22 deletions(-) diff --git a/cognite/extractorutils/unstable/core/base.py b/cognite/extractorutils/unstable/core/base.py index 184f07f5..b7f40da4 100644 --- a/cognite/extractorutils/unstable/core/base.py +++ b/cognite/extractorutils/unstable/core/base.py @@ -97,12 +97,13 @@ def json(self) -> dict: return {"lastConfigRevision": None} return {} - def __init__(self, config: ConnectionConfig, client_name: str) -> None: + def __init__(self, config: ConnectionConfig | None, client_name: str) -> None: class MockSDKConfig: def __init__(self, project: str) -> None: self.project = project - self.config = MockSDKConfig(config.project) + project_name = config.project if config else "dry-run-no-config" + self.config = MockSDKConfig(project_name) self._logger = logging.getLogger(__name__) self._logger.info(f"CogniteClient is in no-op mode (dry-run). Client name: {client_name}") @@ -125,9 +126,9 @@ class FullConfig(Generic[_T]): def __init__( self, - connection_config: ConnectionConfig, application_config: _T, current_config_revision: ConfigRevision, + connection_config: ConnectionConfig | None = None, log_level_override: str | None = None, is_dry_run: bool = False, ) -> None: @@ -177,8 +178,10 @@ def __init__(self, config: FullConfig[ConfigType]) -> None: if self.is_dry_run: self.cognite_client = _NoOpCogniteClient(self.connection_config, f"{self.EXTERNAL_ID}-{self.VERSION}") - else: + elif self.connection_config: self.cognite_client = self.connection_config.get_cognite_client(f"{self.EXTERNAL_ID}-{self.VERSION}") + else: + raise ValueError("Connection config is missing and not in dry-run mode.") self._checkin_lock = RLock() self._runtime_messages: Queue[RuntimeMessage] | None = None @@ -261,6 +264,9 @@ def _set_runtime_message_queue(self, queue: Queue) -> None: self._runtime_messages = queue def _checkin(self) -> None: + if not self.connection_config: + return + with self._checkin_lock: task_updates = [t.model_dump() for t in self._task_updates] self._task_updates.clear() @@ -400,6 +406,9 @@ def run_task(task_context: TaskContext) -> None: ) def _report_extractor_info(self) -> None: + if not self.connection_config: + return + self.cognite_client.post( f"/api/v1/projects/{self.cognite_client.config.project}/integrations/extractorinfo", json={ diff --git a/cognite/extractorutils/unstable/core/runtime.py b/cognite/extractorutils/unstable/core/runtime.py index 4de862b6..68d8bb81 100644 --- a/cognite/extractorutils/unstable/core/runtime.py +++ b/cognite/extractorutils/unstable/core/runtime.py @@ -101,7 +101,7 @@ def _create_argparser(self) -> ArgumentParser: "--connection-config", nargs=1, type=Path, - required=True, + required=False, help="Connection parameters", ) argparser.add_argument( @@ -196,7 +196,7 @@ def _spawn_extractor( def _try_get_application_config( self, args: Namespace, - connection_config: ConnectionConfig, + connection_config: ConnectionConfig | None, ) -> tuple[ExtractorConfig, ConfigRevision]: current_config_revision: ConfigRevision @@ -216,11 +216,12 @@ def _try_get_application_config( else: self.logger.info("Loading application config from CDF") - application_config, current_config_revision = load_from_cdf( - self._cognite_client, - connection_config.integration.external_id, - self._extractor_class.CONFIG_TYPE, - ) + if connection_config: + application_config, current_config_revision = load_from_cdf( + self._cognite_client, + connection_config.integration.external_id, + self._extractor_class.CONFIG_TYPE, + ) return application_config, current_config_revision @@ -238,7 +239,7 @@ def _try_change_cwd(self, cwd: Path | None) -> None: def _safe_get_application_config( self, args: Namespace, - connection_config: ConnectionConfig, + connection_config: ConnectionConfig | None, ) -> tuple[ExtractorConfig, ConfigRevision] | None: if args.dry_run and not args.force_local_config: self.logger.warning( @@ -272,23 +273,28 @@ def _safe_get_application_config( task=None, ) - self._cognite_client.post( - f"/api/v1/projects/{self._cognite_client.config.project}/odin/checkin", - json={ - "externalId": connection_config.integration.external_id, - "errors": [error.model_dump()], - }, - headers={"cdf-version": "alpha"}, - ) + if connection_config: + self._cognite_client.post( + f"/api/v1/projects/{self._cognite_client.config.project}/odin/checkin", + json={ + "externalId": connection_config.integration.external_id, + "errors": [error.model_dump()], + }, + headers={"cdf-version": "alpha"}, + ) self._cancellation_token.wait(randint(1, self.RETRY_CONFIG_INTERVAL)) return None - def _verify_connection_config(self, connection_config: ConnectionConfig) -> bool: + def _verify_connection_config(self, connection_config: ConnectionConfig | None) -> bool: + if connection_config is None: + return False + self._cognite_client = connection_config.get_cognite_client( f"{self._extractor_class.EXTERNAL_ID}-{self._extractor_class.VERSION}" ) + try: self._cognite_client.post( f"/api/v1/projects/{self._cognite_client.config.project}/odin/checkin", @@ -348,7 +354,20 @@ def run(self) -> None: try: self._try_change_cwd(args.cwd[0]) - connection_config = load_file(args.connection_config[0], ConnectionConfig) + + if args.dry_run: + self.logger.info("Running in dry-run mode. No data will be written to CDF.") + + connection_config = ( + load_file(args.connection_config[0], ConnectionConfig) if args.connection_config else None + ) + else: + if not args.connection_config: + self.logger.critical("Connection config file is required when not in dry-run mode.") + sys.exit(1) + + connection_config = load_file(args.connection_config[0], ConnectionConfig) + except InvalidConfigError as e: self.logger.error(str(e)) self.logger.critical("Could not load connection config") From 196ad844f7e8c7d1913902f941e6f9dd28210616 Mon Sep 17 00:00:00 2001 From: devendra-lohar Date: Mon, 14 Jul 2025 12:56:03 +0530 Subject: [PATCH 4/7] fix connection config typing --- cognite/extractorutils/unstable/core/base.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cognite/extractorutils/unstable/core/base.py b/cognite/extractorutils/unstable/core/base.py index b7f40da4..2818dcdb 100644 --- a/cognite/extractorutils/unstable/core/base.py +++ b/cognite/extractorutils/unstable/core/base.py @@ -126,9 +126,9 @@ class FullConfig(Generic[_T]): def __init__( self, + connection_config: ConnectionConfig | None, application_config: _T, current_config_revision: ConfigRevision, - connection_config: ConnectionConfig | None = None, log_level_override: str | None = None, is_dry_run: bool = False, ) -> None: From 4edb09cc06c61d15f051ef2eb273edfad68d5d53 Mon Sep 17 00:00:00 2001 From: devendra-lohar Date: Wed, 16 Jul 2025 10:51:16 +0530 Subject: [PATCH 5/7] minor fix --- cognite/extractorutils/unstable/core/base.py | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/cognite/extractorutils/unstable/core/base.py b/cognite/extractorutils/unstable/core/base.py index 2818dcdb..5902b4c4 100644 --- a/cognite/extractorutils/unstable/core/base.py +++ b/cognite/extractorutils/unstable/core/base.py @@ -93,9 +93,7 @@ def __init__(self, url: str) -> None: self._url = url def json(self) -> dict: - if "integrations/checkin" in self._url: - return {"lastConfigRevision": None} - return {} + return {"lastConfigRevision": None} if "integrations/checkin" in self._url else {} def __init__(self, config: ConnectionConfig | None, client_name: str) -> None: class MockSDKConfig: From d7dccbb71fe6d221d3b107a23177598d49dc3bfa Mon Sep 17 00:00:00 2001 From: devendra-lohar Date: Wed, 16 Jul 2025 15:31:50 +0530 Subject: [PATCH 6/7] fix unbounded error block --- cognite/extractorutils/unstable/core/runtime.py | 17 ++++++++++------- 1 file changed, 10 insertions(+), 7 deletions(-) diff --git a/cognite/extractorutils/unstable/core/runtime.py b/cognite/extractorutils/unstable/core/runtime.py index 68d8bb81..c66b6d2a 100644 --- a/cognite/extractorutils/unstable/core/runtime.py +++ b/cognite/extractorutils/unstable/core/runtime.py @@ -213,15 +213,18 @@ def _try_get_application_config( self.logger.critical(str(e)) raise InvalidConfigError(str(e)) from e - else: + elif connection_config: self.logger.info("Loading application config from CDF") - if connection_config: - application_config, current_config_revision = load_from_cdf( - self._cognite_client, - connection_config.integration.external_id, - self._extractor_class.CONFIG_TYPE, - ) + application_config, current_config_revision = load_from_cdf( + self._cognite_client, + connection_config.integration.external_id, + self._extractor_class.CONFIG_TYPE, + ) + + else: + self.logger.critical("No connection config provided and no local config file specified.") + raise InvalidConfigError("No connection config provided and no local config file specified.") return application_config, current_config_revision From 1471200b0cffa9e7a2337998a1a2dd52d1aafc32 Mon Sep 17 00:00:00 2001 From: devendra-lohar Date: Thu, 17 Jul 2025 11:01:15 +0530 Subject: [PATCH 7/7] [modify] change mock response and log preperly. --- cognite/extractorutils/unstable/core/base.py | 44 +++++++++++++++----- 1 file changed, 33 insertions(+), 11 deletions(-) diff --git a/cognite/extractorutils/unstable/core/base.py b/cognite/extractorutils/unstable/core/base.py index 5902b4c4..0377a89f 100644 --- a/cognite/extractorutils/unstable/core/base.py +++ b/cognite/extractorutils/unstable/core/base.py @@ -89,29 +89,49 @@ class _NoOpCogniteClient: """A mock CogniteClient that performs no actions, for use in dry-run mode.""" class _MockResponse: - def __init__(self, url: str) -> None: + def __init__(self, url: str, current_config_revision: int | str, external_id: str) -> None: self._url = url + self.current_config_revision = current_config_revision + self.external_id = external_id def json(self) -> dict: - return {"lastConfigRevision": None} if "integrations/checkin" in self._url else {} + if "integrations/checkin" in self._url or "/integrations/extractorinfo" in self._url: + if self.current_config_revision == "local": + return {"externalId": self.external_id} + elif isinstance(self.current_config_revision, int): + return {"externalId": self.external_id, "lastConfigRevision": self.current_config_revision} + return {} - def __init__(self, config: ConnectionConfig | None, client_name: str) -> None: + def __init__( + self, + config: ConnectionConfig | None, + current_config_revision: int | str, + client_name: str, + logger: logging.Logger, + ) -> None: class MockSDKConfig: def __init__(self, project: str) -> None: self.project = project project_name = config.project if config else "dry-run-no-config" + self.external_id = config.integration.external_id if config and config.integration else "dry-run-no-integration" self.config = MockSDKConfig(project_name) - self._logger = logging.getLogger(__name__) + self.current_config_revision = current_config_revision + # self._logger = logging.getLogger(__name__) + self._logger = logger self._logger.info(f"CogniteClient is in no-op mode (dry-run). Client name: {client_name}") def post(self, url: str, json: dict, **kwargs: dict[str, Any]) -> _MockResponse: - self._logger.info(f"[DRY-RUN] SKIPPED POST to {url} with payload: {json}") - return self._MockResponse(url) + response = self._MockResponse(url, self.current_config_revision, self.external_id) + self._logger.info(f"[DRY-RUN] SKIPPED POST to {url} with payload: {json}.") + self._logger.info(f"[DRY-RUN] Response: {response.json()}") + return response def get(self, url: str, **kwargs: dict[str, Any]) -> _MockResponse: - self._logger.info(f"[DRY-RUN] SKIPPED GET from {url}") - return self._MockResponse(url) + response = self._MockResponse(url, self.current_config_revision, self.external_id) + self._logger.info(f"[DRY-RUN] SKIPPED GET from {url}.") + self._logger.info(f"[DRY-RUN] Response: {response.json()}") + return response class FullConfig(Generic[_T]): @@ -124,15 +144,15 @@ class FullConfig(Generic[_T]): def __init__( self, - connection_config: ConnectionConfig | None, application_config: _T, current_config_revision: ConfigRevision, + connection_config: ConnectionConfig | None, log_level_override: str | None = None, is_dry_run: bool = False, ) -> None: - self.connection_config = connection_config self.application_config = application_config self.current_config_revision = current_config_revision + self.connection_config = connection_config self.log_level_override = log_level_override self.is_dry_run = is_dry_run @@ -175,7 +195,9 @@ def __init__(self, config: FullConfig[ConfigType]) -> None: self.log_level_override = config.log_level_override if self.is_dry_run: - self.cognite_client = _NoOpCogniteClient(self.connection_config, f"{self.EXTERNAL_ID}-{self.VERSION}") + self.cognite_client = _NoOpCogniteClient( + self.connection_config, self.current_config_revision, f"{self.EXTERNAL_ID}-{self.VERSION}", self._logger + ) elif self.connection_config: self.cognite_client = self.connection_config.get_cognite_client(f"{self.EXTERNAL_ID}-{self.VERSION}") else: