2 changes: 2 additions & 0 deletions docs/hooks.md
Original file line number Diff line number Diff line change
@@ -600,6 +600,8 @@ Code reference: [`scheduler_register`][dp3.common.callback_registrar.CallbackReg

Most user-facing hooks return `list[DataPointTask]`.
Whenever that happens, the returned tasks are fed back into the main ingestion system.
Each returned task must do useful work: it must contain at least one datapoint, carry non-empty TTL tokens, or be a delete task.
Empty `DataPointTask` objects are rejected during validation, since they would otherwise be queued and processed without changing any DP3 state.

This creates a feedback loop:

153 changes: 153 additions & 0 deletions docs/howto/test-module.md
@@ -0,0 +1,153 @@
# Test a secondary module

DP3 includes helpers for writing focused unit tests for secondary modules without running a full
worker, database, message broker, or snapshot scheduler.

Use [`DP3ModuleTestCase`][dp3.testing.DP3ModuleTestCase] when you want to instantiate a
module with the application's real `db_entities` model and then call registered hooks directly.
The test registrar captures callbacks during module initialization and exposes runners for the
common hook families.

The config directory is read from the `DP3_CONFIG_DIR` environment variable unless a test class
sets `config_dir` explicitly. Module configuration is read from `modules.<module_name>` in that
config by default, where `<module_name>` is inferred from the module class' Python module name.

```bash
DP3_CONFIG_DIR=config python -m unittest discover -s tests -v
```

## Basic pattern

```python
from unittest.mock import patch

from dp3.testing import DP3ModuleTestCase
from modules.ip_exposure_profile import IPExposureProfile


class TestIPExposureProfile(DP3ModuleTestCase):
module_class = IPExposureProfile

def test_open_port_creates_service_and_link(self):
dp = self.make_observation_datapoint("ip", "192.0.2.1", "open_ports", 443)

tasks = self.run_on_new_attr("ip", "open_ports", "192.0.2.1", dp)

self.assertDatapoint(tasks, etype="service", eid="192.0.2.1:443", attr="guessed_type")
self.assertDatapoint(tasks, etype="ip", eid="192.0.2.1", attr="services")

def test_updater_uses_mocked_external_lookup(self):
with patch.object(self.module, "_fetch_service_intel", return_value={"risk": "high"}):
tasks = self.run_periodic_update(
"service",
"192.0.2.1:443",
{"guessed_type": "https"},
hook_id="service_intel",
)

self.assertDatapoint(tasks, attr="external_risk", v="high")
```

## What the helper provides

`DP3ModuleTestCase`:

- loads `db_entities` from `DP3_CONFIG_DIR` or `config_dir` and builds a real `ModelSpec`,
- creates a minimal `PlatformConfig`,
- instantiates `module_class` with a test registrar,
- creates validated `DataPointTask` and plain, observation, or timeseries datapoint objects using
the loaded model,
- calls registered hooks directly,
- provides partial-match assertions for emitted tasks, datapoints, and mutated records.

The helper is intended for module-level unit tests. It does not run a database, task queues,
worker processes, recursive task ingestion, or full linked snapshot loading.

## Datapoint helpers

Use the datapoint helpers to build values accepted by the loaded model specification:

```python
plain = self.make_plain_datapoint("ip", "192.0.2.1", "hostname", "host.example")
observation = self.make_observation_datapoint("ip", "192.0.2.1", "open_ports", 443)
timeseries = self.make_timeseries_datapoint(
"ip",
"192.0.2.1",
"traffic",
{"packets": [1, 2, 3], "bytes": [100, 200, 300]},
)
```

For regular timeseries attributes, `make_timeseries_datapoint()` infers `t2` from `t1`, the
configured `time_step`, and the number of samples when `t2` is not supplied.
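As an arithmetic sketch of the inference (assuming a hypothetical attribute configured with a 5-minute `time_step`): the series covers one step per sample starting at `t1`.

```python
from datetime import datetime, timedelta

t1 = datetime(2024, 1, 1, 12, 0, 0)
time_step = timedelta(minutes=5)  # assumed attribute configuration
n_samples = 3                     # e.g. {"packets": [1, 2, 3]}

# Inferred end of the covered interval: t1 plus one time_step per sample.
t2 = t1 + n_samples * time_step
assert t2 == datetime(2024, 1, 1, 12, 15, 0)
```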

## Hook runners

Common runners are available on the test case:

- `run_allow_entity_creation(entity, eid, task=None)`
- `run_on_entity_creation(entity, eid, task=None)`
- `run_on_new_attr(entity, attr, eid, dp)`
- `run_correlation_hooks(entity_type, record, master_record=None)`
- `run_periodic_update(entity_type, eid, master_record, hook_id=None)`
- `run_periodic_eid_update(entity_type, eid, hook_id=None)`
- `run_scheduler_job(index_or_func)`

Correlation tests pass the snapshot `record` explicitly. The record must contain `eid`.
Scheduler jobs can be selected by registration index, callable, or callable name.

## Assertions

Assertions use partial matching: only fields supplied in the expected values are checked.

```python
self.assertDatapoint(tasks, etype="ip", attr="hostname", v="example.test")
self.assertTaskEmitted(tasks, etype="ip", eid="192.0.2.1")
self.assertNoTasks(tasks)
self.assertNoDatapoints(tasks)
self.assertRecordContains(record, exposure_score=10)
self.assertRecordAttr(record, "exposure_score", 10)
self.assertRecordUnchanged(before, after)
```
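Partial matching can be pictured as a subset check over the supplied fields. A simplified sketch of the idea, not the helper's actual implementation:

```python
def matches_partially(obj: dict, expected: dict) -> bool:
    """True if every expected field is present in obj with the same value."""
    return all(obj.get(key) == value for key, value in expected.items())


dp = {"etype": "ip", "eid": "192.0.2.1", "attr": "hostname", "v": "example.test"}
assert matches_partially(dp, {"attr": "hostname", "v": "example.test"})  # supplied fields match
assert not matches_partially(dp, {"attr": "hostname", "v": "other.test"})  # one field differs
```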

Snake-case aliases are also available: `assert_datapoint`, `assert_task_emitted`,
`assert_no_tasks`, `assert_no_datapoints`, `assert_record_contains`, `assert_record_attr`, and
`assert_record_unchanged`.

## Registration assertions

Use registration assertions when a test needs to verify callback coverage or dynamic hook
registration.

```python
self.assert_registered("on_new_attr", entity="ip", attr="hostname")
self.assert_registered_once("correlation", entity_type="service")
self.assert_registered_attrs("service", expected_service_attrs)
self.assert_scheduler_registered(func="reload_ip_groups", minute="*/10")
```

`assert_scheduler_registered()` accepts scheduler fields such as `minute`, `hour`, and `second`,
along with `func` for matching the registered callable by object or function name.

## Mocking external dependencies

Patch external constructors or functions before module instantiation when the dependency is created
in `__init__` or `load_config`:

```python
class TestDNSModule(DP3ModuleTestCase):
module_class = DNSModule

def setUp(self):
self.resolver_patcher = patch("modules.dns_module.Resolver", FakeResolver)
self.resolver_patcher.start()
self.addCleanup(self.resolver_patcher.stop)
super().setUp()
```

If patching is not convenient, use a test subclass as `module_class` and override the module's
initialization or dependency construction while keeping the hook methods under test unchanged.
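In outline, the subclass replaces only the dependency construction while the hook methods stay inherited and untouched. All names below are hypothetical stand-ins, not DP3 API:

```python
class FakeResolver:
    """Stand-in for the real resolver the module would construct."""

    def resolve(self, name):
        return ["192.0.2.1"]


class DNSModule:
    """Hypothetical module with dependency construction factored into a method."""

    def __init__(self):
        self.resolver = self._make_resolver()

    def _make_resolver(self):
        raise RuntimeError("would contact a real DNS server")

    def on_new_attr(self, name):
        # Hook method under test; unchanged in the test subclass.
        return self.resolver.resolve(name)


class TestableDNSModule(DNSModule):
    def _make_resolver(self):
        return FakeResolver()


module = TestableDNSModule()
assert module.on_new_attr("host.example") == ["192.0.2.1"]
```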

Deprecated registrar methods (`register_entity_hook` and `register_attr_hook`) are supported by the
test registrar and emit `DeprecationWarning`. Prefer the modern registration methods in new module
code and tests.
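If a test needs to assert the warning itself, the standard `unittest` pattern applies. A sketch with a stand-in deprecated function, not the registrar's real signature:

```python
import unittest
import warnings


def register_entity_hook(*args, **kwargs):
    """Stand-in for a deprecated registrar method."""
    warnings.warn("use the modern registration methods", DeprecationWarning, stacklevel=2)


class TestDeprecatedRegistration(unittest.TestCase):
    def test_emits_deprecation_warning(self):
        # assertWarns fails the test if no DeprecationWarning is raised.
        with self.assertWarns(DeprecationWarning):
            register_entity_hook()
```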
10 changes: 10 additions & 0 deletions docs/modules.md
@@ -172,6 +172,16 @@ and configuration, see the [updater configuration](configuration/updater.md) pag
- [`scheduler_register(...)`](hooks.md#scheduler_register) — CRON-style module-level
scheduled callback for maintenance, polling, housekeeping, or shared-state reloads.

## Testing modules

Secondary modules can be unit-tested without running a full DP3 worker by using
[`DP3ModuleTestCase`][dp3.testing.DP3ModuleTestCase]. The helper loads an application's
real `db_entities` model from `DP3_CONFIG_DIR` or an explicit test fixture path, instantiates a
module with a test callback registrar, and lets tests call registered hooks directly with validated
`DataPointTask` and datapoint objects.

See [Test a secondary module](howto/test-module.md) for examples and supported hook runners.

## Running module code in a separate thread

The module is free to run its own code in separate threads or processes.
8 changes: 5 additions & 3 deletions dp3/common/callback_registrar.py
@@ -11,7 +11,7 @@
from dp3.common.datatype import AnyEidT
from dp3.common.scheduler import Scheduler
from dp3.common.state import SharedFlag
from dp3.common.task import DataPointTask
from dp3.common.task import DataPointTask, task_context
from dp3.common.types import ParsedTimedelta
from dp3.core.updater import Updater
from dp3.snapshots.snapshooter import SnapShooter
@@ -57,7 +57,8 @@ def on_entity_creation_in_snapshots(
if not run_flag.isset():
return []
eid = record["eid"]
mock_task = DataPointTask(etype=etype, eid=eid, data_points=[])
with task_context(model_spec, allow_empty_data_point_task=True):
mock_task = DataPointTask(etype=etype, eid=eid, data_points=[])
tasks = original_hook(eid, mock_task)
write_datapoints_into_record(model_spec, tasks, record)
return tasks
@@ -74,7 +75,8 @@ def on_attr_change_in_snapshots(
if not run_flag.isset():
return []
eid = record["eid"]
mock_task = DataPointTask(etype=etype, eid=eid, data_points=[])
with task_context(model_spec, allow_empty_data_point_task=True):
mock_task = DataPointTask(etype=etype, eid=eid, data_points=[])
tasks = original_hook(eid, mock_task)
if isinstance(tasks, list):
write_datapoints_into_record(model_spec, tasks, record)
25 changes: 23 additions & 2 deletions dp3/common/task.py
Expand Up @@ -17,7 +17,9 @@
Tag,
TypeAdapter,
ValidationError,
ValidationInfo,
field_validator,
model_validator,
)
from pydantic_core.core_schema import FieldValidationInfo

@@ -40,9 +42,16 @@ def HASH(key: str) -> int:


@contextmanager
def task_context(model_spec: ModelSpec) -> Iterator[None]:
def task_context(
model_spec: ModelSpec, *, allow_empty_data_point_task: bool = False
) -> Iterator[None]:
"""Context manager for setting the `model_spec` context variable."""
token = _init_context_var.set({"model_spec": model_spec})
token = _init_context_var.set(
{
"model_spec": model_spec,
"allow_empty_data_point_task": allow_empty_data_point_task,
}
)
try:
yield
finally:
@@ -182,6 +191,18 @@ def validate_eid(cls, v, info: FieldValidationInfo):
else:
raise AssertionError("Missing `model_spec` in context")

@model_validator(mode="after")
def validate_not_empty(self, info: ValidationInfo):
context = info.context
if context and context.get("allow_empty_data_point_task"):
return self
if not self.data_points and not self.ttl_tokens and not self.delete:
raise ValueError(
"DataPointTask must contain at least one datapoint, non-empty ttl_tokens, "
"or be a delete task."
)
return self


def parse_data_point_task(task: str, model_spec: ModelSpec) -> DataPointTask:
with task_context(model_spec):
6 changes: 3 additions & 3 deletions dp3/task_processing/task_executor.py
@@ -193,10 +193,10 @@ def refresh_on_entity_creation(
for master_record in self.db.get_worker_master_records(
worker_id, worker_cnt, etype, projection=projection
):
with task_context(self.model_spec):
with task_context(self.model_spec, allow_empty_data_point_task=True):
task = DataPointTask(etype=etype, eid=master_record["_id"])
self.log.debug(f"Refreshing {etype}/{task.eid}")
new_tasks += self._task_entity_hooks[task.etype].run_on_creation(task.eid, task)
self.log.debug(f"Refreshing {etype}/{task.eid}")
new_tasks += self._task_entity_hooks[task.etype].run_on_creation(task.eid, task)

return new_tasks

13 changes: 13 additions & 0 deletions dp3/testing/__init__.py
@@ -0,0 +1,13 @@
"""Testing helpers for DP3 applications."""

from dp3.testing.case import DP3ModuleTestCase
from dp3.testing.config import CONFIG_DIR_ENV, resolve_config_dir
from dp3.testing.registrar import HookRegistration, TestCallbackRegistrar

__all__ = [
"CONFIG_DIR_ENV",
"DP3ModuleTestCase",
"HookRegistration",
"TestCallbackRegistrar",
"resolve_config_dir",
]