2 changes: 1 addition & 1 deletion .github/workflows/deploy.yml
@@ -13,7 +13,7 @@ jobs:
- uses: actions/checkout@v4
- uses: actions/setup-python@v5
with:
python-version: 3.9
python-version: 3.11
- uses: actions/cache@v4
with:
key: ${{ github.ref }}
4 changes: 2 additions & 2 deletions README.md
@@ -34,7 +34,7 @@ See the [docs](https://cesnet.github.io/dp3/howto/get-started/) for more details

### Installing for application development

Pre-requisites: Python 3.9 or higher, `pip` (with `virtualenv` installed), `git`, `Docker` and `Docker Compose`.
Pre-requisites: Python 3.11 or higher, `pip` (with `virtualenv` installed), `git`, `Docker` and `Docker Compose`.

Create a virtualenv and install the DP³ platform using:

@@ -117,7 +117,7 @@ You are now ready to start developing your application!

## Installing for platform development

Pre-requisites: Python 3.9 or higher, `pip` (with `virtualenv` installed), `git`, `Docker` and `Docker Compose`.
Pre-requisites: Python 3.11 or higher, `pip` (with `virtualenv` installed), `git`, `Docker` and `Docker Compose`.

Pull the repository and install using:

2 changes: 1 addition & 1 deletion docker/python/Dockerfile
@@ -1,7 +1,7 @@
# syntax=docker/dockerfile:1

# Base interpreter with installed requirements
FROM python:3.9-slim AS base
FROM python:3.11-slim AS base
RUN apt-get update; apt-get install -y \
gcc \
git
2 changes: 2 additions & 0 deletions docs/hooks.md
@@ -600,6 +600,8 @@ Code reference: [`scheduler_register`][dp3.common.callback_registrar.CallbackReg

Most user-facing hooks return `list[DataPointTask]`.
Whenever that happens, the returned tasks are fed back into the main ingestion system.
Each returned task must do useful work: it must contain at least one datapoint, carry non-empty TTL tokens, or be a delete task.
Empty `DataPointTask` objects are rejected during validation because they would be queued and processed without changing DP3 state.
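The acceptance rule can be sketched in a few lines (illustrative only: field names here are hypothetical stand-ins, not the real validated `DataPointTask` model):

```python
from dataclasses import dataclass, field

@dataclass
class SketchTask:
    # Hypothetical stand-in for DataPointTask's relevant fields.
    data_points: list = field(default_factory=list)
    ttl_tokens: dict = field(default_factory=dict)
    delete: bool = False

def does_useful_work(task: SketchTask) -> bool:
    # A task passes validation if it carries datapoints, TTL tokens,
    # or requests entity deletion; an empty task is rejected.
    return bool(task.data_points) or bool(task.ttl_tokens) or task.delete

assert not does_useful_work(SketchTask())
assert does_useful_work(SketchTask(delete=True))
assert does_useful_work(SketchTask(ttl_tokens={"collector": "8h"}))
```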

This creates a feedback loop:

2 changes: 1 addition & 1 deletion docs/howto/develop-dp3.md
@@ -16,7 +16,7 @@ You will end up with:

For platform development, you need:

- Python 3.9 or higher
- Python 3.11 or higher
- `pip`
- `git`
- Docker
2 changes: 1 addition & 1 deletion docs/howto/get-started.md
@@ -15,7 +15,7 @@ You will end up with:

For local application development, you need:

- Python 3.9 or higher
- Python 3.11 or higher
- `pip`
- `git`
- Docker
153 changes: 153 additions & 0 deletions docs/howto/test-module.md
@@ -0,0 +1,153 @@
# Test a secondary module

DP3 includes helpers for writing focused unit tests for secondary modules without running a full
worker, database, message broker, or snapshot scheduler.

Use [`DP3ModuleTestCase`][dp3.testing.DP3ModuleTestCase] when you want to instantiate a
module with the application's real `db_entities` model and then call registered hooks directly.
The test registrar captures callbacks during module initialization and exposes runners for the
common hook families.

The config directory is read from the `DP3_CONFIG_DIR` environment variable unless a test class
sets `config_dir` explicitly. Module configuration is read from `modules.<module_name>` in that
config by default, where `<module_name>` is inferred from the Python module name of the module class.

```bash
DP3_CONFIG_DIR=config python -m unittest discover -s tests -v
```

## Basic pattern

```python
from unittest.mock import patch

from dp3.testing import DP3ModuleTestCase
from modules.ip_exposure_profile import IPExposureProfile


class TestIPExposureProfile(DP3ModuleTestCase):
    module_class = IPExposureProfile

    def test_open_port_creates_service_and_link(self):
        dp = self.make_observation_datapoint("ip", "192.0.2.1", "open_ports", 443)

        tasks = self.run_on_new_attr("ip", "open_ports", "192.0.2.1", dp)

        self.assertDatapoint(tasks, etype="service", eid="192.0.2.1:443", attr="guessed_type")
        self.assertDatapoint(tasks, etype="ip", eid="192.0.2.1", attr="services")

    def test_updater_uses_mocked_external_lookup(self):
        with patch.object(self.module, "_fetch_service_intel", return_value={"risk": "high"}):
            tasks = self.run_periodic_update(
                "service",
                "192.0.2.1:443",
                {"guessed_type": "https"},
                hook_id="service_intel",
            )

        self.assertDatapoint(tasks, attr="external_risk", v="high")
```

## What the helper provides

`DP3ModuleTestCase`:

- loads `db_entities` from `DP3_CONFIG_DIR` or `config_dir` and builds a real `ModelSpec`,
- creates a minimal `PlatformConfig`,
- instantiates `module_class` with a test registrar,
- creates validated `DataPointTask` and plain, observation, or timeseries datapoint objects using
the loaded model,
- calls registered hooks directly,
- provides partial-match assertions for emitted tasks, datapoints, and mutated records.

The helper is intended for module-level unit tests. It does not run a database, task queues,
worker processes, recursive task ingestion, or full linked snapshot loading.

## Datapoint helpers

Use the datapoint helpers to build values accepted by the loaded model specification:

```python
plain = self.make_plain_datapoint("ip", "192.0.2.1", "hostname", "host.example")
observation = self.make_observation_datapoint("ip", "192.0.2.1", "open_ports", 443)
timeseries = self.make_timeseries_datapoint(
    "ip",
    "192.0.2.1",
    "traffic",
    {"packets": [1, 2, 3], "bytes": [100, 200, 300]},
)
```

For regular timeseries attributes, `make_timeseries_datapoint()` infers `t2` from `t1`, the
configured `time_step`, and the number of samples when `t2` is not supplied.
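The inference described above can be illustrated with plain `datetime` arithmetic (a sketch of the behaviour, not the helper's actual code):

```python
from datetime import datetime, timedelta

def infer_t2(t1: datetime, time_step: timedelta, n_samples: int) -> datetime:
    # t2 covers n_samples regular steps starting at t1.
    return t1 + n_samples * time_step

# With time_step of 5 minutes and 3 samples, t2 lands 15 minutes after t1.
t1 = datetime(2024, 1, 1, 12, 0)
assert infer_t2(t1, timedelta(minutes=5), 3) == datetime(2024, 1, 1, 12, 15)
```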

## Hook runners

Common runners are available on the test case:

- `run_allow_entity_creation(entity, eid, task=None)`
- `run_on_entity_creation(entity, eid, task=None)`
- `run_on_new_attr(entity, attr, eid, dp)`
- `run_correlation_hooks(entity_type, record, master_record=None)`
- `run_periodic_update(entity_type, eid, master_record, hook_id=None)`
- `run_periodic_eid_update(entity_type, eid, hook_id=None)`
- `run_scheduler_job(index_or_func)`

Correlation tests pass the snapshot `record` explicitly. The record must contain `eid`.
Scheduler jobs can be selected by registration index, callable, or callable name.
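The selection logic can be sketched as a lookup against the jobs in registration order (illustrative only, with hypothetical job names):

```python
def resolve_job(jobs, selector):
    # Accepts a registration index, the registered callable itself,
    # or the callable's name.
    if isinstance(selector, int):
        return jobs[selector]
    if callable(selector):
        return next(j for j in jobs if j is selector)
    return next(j for j in jobs if j.__name__ == selector)

def reload_groups(): pass
def prune_cache(): pass

jobs = [reload_groups, prune_cache]
assert resolve_job(jobs, 1) is prune_cache
assert resolve_job(jobs, reload_groups) is reload_groups
assert resolve_job(jobs, "prune_cache") is prune_cache
```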

## Assertions

Assertions use partial matching: only fields supplied in the expected values are checked.

```python
self.assertDatapoint(tasks, etype="ip", attr="hostname", v="example.test")
self.assertTaskEmitted(tasks, etype="ip", eid="192.0.2.1")
self.assertNoTasks(tasks)
self.assertNoDatapoints(tasks)
self.assertRecordContains(record, exposure_score=10)
self.assertRecordAttr(record, "exposure_score", 10)
self.assertRecordUnchanged(before, after)
```
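The matching semantics amount to a subset comparison: expected fields must match, unexpected fields are ignored (a sketch of the idea, not the assertion helpers' actual code):

```python
def partial_match(actual: dict, expected: dict) -> bool:
    # Only keys present in `expected` are compared; everything else
    # on the actual object is ignored.
    return all(actual.get(k) == v for k, v in expected.items())

dp = {"etype": "ip", "eid": "192.0.2.1", "attr": "hostname", "v": "example.test", "c": 1.0}
assert partial_match(dp, {"etype": "ip", "attr": "hostname"})       # subset matches
assert not partial_match(dp, {"attr": "hostname", "v": "other"})    # one mismatch fails
```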

Snake-case aliases are also available: `assert_datapoint`, `assert_task_emitted`,
`assert_no_tasks`, `assert_no_datapoints`, `assert_record_contains`, `assert_record_attr`, and
`assert_record_unchanged`.

## Registration assertions

Use registration assertions when a test needs to verify callback coverage or dynamic hook
registration.

```python
self.assert_registered("on_new_attr", entity="ip", attr="hostname")
self.assert_registered_once("correlation", entity_type="service")
self.assert_registered_attrs("service", expected_service_attrs)
self.assert_scheduler_registered(func="reload_ip_groups", minute="*/10")
```

`assert_scheduler_registered()` accepts scheduler fields such as `minute`, `hour`, and `second`,
along with `func` for matching the registered callable by object or function name.

## Mocking external dependencies

Patch external constructors or functions before module instantiation when the dependency is created
in `__init__` or `load_config`:

```python
class TestDNSModule(DP3ModuleTestCase):
    module_class = DNSModule

    def setUp(self):
        self.resolver_patcher = patch("modules.dns_module.Resolver", FakeResolver)
        self.resolver_patcher.start()
        self.addCleanup(self.resolver_patcher.stop)
        super().setUp()
```

If patching is not convenient, use a test subclass as `module_class` and override the module's
initialization or dependency construction while keeping the hook methods under test unchanged.

Deprecated registrar methods (`register_entity_hook` and `register_attr_hook`) are supported by the
test registrar and emit `DeprecationWarning`. Prefer the modern registration methods in new module
code and tests.
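A test can confirm the warning fires using the standard `warnings` machinery (sketch with a hypothetical stand-in for the deprecated method):

```python
import warnings

def register_entity_hook(*args, **kwargs):
    # Stand-in for the deprecated registrar method; the test registrar
    # emits a DeprecationWarning the same way.
    warnings.warn("use the modern registration methods", DeprecationWarning, stacklevel=2)

with warnings.catch_warnings(record=True) as caught:
    warnings.simplefilter("always")
    register_entity_hook()

assert any(issubclass(w.category, DeprecationWarning) for w in caught)
```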
10 changes: 10 additions & 0 deletions docs/modules.md
@@ -172,6 +172,16 @@ and configuration, see the [updater configuration](configuration/updater.md) pag
- [`scheduler_register(...)`](hooks.md#scheduler_register) — CRON-style module-level
scheduled callback for maintenance, polling, housekeeping, or shared-state reloads.

## Testing modules

Secondary modules can be unit-tested without running a full DP3 worker by using
[`DP3ModuleTestCase`][dp3.testing.DP3ModuleTestCase]. The helper loads an application's
real `db_entities` model from `DP3_CONFIG_DIR` or an explicit test fixture path, instantiates a
module with a test callback registrar, and lets tests call registered hooks directly with validated
`DataPointTask` and datapoint objects.

See [Test a secondary module](howto/test-module.md) for examples and supported hook runners.

## Running module code in a separate thread

The module is free to run its own code in separate threads or processes.
2 changes: 1 addition & 1 deletion dp3/api/internal/config.py
@@ -44,7 +44,7 @@ def validate(cls, v):

try:
# Validate and parse environmental variables
conf_env = ConfigEnv.parse_obj(os.environ)
conf_env = ConfigEnv.model_validate(os.environ)
except ValidationError as e:
config_error = any("CONF_DIR" in x["loc"] and len(x["loc"]) > 1 for x in e.errors())
env_error = any(len(x["loc"]) == 1 for x in e.errors())
8 changes: 4 additions & 4 deletions dp3/api/internal/entity_response_models.py
@@ -1,5 +1,5 @@
from datetime import datetime
from typing import Annotated, Any, Optional, Union
from typing import Annotated, Any

from pydantic import BaseModel, Field, NonNegativeInt, PlainSerializer

@@ -25,11 +25,11 @@ class EntityState(BaseModel):
JsonVal = Annotated[Any, PlainSerializer(to_json_friendly, when_used="json")]

LinkVal = dict[str, JsonVal]
PlainVal = Union[LinkVal, JsonVal]
PlainVal = LinkVal | JsonVal
MultiVal = list[PlainVal]
HistoryVal = list[dict[str, PlainVal]]

Dp3Val = Union[HistoryVal, MultiVal, PlainVal]
Dp3Val = HistoryVal | MultiVal | PlainVal

EntityEidMasterRecord = dict[str, Dp3Val]

@@ -45,7 +45,7 @@ class EntityEidList(BaseModel):
Data does not include history of observations attributes and timeseries.
"""

time_created: Optional[datetime] = None
time_created: datetime | None = None
count: int
data: EntityEidSnapshots

20 changes: 11 additions & 9 deletions dp3/api/internal/models.py
@@ -1,4 +1,6 @@
from typing import Annotated, Any, Literal, Optional, Union
from functools import reduce
from operator import or_
from typing import Annotated, Any, Literal

from pydantic import BaseModel, Field, TypeAdapter, create_model, model_validator

@@ -26,10 +28,10 @@ class DataPoint(BaseModel):
id: Any
attr: str
v: Any
t1: Optional[AwareDatetime] = None
t2: Optional[T2Datetime] = Field(None, validate_default=True)
t1: AwareDatetime | None = None
t2: T2Datetime | None = Field(None, validate_default=True)
c: Annotated[float, Field(ge=0.0, le=1.0)] = 1.0
src: Optional[str] = None
src: str | None = None

@model_validator(mode="after")
def validate_against_attribute(self):
@@ -43,14 +45,14 @@ def validate_against_attribute(self):


class EntityId(BaseModel):
"""Dummy model for entity id
"""Common interface for validated entity identifiers.

Attributes:
type: Entity type
id: Entity ID
"""

type: Literal["entity_type"]
type: str
id: Any


@@ -60,11 +62,11 @@ class EntityId(BaseModel):
entity_id_models.append(
create_model(
f"EntityId{{{entity_type}}}",
__base__=BaseModel,
__base__=EntityId,
type=(Literal[entity_type], Field(..., alias="etype")),
id=(dtype, Field(..., alias="eid")),
)
)

EntityId = Annotated[Union[tuple(entity_id_models)], Field(discriminator="type")] # noqa: F811
EntityIdAdapter = TypeAdapter(EntityId)
EntityIdType = Annotated[reduce(or_, entity_id_models), Field(discriminator="type")]
EntityIdAdapter = TypeAdapter(EntityIdType)