diff --git a/python/lib/sift_client/_internal/grpc/__init__.py b/python/lib/sift_client/_internal/grpc_transport/__init__.py similarity index 81% rename from python/lib/sift_client/_internal/grpc/__init__.py rename to python/lib/sift_client/_internal/grpc_transport/__init__.py index 738259dc8..0ddd66ecd 100644 --- a/python/lib/sift_client/_internal/grpc/__init__.py +++ b/python/lib/sift_client/_internal/grpc_transport/__init__.py @@ -4,7 +4,7 @@ Example of establishing a connection to Sift's gRPC APi: ```python -from sift_client._internal.grpc.transport import SiftChannelConfig, use_sift_channel +from sift_client._internal.grpc_transport.transport import SiftChannelConfig, use_sift_channel # Be sure not to include the url scheme i.e. 'https://' in the uri. sift_channel_config = SiftChannelConfig(uri=SIFT_BASE_URI, apikey=SIFT_API_KEY) diff --git a/python/lib/sift_client/_internal/grpc/_async_interceptors/__init__.py b/python/lib/sift_client/_internal/grpc_transport/_async_interceptors/__init__.py similarity index 100% rename from python/lib/sift_client/_internal/grpc/_async_interceptors/__init__.py rename to python/lib/sift_client/_internal/grpc_transport/_async_interceptors/__init__.py diff --git a/python/lib/sift_client/_internal/grpc/_async_interceptors/base.py b/python/lib/sift_client/_internal/grpc_transport/_async_interceptors/base.py similarity index 100% rename from python/lib/sift_client/_internal/grpc/_async_interceptors/base.py rename to python/lib/sift_client/_internal/grpc_transport/_async_interceptors/base.py diff --git a/python/lib/sift_client/_internal/grpc/_async_interceptors/metadata.py b/python/lib/sift_client/_internal/grpc_transport/_async_interceptors/metadata.py similarity index 90% rename from python/lib/sift_client/_internal/grpc/_async_interceptors/metadata.py rename to python/lib/sift_client/_internal/grpc_transport/_async_interceptors/metadata.py index 95cc5a925..fce22da80 100644 --- a/python/lib/sift_client/_internal/grpc/_async_interceptors/metadata.py +++ b/python/lib/sift_client/_internal/grpc_transport/_async_interceptors/metadata.py @@ -4,7 +4,7 @@ from grpc import aio as grpc_aio -from sift_client._internal.grpc._async_interceptors.base import ClientAsyncInterceptor +from sift_client._internal.grpc_transport._async_interceptors.base import ClientAsyncInterceptor Metadata = List[Tuple[str, str]] diff --git a/python/lib/sift_client/_internal/grpc/_interceptors/__init__.py b/python/lib/sift_client/_internal/grpc_transport/_interceptors/__init__.py similarity index 100% rename from python/lib/sift_client/_internal/grpc/_interceptors/__init__.py rename to python/lib/sift_client/_internal/grpc_transport/_interceptors/__init__.py diff --git a/python/lib/sift_client/_internal/grpc/_interceptors/base.py b/python/lib/sift_client/_internal/grpc_transport/_interceptors/base.py similarity index 100% rename from python/lib/sift_client/_internal/grpc/_interceptors/base.py rename to python/lib/sift_client/_internal/grpc_transport/_interceptors/base.py diff --git a/python/lib/sift_client/_internal/grpc/_interceptors/context.py b/python/lib/sift_client/_internal/grpc_transport/_interceptors/context.py similarity index 100% rename from python/lib/sift_client/_internal/grpc/_interceptors/context.py rename to python/lib/sift_client/_internal/grpc_transport/_interceptors/context.py diff --git a/python/lib/sift_client/_internal/grpc/_interceptors/metadata.py b/python/lib/sift_client/_internal/grpc_transport/_interceptors/metadata.py similarity index 80% rename from python/lib/sift_client/_internal/grpc/_interceptors/metadata.py rename to python/lib/sift_client/_internal/grpc_transport/_interceptors/metadata.py index afb5da50c..054dfa652 100644 --- a/python/lib/sift_client/_internal/grpc/_interceptors/metadata.py +++ b/python/lib/sift_client/_internal/grpc_transport/_interceptors/metadata.py @@ -2,8 +2,8 @@ import grpc -from sift_client._internal.grpc._interceptors.base import ClientInterceptor, Continuation -from sift_client._internal.grpc._interceptors.context import ClientCallDetails +from sift_client._internal.grpc_transport._interceptors.base import ClientInterceptor, Continuation +from sift_client._internal.grpc_transport._interceptors.context import ClientCallDetails Metadata = List[Tuple[str, str]] diff --git a/python/lib/sift_client/_internal/grpc/_retry.py b/python/lib/sift_client/_internal/grpc_transport/_retry.py similarity index 100% rename from python/lib/sift_client/_internal/grpc/_retry.py rename to python/lib/sift_client/_internal/grpc_transport/_retry.py diff --git a/python/lib/sift_client/_internal/grpc/keepalive.py b/python/lib/sift_client/_internal/grpc_transport/keepalive.py similarity index 100% rename from python/lib/sift_client/_internal/grpc/keepalive.py rename to python/lib/sift_client/_internal/grpc_transport/keepalive.py diff --git a/python/lib/sift_client/_internal/grpc/server_interceptors/__init__.py b/python/lib/sift_client/_internal/grpc_transport/server_interceptors/__init__.py similarity index 100% rename from python/lib/sift_client/_internal/grpc/server_interceptors/__init__.py rename to python/lib/sift_client/_internal/grpc_transport/server_interceptors/__init__.py diff --git a/python/lib/sift_client/_internal/grpc/server_interceptors/server_interceptor.py b/python/lib/sift_client/_internal/grpc_transport/server_interceptors/server_interceptor.py similarity index 100% rename from python/lib/sift_client/_internal/grpc/server_interceptors/server_interceptor.py rename to python/lib/sift_client/_internal/grpc_transport/server_interceptors/server_interceptor.py diff --git a/python/lib/sift_client/_internal/grpc/transport.py b/python/lib/sift_client/_internal/grpc_transport/transport.py similarity index 93% rename from python/lib/sift_client/_internal/grpc/transport.py rename to python/lib/sift_client/_internal/grpc_transport/transport.py index 1043245a8..7e0bc5425 100644 --- a/python/lib/sift_client/_internal/grpc/transport.py +++ b/python/lib/sift_client/_internal/grpc_transport/transport.py @@ -14,14 +14,19 @@ import grpc.aio as grpc_aio from typing_extensions import NotRequired, TypeAlias -from sift_client._internal.grpc._async_interceptors.metadata import MetadataAsyncInterceptor -from sift_client._internal.grpc._interceptors.metadata import Metadata, MetadataInterceptor +from sift_client._internal.grpc_transport._async_interceptors.metadata import ( + MetadataAsyncInterceptor, +) +from sift_client._internal.grpc_transport._interceptors.metadata import ( + Metadata, + MetadataInterceptor, +) if TYPE_CHECKING: - from sift_client._internal.grpc._async_interceptors.base import ClientAsyncInterceptor - from sift_client._internal.grpc._interceptors.base import ClientInterceptor -from sift_client._internal.grpc._retry import RetryPolicy -from sift_client._internal.grpc.keepalive import DEFAULT_KEEPALIVE_CONFIG, KeepaliveConfig + from sift_client._internal.grpc_transport._async_interceptors.base import ClientAsyncInterceptor + from sift_client._internal.grpc_transport._interceptors.base import ClientInterceptor +from sift_client._internal.grpc_transport._retry import RetryPolicy +from sift_client._internal.grpc_transport.keepalive import DEFAULT_KEEPALIVE_CONFIG, KeepaliveConfig SiftChannel: TypeAlias = grpc.Channel SiftAsyncChannel: TypeAlias = grpc_aio.Channel diff --git a/python/lib/sift_client/_internal/grpc/transport_test.py b/python/lib/sift_client/_internal/grpc_transport/transport_test.py similarity index 97% rename from python/lib/sift_client/_internal/grpc/transport_test.py rename to python/lib/sift_client/_internal/grpc_transport/transport_test.py index efccb6b4e..55984f2d7 100644 --- a/python/lib/sift_client/_internal/grpc/transport_test.py +++ b/python/lib/sift_client/_internal/grpc_transport/transport_test.py @@ -15,8 +15,10 @@ add_DataServiceServicer_to_server, ) -from sift_client._internal.grpc.server_interceptors.server_interceptor import ServerInterceptor -from sift_client._internal.grpc.transport import SiftChannelConfig, use_sift_channel +from sift_client._internal.grpc_transport.server_interceptors.server_interceptor import ( + ServerInterceptor, +) +from sift_client._internal.grpc_transport.transport import SiftChannelConfig, use_sift_channel class DataService(DataServiceServicer): diff --git a/python/lib/sift_client/_internal/low_level_wrappers/test_results.py b/python/lib/sift_client/_internal/low_level_wrappers/test_results.py index 94cbc8950..5646c5825 100644 --- a/python/lib/sift_client/_internal/low_level_wrappers/test_results.py +++ b/python/lib/sift_client/_internal/low_level_wrappers/test_results.py @@ -1,13 +1,21 @@ from __future__ import annotations +import fcntl +import json import logging +import re +import uuid +from dataclasses import dataclass, field +from pathlib import Path from typing import TYPE_CHECKING, Any, cast +from google.protobuf import json_format from sift.test_reports.v1.test_reports_pb2 import ( CreateTestMeasurementRequest, CreateTestMeasurementResponse, CreateTestMeasurementsRequest, CreateTestMeasurementsResponse, + CreateTestReportRequest, CreateTestReportResponse, CreateTestStepRequest, CreateTestStepResponse, @@ -31,6 +39,9 @@ UpdateTestStepRequest, UpdateTestStepResponse, ) +from sift.test_reports.v1.test_reports_pb2 import TestMeasurement as TestMeasurementProto +from sift.test_reports.v1.test_reports_pb2 import TestReport as TestReportProto +from sift.test_reports.v1.test_reports_pb2 import TestStep as TestStepProto from sift.test_reports.v1.test_reports_pb2_grpc import TestReportServiceStub from sift_client._internal.low_level_wrappers.base import DEFAULT_PAGE_SIZE, LowLevelClientBase @@ -48,6 +59,8 @@ from sift_client.transport import WithGrpcClient if TYPE_CHECKING: + from collections.abc import Generator + from sift_client.transport.grpc_transport import GrpcClient # Configure logging @@ -91,17 +104,38 @@ async def import_test_report(self, remote_file_id: str) -> TestReport: async def create_test_report( self, *, - test_report: TestReportCreate, + test_report: TestReportCreate | None = None, + request: CreateTestReportRequest | None = None, + log_file: str | Path | None = None, + simulate: bool = False, ) -> TestReport: """Create a new test report. Args: - test_report: The test report to create. + test_report: The test report to create. Mutually exclusive with ``request``. + request: A raw proto request. Mutually exclusive with ``test_report``. + log_file: If set, log the request to this file and return a simulated response. + simulate: If True, return a simulated response without making an API call. Returns: The created TestReport. """ - request = test_report.to_proto() + if request is None: + if test_report is None: + raise ValueError("Either test_report or request must be provided") + request = test_report.to_proto() + + if simulate or log_file is not None: + simulated_proto = self.simulate_create_test_report_response(request) + if log_file is not None: + self._log_request_to_file( + log_file, + "CreateTestReport", + request, + response_id=simulated_proto.test_report_id, + ) + return TestReport._from_proto(simulated_proto) + response = await self._grpc_client.get_stub(TestReportServiceStub).CreateTestReport(request) grpc_test_report = cast("CreateTestReportResponse", response).test_report return TestReport._from_proto(grpc_test_report) @@ -188,17 +222,39 @@ async def list_all_test_reports( page_size=page_size, ) - async def update_test_report(self, update: TestReportUpdate) -> TestReport: + async def update_test_report( + self, + update: TestReportUpdate | None = None, + *, + request: UpdateTestReportRequest | None = None, + log_file: str | Path | None = None, + simulate: bool = False, + existing: TestReport | None = None, + ) -> TestReport: """Update an existing test report. Args: - update: The updates to apply. + update: The updates to apply. Mutually exclusive with ``request``. + request: A raw proto request. Mutually exclusive with ``update``. + log_file: If set, log the request to this file and return a simulated response. + simulate: If True, return a simulated response without making an API call. + existing: The full existing TestReport for simulation merge. If not provided, + the simulated response will only contain the updated fields. Returns: The updated TestReport. """ - test_report_proto, field_mask = update.to_proto_with_mask() - request = UpdateTestReportRequest(test_report=test_report_proto, update_mask=field_mask) + if request is None: + if update is None: + raise ValueError("Either update or request must be provided") + test_report_proto, field_mask = update.to_proto_with_mask() + request = UpdateTestReportRequest(test_report=test_report_proto, update_mask=field_mask) + + if simulate or log_file is not None: + if log_file is not None: + self._log_request_to_file(log_file, "UpdateTestReport", request) + return self.simulate_update_test_report_response(request, existing=existing) + response = await self._grpc_client.get_stub(TestReportServiceStub).UpdateTestReport(request) grpc_test_report = cast("UpdateTestReportResponse", response).test_report return TestReport._from_proto(grpc_test_report) @@ -220,16 +276,41 @@ async def delete_test_report(self, test_report_id: str) -> None: # Test Steps - async def create_test_step(self, test_step: TestStepCreate) -> TestStep: + async def create_test_step( + self, + test_step: TestStepCreate | None = None, + *, + request: CreateTestStepRequest | None = None, + log_file: str | Path | None = None, + simulate: bool = False, + ) -> TestStep: """Create a new test step. Args: - test_step: The test step to create. + test_step: The test step to create. Mutually exclusive with ``request``. + request: A raw proto request. Mutually exclusive with ``test_step``. + log_file: If set, log the request to this file and return a simulated response. + simulate: If True, return a simulated response without making an API call. Returns: The created TestStep. """ - request = CreateTestStepRequest(test_step=test_step.to_proto()) + if request is None: + if test_step is None: + raise ValueError("Either test_step or request must be provided") + request = CreateTestStepRequest(test_step=test_step.to_proto()) + + if simulate or log_file is not None: + simulated_proto = self.simulate_create_test_step_response(request) + if log_file is not None: + self._log_request_to_file( + log_file, + "CreateTestStep", + request, + response_id=simulated_proto.test_step_id, + ) + return TestStep._from_proto(simulated_proto) + response = await self._grpc_client.get_stub(TestReportServiceStub).CreateTestStep(request) grpc_test_step = cast("CreateTestStepResponse", response).test_step return TestStep._from_proto(grpc_test_step) @@ -296,20 +377,42 @@ async def list_all_test_steps( page_size=page_size, ) - async def update_test_step(self, update: TestStepUpdate) -> TestStep: + async def update_test_step( + self, + update: TestStepUpdate | None = None, + *, + request: UpdateTestStepRequest | None = None, + log_file: str | Path | None = None, + simulate: bool = False, + existing: TestStep | None = None, + ) -> TestStep: """Update an existing test step. Args: - update: The updates to apply. + update: The updates to apply. Mutually exclusive with ``request``. + request: A raw proto request. Mutually exclusive with ``update``. + log_file: If set, log the request to this file and return a simulated response. + simulate: If True, return a simulated response without making an API call. + existing: The full existing TestStep for simulation merge. If not provided, + the simulated response will only contain the updated fields. Returns: The updated TestStep. """ - test_step_proto, field_mask = update.to_proto_with_mask() - has_error_info = test_step_proto.HasField("error_info") - if has_error_info: - field_mask.paths.append("error_info") - request = UpdateTestStepRequest(test_step=test_step_proto, update_mask=field_mask) + if request is None: + if update is None: + raise ValueError("Either update or request must be provided") + test_step_proto, field_mask = update.to_proto_with_mask() + has_error_info = test_step_proto.HasField("error_info") + if has_error_info: + field_mask.paths.append("error_info") + request = UpdateTestStepRequest(test_step=test_step_proto, update_mask=field_mask) + + if simulate or log_file is not None: + if log_file is not None: + self._log_request_to_file(log_file, "UpdateTestStep", request) + return self.simulate_update_test_step_response(request, existing=existing) + response = await self._grpc_client.get_stub(TestReportServiceStub).UpdateTestStep(request) grpc_test_step = cast("UpdateTestStepResponse", response).test_step return TestStep._from_proto(grpc_test_step) @@ -332,17 +435,40 @@ async def delete_test_step(self, test_step_id: str) -> None: # Test Measurements async def create_test_measurement( - self, test_measurement: TestMeasurementCreate + self, + test_measurement: TestMeasurementCreate | None = None, + *, + request: CreateTestMeasurementRequest | None = None, + log_file: str | Path | None = None, + simulate: bool = False, ) -> TestMeasurement: """Create a new test measurement. Args: - test_measurement: The test measurement to create. + test_measurement: The test measurement to create. Mutually exclusive with ``request``. + request: A raw proto request. Mutually exclusive with ``test_measurement``. + log_file: If set, log the request to this file and return a simulated response. + simulate: If True, return a simulated response without making an API call. Returns: The created TestMeasurement. """ - request = CreateTestMeasurementRequest(test_measurement=test_measurement.to_proto()) + if request is None: + if test_measurement is None: + raise ValueError("Either test_measurement or request must be provided") + request = CreateTestMeasurementRequest(test_measurement=test_measurement.to_proto()) + + if simulate or log_file is not None: + simulated_proto = self.simulate_create_test_measurement_response(request) + if log_file is not None: + self._log_request_to_file( + log_file, + "CreateTestMeasurement", + request, + response_id=simulated_proto.measurement_id, + ) + return TestMeasurement._from_proto(simulated_proto) + response = await self._grpc_client.get_stub(TestReportServiceStub).CreateTestMeasurement( request ) @@ -350,18 +476,41 @@ async def create_test_measurement( return TestMeasurement._from_proto(grpc_test_measurement) async def create_test_measurements( - self, test_measurements: list[TestMeasurementCreate] + self, + test_measurements: list[TestMeasurementCreate] | None = None, + *, + request: CreateTestMeasurementsRequest | None = None, + log_file: str | Path | None = None, + simulate: bool = False, ) -> tuple[int, list[str]]: """Create multiple test measurements in a single request. Args: - test_measurements: The test measurements to create. + test_measurements: The test measurements to create. Mutually exclusive with ``request``. + request: A raw proto request. Mutually exclusive with ``test_measurements``. + log_file: If set, log the request to this file and return a simulated response. + simulate: If True, return a simulated response without making an API call. Returns: A tuple of (measurements_created_count, measurement_ids). """ - measurement_protos = [tm.to_proto() for tm in test_measurements] - request = CreateTestMeasurementsRequest(test_measurements=measurement_protos) + if request is None: + if test_measurements is None: + raise ValueError("Either test_measurements or request must be provided") + measurement_protos = [tm.to_proto() for tm in test_measurements] + request = CreateTestMeasurementsRequest(test_measurements=measurement_protos) + + if simulate or log_file is not None: + count, measurement_ids = self.simulate_create_test_measurements_response(request) + if log_file is not None: + self._log_request_to_file( + log_file, + "CreateTestMeasurements", + request, + response_id=",".join(measurement_ids), + ) + return count, measurement_ids + response = await self._grpc_client.get_stub(TestReportServiceStub).CreateTestMeasurements( request ) @@ -432,19 +581,41 @@ async def list_all_test_measurements( page_size=page_size, ) - async def update_test_measurement(self, update: TestMeasurementUpdate) -> TestMeasurement: + async def update_test_measurement( + self, + update: TestMeasurementUpdate | None = None, + *, + request: UpdateTestMeasurementRequest | None = None, + log_file: str | Path | None = None, + simulate: bool = False, + existing: TestMeasurement | None = None, + ) -> TestMeasurement: """Update an existing test measurement. Args: - update: The updates to apply. + update: The updates to apply. Mutually exclusive with ``request``. + request: A raw proto request. Mutually exclusive with ``update``. + log_file: If set, log the request to this file and return a simulated response. + simulate: If True, return a simulated response without making an API call. + existing: The full existing TestMeasurement for simulation merge. If not provided, + the simulated response will only contain the updated fields. Returns: The updated TestMeasurement. """ - test_measurement_proto, field_mask = update.to_proto_with_mask() - request = UpdateTestMeasurementRequest( - test_measurement=test_measurement_proto, update_mask=field_mask - ) + if request is None: + if update is None: + raise ValueError("Either update or request must be provided") + test_measurement_proto, field_mask = update.to_proto_with_mask() + request = UpdateTestMeasurementRequest( + test_measurement=test_measurement_proto, update_mask=field_mask + ) + + if simulate or log_file is not None: + if log_file is not None: + self._log_request_to_file(log_file, "UpdateTestMeasurement", request) + return self.simulate_update_test_measurement_response(request, existing=existing) + response = await self._grpc_client.get_stub(TestReportServiceStub).UpdateTestMeasurement( request ) @@ -465,3 +636,678 @@ async def delete_test_measurement(self, measurement_id: str) -> None: request = DeleteTestMeasurementRequest(measurement_id=measurement_id) await self._grpc_client.get_stub(TestReportServiceStub).DeleteTestMeasurement(request) + + # ------------------------------------------------------------------ + # Logging support methods + # ------------------------------------------------------------------ + + @staticmethod + def _log_request_to_file( + log_file: str | Path, + request_type: str, + request: Any, + response_id: str | None = None, + ) -> None: + """Log a request to a file in JSON format. + + Line 0 of the file is always a LogTracking header. Data lines start at line 1. + + Args: + log_file: Path to the log file. + request_type: Type of request being logged. + request: The protobuf request to log. + response_id: Optional ID from the simulated response, embedded in the tag + for create operations so replay can map previously simulated IDs used by simulated updates. + """ + log_path = Path(log_file) + log_path.parent.mkdir(parents=True, exist_ok=True) + if not log_path.exists() or log_path.stat().st_size == 0: + with open(log_path, "w") as f: + fcntl.flock(f, fcntl.LOCK_EX) + f.write(LogTracking().to_log_line()) + tag = f"{request_type}:{response_id}" if response_id else request_type + with open(log_path, "a") as f: + fcntl.flock(f, fcntl.LOCK_EX) + request_dict = json_format.MessageToDict(request) + request_json = json.dumps(request_dict, separators=(",", ":")) + f.write(f"[{tag}] {request_json}\n") + + @staticmethod + def _update_tracking(log_file: str | Path, tracking: LogTracking) -> None: + """Rewrite the LogTracking header (line 0) in place.""" + log_path = Path(log_file) + with open(log_path, "r+") as f: + fcntl.flock(f, fcntl.LOCK_EX) + lines = f.readlines() + lines[0] = tracking.to_log_line() + f.seek(0) + f.writelines(lines) + f.truncate() + + @staticmethod + def simulate_create_test_report_response( + request: CreateTestReportRequest, + ) -> TestReportProto: + """Simulate a CreateTestReport response by constructing a TestReportProto from the request. + + Args: + request: The CreateTestReportRequest. + + Returns: + A simulated TestReportProto. + """ + proto = TestReportProto( + test_report_id=str(uuid.uuid4()), + status=request.status, + name=request.name, + test_system_name=request.test_system_name, + test_case=request.test_case, + serial_number=request.serial_number, + part_number=request.part_number, + system_operator=request.system_operator, + run_id=request.run_id, + is_archived=False, + ) + proto.start_time.CopyFrom(request.start_time) + proto.end_time.CopyFrom(request.end_time) + proto.metadata.extend(request.metadata) + return proto + + @staticmethod + def simulate_update_test_report_response( + request: UpdateTestReportRequest, + existing: TestReport | None = None, + ) -> TestReport: + """Simulate an UpdateTestReport response. + + Args: + request: The UpdateTestReportRequest containing the updates. + existing: Optional existing TestReport to merge updates into. + If not provided, returns a TestReport from the request proto. + + Returns: + An updated TestReport with the specified fields modified. + """ + if existing is None: + return TestReport._from_proto(request.test_report) + + from datetime import timezone + + from sift_client.sift_types.test_report import TestStatus + from sift_client.util.metadata import metadata_proto_to_dict + + update_mask_paths = set(request.update_mask.paths) + proto = request.test_report + updates: dict[str, Any] = {} + + if "name" in update_mask_paths: + updates["name"] = proto.name + if "test_system_name" in update_mask_paths: + updates["test_system_name"] = proto.test_system_name + if "test_case" in update_mask_paths: + updates["test_case"] = proto.test_case + if "status" in update_mask_paths: + updates["status"] = TestStatus(proto.status) + if "start_time" in update_mask_paths: + updates["start_time"] = proto.start_time.ToDatetime(tzinfo=timezone.utc) + if "end_time" in update_mask_paths: + updates["end_time"] = proto.end_time.ToDatetime(tzinfo=timezone.utc) + if "serial_number" in update_mask_paths: + updates["serial_number"] = proto.serial_number if proto.serial_number else None + if "part_number" in update_mask_paths: + updates["part_number"] = proto.part_number if proto.part_number else None + if "system_operator" in update_mask_paths: + updates["system_operator"] = proto.system_operator if proto.system_operator else None + if "run_id" in update_mask_paths: + updates["run_id"] = proto.run_id if proto.run_id else None + if "metadata" in update_mask_paths: + updates["metadata"] = metadata_proto_to_dict(proto.metadata) # type: ignore + if "is_archived" in update_mask_paths: + updates["is_archived"] = proto.is_archived + + return existing.model_copy(update=updates) + + @staticmethod + def simulate_create_test_step_response( + request: CreateTestStepRequest, + ) -> TestStepProto: + """Simulate a CreateTestStep response by constructing a TestStepProto from the request. + + Args: + request: The CreateTestStepRequest. + + Returns: + A simulated TestStepProto. + """ + proto = TestStepProto() + proto.CopyFrom(request.test_step) + proto.test_step_id = str(uuid.uuid4()) + return proto + + @staticmethod + def simulate_update_test_step_response( + request: UpdateTestStepRequest, + existing: TestStep | None = None, + ) -> TestStep: + """Simulate an UpdateTestStep response. + + Args: + request: The UpdateTestStepRequest containing the updates. + existing: Optional existing TestStep to merge updates into. + If not provided, returns a TestStep from the request proto. + + Returns: + An updated TestStep with the specified fields modified. + """ + if existing is None: + return TestStep._from_proto(request.test_step) + + from datetime import timezone + + from sift_client.sift_types.test_report import ErrorInfo, TestStatus + + update_mask_paths = set(request.update_mask.paths) + proto = request.test_step + updates: dict[str, Any] = {} + + if "name" in update_mask_paths: + updates["name"] = proto.name + if "status" in update_mask_paths: + updates["status"] = TestStatus(proto.status) + if "start_time" in update_mask_paths: + updates["start_time"] = proto.start_time.ToDatetime(tzinfo=timezone.utc) + if "end_time" in update_mask_paths: + updates["end_time"] = proto.end_time.ToDatetime(tzinfo=timezone.utc) + if "description" in update_mask_paths: + updates["description"] = proto.description if proto.description else None + if "error_info" in update_mask_paths: + if proto.HasField("error_info"): + updates["error_info"] = ErrorInfo( + error_code=proto.error_info.error_code, + error_message=proto.error_info.error_message, + ) + else: + updates["error_info"] = None + + return existing.model_copy(update=updates) + + @staticmethod + def simulate_create_test_measurement_response( + request: CreateTestMeasurementRequest, + ) -> TestMeasurementProto: + """Simulate a CreateTestMeasurement response by constructing a TestMeasurementProto from the request. + + Args: + request: The CreateTestMeasurementRequest. + + Returns: + A simulated TestMeasurementProto. + """ + proto = TestMeasurementProto() + proto.CopyFrom(request.test_measurement) + proto.measurement_id = str(uuid.uuid4()) + return proto + + @staticmethod + def simulate_create_test_measurements_response( + request: CreateTestMeasurementsRequest, + ) -> tuple[int, list[str]]: + """Simulate a CreateTestMeasurements response. + + Args: + request: The CreateTestMeasurementsRequest. + + Returns: + A tuple of (measurements_created_count, measurement_ids). + """ + measurement_ids = [str(uuid.uuid4()) for _ in request.test_measurements] + return len(measurement_ids), measurement_ids + + @staticmethod + def simulate_update_test_measurement_response( + request: UpdateTestMeasurementRequest, + existing: TestMeasurement | None = None, + ) -> TestMeasurement: + """Simulate an UpdateTestMeasurement response. + + Args: + request: The UpdateTestMeasurementRequest containing the updates. + existing: Optional existing TestMeasurement to merge updates into. + If not provided, returns a TestMeasurement from the request proto. + + Returns: + An updated TestMeasurement with the specified fields modified. + """ + if existing is None: + return TestMeasurement._from_proto(request.test_measurement) + + from datetime import timezone + + from sift_client.sift_types.test_report import NumericBounds, TestMeasurementType + + update_mask_paths = set(request.update_mask.paths) + proto = request.test_measurement + updates: dict[str, Any] = {} + + if "name" in update_mask_paths: + updates["name"] = proto.name + if "passed" in update_mask_paths: + updates["passed"] = proto.passed + if "timestamp" in update_mask_paths: + updates["timestamp"] = proto.timestamp.ToDatetime(tzinfo=timezone.utc) + if "measurement_type" in update_mask_paths: + updates["measurement_type"] = TestMeasurementType(proto.measurement_type) + if "numeric_value" in update_mask_paths: + updates["numeric_value"] = ( + proto.numeric_value if proto.HasField("numeric_value") else None + ) + if "string_value" in update_mask_paths: + updates["string_value"] = proto.string_value if proto.HasField("string_value") else None + if "boolean_value" in update_mask_paths: + updates["boolean_value"] = ( + proto.boolean_value if proto.HasField("boolean_value") else None + ) + if "unit" in update_mask_paths: + updates["unit"] = proto.unit.abbreviated_name if proto.HasField("unit") else None + if "numeric_bounds" in update_mask_paths: + if proto.HasField("numeric_bounds"): + updates["numeric_bounds"] = NumericBounds( + min=proto.numeric_bounds.min if proto.numeric_bounds.HasField("min") else None, + max=proto.numeric_bounds.max if proto.numeric_bounds.HasField("max") else None, + ) + else: + updates["numeric_bounds"] = None + if "string_bounds" in update_mask_paths: + if proto.HasField("string_bounds"): + updates["string_expected_value"] = proto.string_bounds.expected_value + else: + updates["string_expected_value"] = None + + return existing.model_copy(update=updates) + + async def replay_log_file( + self, + log_file: str | Path, + *, + incremental: bool = False, + ) -> ReplayResult: + """Replay a log file, creating real API objects from the logged simulation data. + + Two modes are available: + + * **batch** (default): Parse the entire log, reconstruct objects via + simulation, then create them all via the API in one pass. The + ``LogTracking`` header on line 0 is ignored. + * **incremental** (``incremental=True``): Walk the log line-by-line, + issuing the real API call for each entry as it is encountered. + ``LogTracking.last_uploaded_line`` is updated after every successful + call so that a subsequent invocation skips already-uploaded lines. + + Args: + log_file: Path to the log file to replay. + incremental: If True, use incremental mode. + + Returns: + A ReplayResult containing the created report, steps, and measurements. + """ + log_path = Path(log_file) + if not log_path.exists(): + raise FileNotFoundError(f"Log file not found: {log_file}") + + if incremental: + return await self._replay_log_file_incremental(log_path) + return await self._replay_log_file_batch(log_path) + + # ------------------------------------------------------------------ + # Shared replay dispatch + # ------------------------------------------------------------------ + + async def _replay_entry( + self, + request_type: str, + response_id: str | None, + json_str: str, + *, + simulate: bool, + id_map: dict[str, str], + state: _ReplayState, + ) -> None: + """Process a single log entry, updating *state* in place. + + When *simulate* is True the create/update methods return simulated + responses (no network call). When False they issue real gRPC calls. + *id_map* is updated so that subsequent entries can remap IDs that + were generated during the original simulation run. + """ + + def _map_id(sid: str) -> str: + return id_map.get(sid, sid) + + if request_type == "CreateTestReport": + create_report_req = CreateTestReportRequest() + json_format.Parse(json_str, create_report_req) + state.report = await self.create_test_report( + request=create_report_req, simulate=simulate + ) + if response_id: + id_map[response_id] = state.report._id_or_error + + elif request_type == "CreateTestStep": + create_step_req = CreateTestStepRequest() + json_format.Parse(json_str, create_step_req) + create_step_req.test_step.test_report_id = _map_id( + create_step_req.test_step.test_report_id + ) + if create_step_req.test_step.parent_step_id: + create_step_req.test_step.parent_step_id = _map_id( + create_step_req.test_step.parent_step_id + ) + step = await self.create_test_step(request=create_step_req, simulate=simulate) + if response_id: + id_map[response_id] = step._id_or_error + state.steps_by_id[step._id_or_error] = step + state.steps_order.append(step._id_or_error) + + elif request_type == "CreateTestMeasurement": + create_meas_req = CreateTestMeasurementRequest() + json_format.Parse(json_str, create_meas_req) + create_meas_req.test_measurement.test_step_id = _map_id( + create_meas_req.test_measurement.test_step_id + ) + measurement = await self.create_test_measurement( + request=create_meas_req, simulate=simulate + ) + if response_id: + id_map[response_id] = measurement._id_or_error + state.measurements_by_id[measurement._id_or_error] = measurement + state.measurements_order.append(measurement._id_or_error) + + elif request_type == "CreateTestMeasurements": + create_batch_req = CreateTestMeasurementsRequest() + json_format.Parse(json_str, create_batch_req) + for tm in create_batch_req.test_measurements: + tm.test_step_id = _map_id(tm.test_step_id) + original_ids = response_id.split(",") if response_id else [] + if simulate: + for i, tm_proto in enumerate(create_batch_req.test_measurements): + single_req = CreateTestMeasurementRequest(test_measurement=tm_proto) + meas = await self.create_test_measurement(request=single_req, simulate=True) + if i < len(original_ids): + id_map[original_ids[i]] = meas._id_or_error + state.measurements_by_id[meas._id_or_error] = meas + state.measurements_order.append(meas._id_or_error) + else: + _, real_ids = await self.create_test_measurements(request=create_batch_req) + for i, real_id in enumerate(real_ids): + if i < len(original_ids): + id_map[original_ids[i]] = real_id + + elif request_type == "UpdateTestReport": + if state.report is None: + raise ValueError("UpdateTestReport found before CreateTestReport") + update_report_req = UpdateTestReportRequest() + json_format.Parse(json_str, update_report_req) + update_report_req.test_report.test_report_id = _map_id( + update_report_req.test_report.test_report_id + ) + state.report = await self.update_test_report( + request=update_report_req, simulate=simulate, existing=state.report + ) + + elif request_type == "UpdateTestStep": + update_step_req = UpdateTestStepRequest() + json_format.Parse(json_str, update_step_req) + orig_step_id = update_step_req.test_step.test_step_id + mapped_step_id = _map_id(orig_step_id) + update_step_req.test_step.test_step_id = mapped_step_id + existing_step = state.steps_by_id.get(mapped_step_id) + if simulate and existing_step is None: + raise ValueError(f"UpdateTestStep for unknown step: {orig_step_id}") + updated_step = await self.update_test_step( + request=update_step_req, simulate=simulate, existing=existing_step + ) + if mapped_step_id in state.steps_by_id: + state.steps_by_id[mapped_step_id] = updated_step + + elif request_type == "UpdateTestMeasurement": + update_meas_req = UpdateTestMeasurementRequest() + json_format.Parse(json_str, update_meas_req) + orig_meas_id = update_meas_req.test_measurement.measurement_id + mapped_meas_id = _map_id(orig_meas_id) + update_meas_req.test_measurement.measurement_id = mapped_meas_id + existing_meas = state.measurements_by_id.get(mapped_meas_id) + if simulate and existing_meas is None: + raise ValueError(f"UpdateTestMeasurement for unknown measurement: {orig_meas_id}") + updated_meas = await self.update_test_measurement( + request=update_meas_req, simulate=simulate, existing=existing_meas + ) + if mapped_meas_id in state.measurements_by_id: + state.measurements_by_id[mapped_meas_id] = updated_meas + + # ------------------------------------------------------------------ + # Batch replay (default) + # ------------------------------------------------------------------ + + async def _replay_log_file_batch(self, log_path: Path) -> ReplayResult: + id_map: dict[str, str] = {} + state = _ReplayState() + + for request_type, response_id, json_str in self._iter_log_data_lines(log_path): + await self._replay_entry( + request_type, + response_id, + json_str, + simulate=True, + id_map=id_map, + state=state, + ) + + if state.report is None: + raise ValueError("No CreateTestReport found in log file") + + real_id_map: dict[str, str] = {} + + real_report = await self._create_report_from_simulated(state.report) + real_report_id = real_report._id_or_error + + real_steps: list[TestStep] = [] + for sim_step_id in state.steps_order: + sim_step = state.steps_by_id[sim_step_id] + real_parent_step_id = ( + real_id_map.get(sim_step.parent_step_id, sim_step.parent_step_id) + if sim_step.parent_step_id + else None + ) + step_create = self._step_create_from_simulated( + sim_step, real_report_id, real_parent_step_id + ) + real_step = await self.create_test_step(step_create) + real_steps.append(real_step) + real_id_map[sim_step_id] = real_step._id_or_error + + real_measurements: list[TestMeasurement] = [] + for sim_measurement_id in state.measurements_order: + sim_measurement = state.measurements_by_id[sim_measurement_id] + real_step_id = real_id_map.get( + sim_measurement.test_step_id, sim_measurement.test_step_id + ) + measurement_create = self._measurement_create_from_simulated( + sim_measurement, real_step_id + ) + real_measurement = await self.create_test_measurement(measurement_create) + real_measurements.append(real_measurement) + + return ReplayResult( + report=real_report, + steps=real_steps, + measurements=real_measurements, + ) + + # ------------------------------------------------------------------ + # Incremental replay + # ------------------------------------------------------------------ + + async def _replay_log_file_incremental(self, log_path: Path) -> ReplayResult: + """Replay line-by-line, issuing real API calls and updating tracking.""" + with open(log_path) as f: + first_line = f.readline() + tracking = LogTracking.from_log_line(first_line) if first_line else LogTracking() + + id_map = tracking.id_map + state = _ReplayState() + + for line_num, (request_type, response_id, json_str) in enumerate( + self._iter_log_data_lines(log_path), start=tracking.last_uploaded_line + 1 + ): + await self._replay_entry( + request_type, + response_id, + json_str, + simulate=False, + id_map=id_map, + state=state, + ) + + tracking.last_uploaded_line = line_num + self._update_tracking(log_path, tracking) + + if state.report is None: + raise ValueError("No CreateTestReport found in log file") + + return ReplayResult( + report=state.report, + steps=[state.steps_by_id[sid] for sid in state.steps_order], + measurements=[state.measurements_by_id[mid] for mid in state.measurements_order], + ) + + # ------------------------------------------------------------------ + # Log line parsing helpers + # ------------------------------------------------------------------ + + @staticmethod + def _iter_log_data_lines( + log_path: Path, + ) -> Generator[tuple[str, str | None, str], None, None]: + """Parse data lines from a log file, skipping the LogTracking header. + + Yields (request_type, response_id, json_str) tuples. + The file is read entirely under a shared lock and then released + before yielding, so callers can safely acquire exclusive locks + during iteration (e.g. ``_update_tracking``). + """ + line_pattern = re.compile(r"^\[(\w+)(?::([^\]]+))?\]\s*(.+)$") + with open(log_path) as f: + fcntl.flock(f, fcntl.LOCK_SH) + raw_lines = f.readlines() + + for raw_line in raw_lines: + line = raw_line.strip() + if not line: + continue + match = line_pattern.match(line) + if not match: + continue + request_type = match.group(1) + if request_type == "LogTracking": + continue + yield (request_type, match.group(2), match.group(3)) + + async def _create_report_from_simulated(self, simulated: TestReport) -> TestReport: + """Create a real test report from a simulated one.""" + report_create = TestReportCreate( + name=simulated.name, + test_system_name=simulated.test_system_name, + test_case=simulated.test_case, + start_time=simulated.start_time, + end_time=simulated.end_time, + status=simulated.status, + serial_number=simulated.serial_number, + part_number=simulated.part_number, + system_operator=simulated.system_operator, + run_id=simulated.run_id, + metadata=simulated.metadata, + ) + return await self.create_test_report(test_report=report_create) + + @staticmethod + def _step_create_from_simulated( + simulated: TestStep, + real_report_id: str, + real_parent_step_id: str | None, + ) -> TestStepCreate: + """Create a TestStepCreate from a simulated TestStep with updated IDs.""" + return TestStepCreate( + test_report_id=real_report_id, + name=simulated.name, + step_type=simulated.step_type, + step_path=simulated.step_path, + status=simulated.status, + start_time=simulated.start_time, + end_time=simulated.end_time, + parent_step_id=real_parent_step_id, + description=simulated.description, + error_info=simulated.error_info, + ) + + @staticmethod + def _measurement_create_from_simulated( + simulated: TestMeasurement, + real_step_id: str, + ) -> TestMeasurementCreate: + """Create a TestMeasurementCreate from a simulated TestMeasurement with updated step ID.""" + return TestMeasurementCreate( + name=simulated.name, + test_step_id=real_step_id, + passed=simulated.passed, + timestamp=simulated.timestamp, + measurement_type=simulated.measurement_type, + numeric_value=simulated.numeric_value, + string_value=simulated.string_value, + boolean_value=simulated.boolean_value, + unit=simulated.unit, + numeric_bounds=simulated.numeric_bounds, + string_expected_value=simulated.string_expected_value, + ) + + +@dataclass +class LogTracking: + """Tracking metadata stored as line 0 of a log file.""" + + last_uploaded_line: int = 0 + id_map: dict[str, str] = field(default_factory=dict) + + def to_log_line(self) -> str: + data = {"lastUploadedLine": self.last_uploaded_line, "idMap": self.id_map} + return f"[LogTracking] {json.dumps(data, separators=(',', ':'))}\n" + + @staticmethod + def from_log_line(line: str) -> LogTracking: + match = re.match(r"^\[LogTracking\]\s*(.+)$", line.strip()) + if not match: + return LogTracking() + data = json.loads(match.group(1)) + return LogTracking( + last_uploaded_line=data.get("lastUploadedLine", 0), + id_map=data.get("idMap", {}), + ) + + +@dataclass +class _ReplayState: + """Mutable state accumulated during log replay.""" + + report: TestReport | None = None + steps_by_id: dict[str, TestStep] = field(default_factory=dict) + steps_order: list[str] = field(default_factory=list) + measurements_by_id: dict[str, TestMeasurement] = field(default_factory=dict) + measurements_order: list[str] = field(default_factory=list) + + +@dataclass +class ReplayResult: + """Result of replaying a log file.""" + + report: TestReport + steps: list[TestStep] = field(default_factory=list) + measurements: list[TestMeasurement] = field(default_factory=list) diff --git a/python/lib/sift_client/_internal/rest.py b/python/lib/sift_client/_internal/rest.py index 5f5c954c3..ee0239b79 100644 --- a/python/lib/sift_client/_internal/rest.py +++ b/python/lib/sift_client/_internal/rest.py @@ -6,7 +6,7 @@ from typing_extensions import NotRequired from urllib3.util import Retry -from sift_client._internal.grpc.transport import _clean_uri +from sift_client._internal.grpc_transport.transport import _clean_uri _DEFAULT_REST_RETRY = Retry(total=3, status_forcelist=[500, 502, 503, 504], backoff_factor=1) diff --git a/python/lib/sift_client/_tests/resources/test_test_results.py b/python/lib/sift_client/_tests/resources/test_test_results.py index 64d575be9..01a81ed1a 100644 --- a/python/lib/sift_client/_tests/resources/test_test_results.py +++ b/python/lib/sift_client/_tests/resources/test_test_results.py @@ -28,6 +28,42 @@ pytestmark = pytest.mark.integration +def compare_test_report_fields(simulated: TestReport, actual: TestReport) -> None: + """Compare simulated and actual TestReport fields (excluding id_).""" + assert simulated.status == actual.status + assert simulated.name == actual.name + assert simulated.test_system_name == actual.test_system_name + assert simulated.test_case == actual.test_case + assert simulated.serial_number == actual.serial_number + assert simulated.part_number == actual.part_number + assert simulated.system_operator == actual.system_operator + assert simulated.start_time == actual.start_time + assert simulated.end_time == actual.end_time + assert simulated.metadata == actual.metadata + + +def compare_test_step_fields(simulated: TestStep, actual: TestStep) -> None: + """Compare simulated and actual TestStep fields (excluding id_).""" + assert simulated.name == actual.name + assert simulated.description == actual.description + assert simulated.step_type == actual.step_type + assert simulated.step_path == actual.step_path + assert simulated.status == actual.status + assert simulated.start_time == actual.start_time + assert simulated.end_time == actual.end_time + + +def compare_test_measurement_fields(simulated: TestMeasurement, actual: TestMeasurement) -> None: + """Compare simulated and actual TestMeasurement fields (excluding id_).""" + assert simulated.name == actual.name + assert simulated.measurement_type == actual.measurement_type + assert simulated.numeric_value == actual.numeric_value + assert simulated.string_value == actual.string_value + assert simulated.boolean_value == actual.boolean_value + assert simulated.passed == actual.passed + assert simulated.timestamp == actual.timestamp + + def test_client_binding(sift_client): assert sift_client.test_results assert isinstance(sift_client.test_results, TestResultsAPI) @@ -40,105 +76,142 @@ class TestResultsTest: test_steps: ClassVar[dict[str, TestStep]] = {} test_measurements: ClassVar[dict[str, TestMeasurement]] = {} - def test_create_test_report(self, sift_client, nostromo_run): + def test_create_test_report(self, sift_client, nostromo_run, tmp_path): # Create a test report simulated_time = datetime.now(timezone.utc) - test_report = sift_client.test_results.create( - { - "status": TestStatus.PASSED, - "name": "Test Report with Steps and Measurements", - "test_system_name": "Test System", - "test_case": "Test Case", - "serial_number": str(uuid.uuid4()), - "part_number": "1234567890", - "start_time": simulated_time, - "end_time": simulated_time, - "run_id": nostromo_run.id_, - }, - ) + report_data = { + "status": TestStatus.PASSED, + "name": "Test Report with Steps and Measurements", + "test_system_name": "Test System", + "test_case": "Test Case", + "serial_number": str(uuid.uuid4()), + "part_number": "1234567890", + "start_time": simulated_time, + "end_time": simulated_time, + "run_id": nostromo_run.id_, + } + + # First, create with log_file to get simulated response + log_file = tmp_path / "test_log.jsonl" + simulated_report = sift_client.test_results.create(report_data, log_file=log_file) + + # Verify log file was created and contains content + assert log_file.exists() + log_content = log_file.read_text() + assert "[CreateTestReport:" in log_content + + # Verify simulated report has an id and expected fields + assert simulated_report.id_ is not None + + # Now create the real report + test_report = sift_client.test_results.create(report_data) + + # Compare simulated vs actual (fields should match except for id_) + compare_test_report_fields(simulated_report, test_report) + assert test_report.id_ is not None assert test_report.run_id == nostromo_run.id_ self.test_reports["basic_test_report"] = test_report - def test_create_test_steps(self, sift_client): + def test_create_test_steps(self, sift_client, tmp_path): test_report = self.test_reports.get("basic_test_report") if not test_report: pytest.skip("Need to create a test report first") simulated_time = test_report.start_time - - # Create multiple test steps using TestStepCreate - step1 = sift_client.test_results.create_step( - TestStepCreate( - test_report_id=test_report.id_, - name="Step 1: Initialization", - description="Initialize the test environment", - step_type=TestStepType.ACTION, - step_path="1", - status=TestStatus.PASSED, - start_time=simulated_time, - end_time=simulated_time + timedelta(seconds=10), - ), + log_file = tmp_path / "test_steps_log.jsonl" + + # Test step 1 with log_file comparison + step1_data = TestStepCreate( + test_report_id=test_report.id_, + name="Step 1: Initialization", + description="Initialize the test environment", + step_type=TestStepType.ACTION, + step_path="1", + status=TestStatus.PASSED, + start_time=simulated_time, + end_time=simulated_time + timedelta(seconds=10), ) + + # Create simulated step first + simulated_step1 = sift_client.test_results.create_step(step1_data, log_file=log_file) + assert simulated_step1.id_ is not None + assert log_file.exists() + assert "[CreateTestStep:" in log_file.read_text() + + # Create actual step + step1 = sift_client.test_results.create_step(step1_data) + compare_test_step_fields(simulated_step1, step1) + simulated_time = simulated_time + timedelta(seconds=10.1) - # Create a step using a dict - step1_1 = sift_client.test_results.create_step( - { - "test_report_id": test_report.id_, - "parent_step_id": step1.id_, - "name": "Step 1.1: Substep 1", - "description": "Substep 1 of Step 1", - "step_type": TestStepType.ACTION, - "step_path": "1.1", - "status": TestStatus.PASSED, - "start_time": simulated_time, - "end_time": simulated_time + timedelta(seconds=10), - }, - ) + # Create a step using a dict - test log_file with dict input + step1_1_data = { + "test_report_id": test_report.id_, + "parent_step_id": step1.id_, + "name": "Step 1.1: Substep 1", + "description": "Substep 1 of Step 1", + "step_type": TestStepType.ACTION, + "step_path": "1.1", + "status": TestStatus.PASSED, + "start_time": simulated_time, + "end_time": simulated_time + timedelta(seconds=10), + } + simulated_step1_1 = sift_client.test_results.create_step(step1_1_data, log_file=log_file) + assert simulated_step1_1.id_ is not None + step1_1 = sift_client.test_results.create_step(step1_1_data) + compare_test_step_fields(simulated_step1_1, step1_1) + simulated_time = simulated_time + timedelta(seconds=10.1) - step2 = sift_client.test_results.create_step( - TestStepCreate( - test_report_id=test_report.id_, - name="Step 2: Data Collection", - description="Collect sensor data", - step_type=TestStepType.ACTION, - step_path="2", - status=TestStatus.PASSED, - start_time=simulated_time, - end_time=simulated_time + timedelta(seconds=10), - ) + step2_data = TestStepCreate( + test_report_id=test_report.id_, + name="Step 2: Data Collection", + description="Collect sensor data", + step_type=TestStepType.ACTION, + step_path="2", + status=TestStatus.PASSED, + start_time=simulated_time, + end_time=simulated_time + timedelta(seconds=10), ) + simulated_step2 = sift_client.test_results.create_step(step2_data, log_file=log_file) + assert simulated_step2.id_ is not None + step2 = sift_client.test_results.create_step(step2_data) + compare_test_step_fields(simulated_step2, step2) + simulated_time = simulated_time + timedelta(seconds=10.1) - step3 = sift_client.test_results.create_step( - TestStepCreate( - test_report_id=test_report.id_, - name="Step 3: Validation", - description="Validate collected data", - step_type=TestStepType.ACTION, - step_path="3", - status=TestStatus.IN_PROGRESS, - start_time=simulated_time, - end_time=simulated_time + timedelta(seconds=10), - ), + step3_data = TestStepCreate( + test_report_id=test_report.id_, + name="Step 3: Validation", + description="Validate collected data", + step_type=TestStepType.ACTION, + step_path="3", + status=TestStatus.IN_PROGRESS, + start_time=simulated_time, + end_time=simulated_time + timedelta(seconds=10), ) - - step3_1 = sift_client.test_results.create_step( - TestStepCreate( - test_report_id=test_report.id_, - parent_step_id=step3.id_, - name="Step 3.1: Substep 3.1", - description="Error demo", - step_type=TestStepType.ACTION, - step_path="3.1", - status=TestStatus.FAILED, - start_time=simulated_time, - end_time=simulated_time + timedelta(seconds=11), - error_info=ErrorInfo( - error_code=1, - error_message="Demo error message", - ), + simulated_step3 = sift_client.test_results.create_step(step3_data, log_file=log_file) + assert simulated_step3.id_ is not None + step3 = sift_client.test_results.create_step(step3_data) + compare_test_step_fields(simulated_step3, step3) + + step3_1_data = TestStepCreate( + test_report_id=test_report.id_, + parent_step_id=step3.id_, + name="Step 3.1: Substep 3.1", + description="Error demo", + step_type=TestStepType.ACTION, + step_path="3.1", + status=TestStatus.FAILED, + start_time=simulated_time, + end_time=simulated_time + timedelta(seconds=11), + error_info=ErrorInfo( + error_code=1, + error_message="Demo error message", ), ) + simulated_step3_1 = sift_client.test_results.create_step(step3_1_data, log_file=log_file) + assert simulated_step3_1.id_ is not None + step3_1 = sift_client.test_results.create_step(step3_1_data) + compare_test_step_fields(simulated_step3_1, step3_1) assert step1.id_ is not None assert step1_1.id_ is not None assert step2.id_ is not None @@ -150,15 +223,32 @@ def test_create_test_steps(self, sift_client): self.test_steps["step3"] = step3 self.test_steps["step3_1"] = step3_1 - def test_update_test_steps(self, sift_client): + def test_update_test_steps(self, sift_client, tmp_path): step3 = self.test_steps.get("step3") step3_1 = self.test_steps.get("step3_1") if not step3 or not step3_1: pytest.skip("Need to create a step first") + + log_file = tmp_path / "test_step_update_log.jsonl" + + # Test update with log_file first + simulated_step3 = sift_client.test_results.update_step( + step3, + {"status": TestStatus.PASSED}, + log_file=log_file, + ) + assert log_file.exists() + assert "[UpdateTestStep]" in log_file.read_text() + assert simulated_step3.status == TestStatus.PASSED + + # Now do real update step3 = sift_client.test_results.update_step( step3, {"status": TestStatus.PASSED}, ) + + compare_test_step_fields(simulated_step3, step3) + # Update the step using class function. step3_1 = step3_1.update( {"description": "Error demo w/ updated description"}, @@ -166,7 +256,7 @@ def test_update_test_steps(self, sift_client): assert step3.status == TestStatus.PASSED assert step3_1.description == "Error demo w/ updated description" - def test_create_test_measurements(self, sift_client): + def test_create_test_measurements(self, sift_client, tmp_path): step1 = self.test_steps.get("step1") step2 = self.test_steps.get("step2") step3 = self.test_steps.get("step3") @@ -174,23 +264,39 @@ def test_create_test_measurements(self, sift_client): if not step1 or not step2 or not step3 or not step1_1: pytest.skip("Need to create steps first") - # Create measurements for each step using TestMeasurementCreate - measurement1 = sift_client.test_results.create_measurement( - TestMeasurementCreate( - test_step_id=step1.id_, - name="Temperature Reading", - measurement_type=TestMeasurementType.DOUBLE, - numeric_value=25.5, - numeric_bounds=NumericBounds( - min=24, - max=26, - ), - unit="Celsius", - passed=True, - timestamp=step1.start_time, + log_file = tmp_path / "test_measurements_log.jsonl" + + # Test measurement creation with log_file comparison + measurement1_data = TestMeasurementCreate( + test_step_id=step1.id_, + name="Temperature Reading", + measurement_type=TestMeasurementType.DOUBLE, + numeric_value=25.5, + numeric_bounds=NumericBounds( + min=24, + max=26, ), + unit="Celsius", + passed=True, + timestamp=step1.start_time, + ) + + # Create simulated measurement first + simulated_measurement1 = sift_client.test_results.create_measurement( + measurement1_data, + update_step=True, + log_file=log_file, + ) + assert simulated_measurement1.id_ is not None + assert log_file.exists() + assert "[CreateTestMeasurement:" in log_file.read_text() + + # Create actual measurement + measurement1 = sift_client.test_results.create_measurement( + measurement1_data, update_step=True, ) + compare_test_measurement_fields(simulated_measurement1, measurement1) # Create a measurement using a dict measurement2 = sift_client.test_results.create_measurement( @@ -239,21 +345,40 @@ def test_create_test_measurements(self, sift_client): self.test_measurements["measurement3"] = measurement3 self.test_measurements["measurement4"] = measurement4 - def test_update_test_measurements(self, sift_client): + def test_update_test_measurements(self, sift_client, tmp_path): measurement2 = self.test_measurements.get("measurement2") measurement4 = self.test_measurements.get("measurement4") if not measurement2 or not measurement4: pytest.skip("Need to create measurements first") + log_file = tmp_path / "test_measurement_update_log.jsonl" + + update_data = { + "passed": False, + "string_expected_value": "1.10.4", + "unit": "C", + } + + # Test update with log_file first + simulated_measurement2 = sift_client.test_results.update_measurement( + measurement2, + update=update_data, + update_step=True, + log_file=log_file, + ) + assert log_file.exists() + assert "[UpdateTestMeasurement]" in log_file.read_text() + assert simulated_measurement2.passed == False + + # Now do real update measurement2 = sift_client.test_results.update_measurement( measurement2, - update={ - "passed": False, - "string_expected_value": "1.10.4", - "unit": "C", - }, + update=update_data, update_step=True, ) + + compare_test_measurement_fields(simulated_measurement2, measurement2) + assert measurement2.passed == False assert measurement2.string_expected_value == "1.10.4" assert measurement2.unit == "C" @@ -280,26 +405,41 @@ def test_update_test_measurements(self, sift_client): self.test_measurements["measurement2"] = measurement2 self.test_measurements["measurement4"] = measurement4 - def test_update_test_report(self, sift_client): + def test_update_test_report(self, sift_client, tmp_path): test_report = self.test_reports.get("basic_test_report") if not test_report: pytest.skip("Need to create a test report first") new_end_time = test_report.start_time + timedelta(seconds=42) - # Update the report with metadata + log_file = tmp_path / "test_report_update_log.jsonl" + + update_kwargs = { + "metadata": { + "test_environment": "production", + "temperature": 22.5, + "humidity": 45.0, + "automated": True, + }, + "end_time": new_end_time, + "run_id": "", + } + + # Test update with log_file first (create fresh update object) + simulated_report = sift_client.test_results.update( + test_report=test_report, + update=TestReportUpdate(**update_kwargs), + log_file=log_file, + ) + assert log_file.exists() + assert "[UpdateTestReport]" in log_file.read_text() + + # Update the report with metadata (real call, create fresh update object) updated_report = sift_client.test_results.update( test_report=test_report, - update=TestReportUpdate( - metadata={ - "test_environment": "production", - "temperature": 22.5, - "humidity": 45.0, - "automated": True, - }, - end_time=new_end_time, - run_id="", - ), + update=TestReportUpdate(**update_kwargs), ) + compare_test_report_fields(simulated_report, updated_report) + # Update the report using class function. updated_report = updated_report.update( {"status": TestStatus.FAILED}, @@ -395,6 +535,156 @@ def test_import_test_report(self, sift_client): assert found_report.id_ == test_report.id_ self.test_reports["imported_test_report"] = found_report + def test_replay_log_file_round_trip(self, sift_client, nostromo_run, tmp_path): + """Create a report with steps, nested steps, and measurements twice: + once with a log file and once without. Then replay the log and compare. + """ + t0 = datetime.now(timezone.utc) + log_file = tmp_path / "round_trip.jsonl" + + report_data = { + "status": TestStatus.IN_PROGRESS, + "name": "Round Trip Test Report", + "test_system_name": "RT System", + "test_case": "RT Case", + "serial_number": str(uuid.uuid4()), + "part_number": "RT-001", + "start_time": t0, + "end_time": t0 + timedelta(seconds=60), + "run_id": nostromo_run.id_, + } + + results: list[dict] = [] + + for iteration_log_file in [log_file, None]: + report = sift_client.test_results.create(report_data, log_file=iteration_log_file) + + step1 = sift_client.test_results.create_step( + TestStepCreate( + test_report_id=report.id_, + name="RT Step 1", + description="Top-level step", + step_type=TestStepType.SEQUENCE, + step_path="1", + status=TestStatus.PASSED, + start_time=t0, + end_time=t0 + timedelta(seconds=20), + ), + log_file=iteration_log_file, + ) + + step1_1 = sift_client.test_results.create_step( + TestStepCreate( + test_report_id=report.id_, + parent_step_id=step1.id_, + name="RT Step 1.1", + description="Nested step", + step_type=TestStepType.ACTION, + step_path="1.1", + status=TestStatus.PASSED, + start_time=t0, + end_time=t0 + timedelta(seconds=10), + ), + log_file=iteration_log_file, + ) + + step2 = sift_client.test_results.create_step( + TestStepCreate( + test_report_id=report.id_, + name="RT Step 2", + description="Another top-level step", + step_type=TestStepType.ACTION, + step_path="2", + status=TestStatus.IN_PROGRESS, + start_time=t0 + timedelta(seconds=20), + end_time=t0 + timedelta(seconds=40), + error_info=ErrorInfo(error_code=42, error_message="test error"), + ), + log_file=iteration_log_file, + ) + + m1 = sift_client.test_results.create_measurement( + TestMeasurementCreate( + test_step_id=step1_1.id_, + name="RT Temperature", + measurement_type=TestMeasurementType.DOUBLE, + numeric_value=98.6, + numeric_bounds=NumericBounds(min=97.0, max=100.0), + unit="F", + passed=True, + timestamp=t0 + timedelta(seconds=5), + ), + log_file=iteration_log_file, + ) + + m2 = sift_client.test_results.create_measurement( + TestMeasurementCreate( + test_step_id=step2.id_, + name="RT Status Flag", + measurement_type=TestMeasurementType.BOOLEAN, + boolean_value=False, + passed=False, + timestamp=t0 + timedelta(seconds=30), + ), + log_file=iteration_log_file, + ) + + step2 = sift_client.test_results.update_step( + step2, + {"status": TestStatus.FAILED}, + log_file=iteration_log_file, + ) + + report = sift_client.test_results.update( + test_report=report, + update=TestReportUpdate(status=TestStatus.FAILED), + log_file=iteration_log_file, + ) + + results.append( + { + "report": report, + "steps": {"step1": step1, "step1_1": step1_1, "step2": step2}, + "measurements": {"m1": m1, "m2": m2}, + } + ) + + # Verify log file has all expected entries + log_content = log_file.read_text() + assert "[CreateTestReport:" in log_content + assert "[CreateTestStep:" in log_content + assert "[CreateTestMeasurement:" in log_content + assert "[UpdateTestStep]" in log_content + assert "[UpdateTestReport]" in log_content + + # Replay the log file to create real resources + replay_result = sift_client.test_results.replay_log_file(log_file) + + assert replay_result.report.id_ is not None + assert len(replay_result.steps) == 3 + assert len(replay_result.measurements) == 2 + + direct = results[1] + + # Report: updates should have been folded in before create + compare_test_report_fields(replay_result.report, direct["report"]) + + # Steps (matched by name) + replayed_steps_by_name = {s.name: s for s in replay_result.steps} + for direct_step in direct["steps"].values(): + replayed_step = replayed_steps_by_name[direct_step.name] + compare_test_step_fields(replayed_step, direct_step) + + # Measurements (matched by name) + replayed_measurements_by_name = {m.name: m for m in replay_result.measurements} + for direct_m in direct["measurements"].values(): + replayed_m = replayed_measurements_by_name[direct_m.name] + compare_test_measurement_fields(replayed_m, direct_m) + + # Clean up both reports + self.test_reports["replay_report"] = replay_result.report + self.test_reports["direct_report"] = direct["report"] + def test_delete_test_reports(self, sift_client): for test_report in self.test_reports.values(): sift_client.test_results.delete(test_report=test_report) diff --git a/python/lib/sift_client/_tests/sift_types/test_results.py b/python/lib/sift_client/_tests/sift_types/test_results.py index 16b633503..8caef1978 100644 --- a/python/lib/sift_client/_tests/sift_types/test_results.py +++ b/python/lib/sift_client/_tests/sift_types/test_results.py @@ -148,6 +148,7 @@ def test_update_test_measurement(self, mock_test_measurement, mock_client): test_measurement=mock_test_measurement, update=update, update_step=True, + log_file=None, ) mock_update.assert_called_once_with(updated_measurement) assert result is mock_test_measurement @@ -171,6 +172,7 @@ def test_update_test_report(self, mock_test_report, mock_client): mock_client.test_results.update.assert_called_once_with( test_report=mock_test_report, update=update, + log_file=None, ) mock_update.assert_called_once_with(updated_report) diff --git a/python/lib/sift_client/_tests/util/conftest.py b/python/lib/sift_client/_tests/util/conftest.py new file mode 100644 index 000000000..3d8eb07fc --- /dev/null +++ b/python/lib/sift_client/_tests/util/conftest.py @@ -0,0 +1,43 @@ +"""Override report_context to disable log file simulation for integration tests in this directory so that we can exercise the context manager when no log file is provided.""" + +from __future__ import annotations + +from typing import TYPE_CHECKING, Generator + +import pytest + +from sift_client.util.test_results.pytest_util import _report_context_impl, _step_impl + +if TYPE_CHECKING: + from sift_client.client import SiftClient + from sift_client.util.test_results.context_manager import NewStep, ReportContext + + +@pytest.fixture(scope="session", autouse=True) +def report_context( + sift_client: SiftClient, client_has_connection: bool, request: pytest.FixtureRequest +) -> Generator[ReportContext | None, None, None]: + if client_has_connection: + yield from _report_context_impl(sift_client, request, log_file=None) + else: + yield None + + +@pytest.fixture(autouse=True) +def step( + report_context: ReportContext, client_has_connection: bool, request: pytest.FixtureRequest +) -> Generator[NewStep | None, None, None]: + if client_has_connection: + yield from _step_impl(report_context, request) + else: + yield None + + +@pytest.fixture(scope="module", autouse=True) +def module_substep( + report_context: ReportContext, client_has_connection: bool, request: pytest.FixtureRequest +) -> Generator[NewStep | None, None, None]: + if client_has_connection: + yield from _step_impl(report_context, request) + else: + yield None diff --git a/python/lib/sift_client/_tests/util/test_test_results_utils.py b/python/lib/sift_client/_tests/util/test_test_results_utils.py index f2a5cb13b..d569e6420 100644 --- a/python/lib/sift_client/_tests/util/test_test_results_utils.py +++ b/python/lib/sift_client/_tests/util/test_test_results_utils.py @@ -1,3 +1,4 @@ +import time from datetime import datetime, timezone import numpy as np @@ -16,11 +17,46 @@ assign_value_to_measurement, evaluate_measurement_bounds, ) -from sift_client.util.test_results.context_manager import NewStep +from sift_client.util.test_results.context_manager import NewStep, ReportContext pytestmark = pytest.mark.integration +class TestLogReplay: + """Test that the incremental replay subprocess creates real API objects.""" + + def test_replay_creates_report(self, sift_client): + unique_name = f"replay-test-{datetime.now(timezone.utc).isoformat()}" + + with ReportContext( + sift_client, + name=unique_name, + test_case="test_replay_creates_report", + log_file=True, + ) as rc: + with rc.new_step(name="Step A") as step_a: + with step_a.substep(name="Step A.1"): + pass + with rc.new_step(name="Step B"): + pass + + # Wait to ensure the report creation has completed. + time.sleep(2) + + reports = sift_client.test_results.list_(name=unique_name) + assert len(reports) >= 1, f"Expected report '{unique_name}' to be created by replay" + replay_report = reports[0] + assert replay_report.name == unique_name + + steps = sift_client.test_results.list_steps(test_reports=[replay_report]) + step_names = {s.name for s in steps} + assert "Step A" in step_names + assert "Step A.1" in step_names + assert "Step B" in step_names + + sift_client.test_results.delete(test_report=replay_report) + + class TestContextManager: def test_link_run_to_report(self, report_context, nostromo_run): report_context.report.update({"run_id": nostromo_run.id_}) diff --git a/python/lib/sift_client/resources/sync_stubs/__init__.pyi b/python/lib/sift_client/resources/sync_stubs/__init__.pyi index fe87809cd..571562bea 100644 --- a/python/lib/sift_client/resources/sync_stubs/__init__.pyi +++ b/python/lib/sift_client/resources/sync_stubs/__init__.pyi @@ -13,6 +13,9 @@ if TYPE_CHECKING: import pandas as pd import pyarrow as pa + from sift_client._internal.low_level_wrappers.test_results import ( + ReplayResult, + ) from sift_client.client import SiftClient from sift_client.sift_types.asset import Asset, AssetUpdate from sift_client.sift_types.calculated_channel import ( @@ -1703,11 +1706,14 @@ class TestResultsAPI: """ ... - def create(self, test_report: TestReportCreate | dict) -> TestReport: + def create( + self, test_report: TestReportCreate | dict, log_file: str | Path | None = None + ) -> TestReport: """Create a new test report. Args: test_report: The test report to create (can be TestReport or TestReportCreate). + log_file: If set, log the request to this file and return a simulated response. Returns: The created TestReport. @@ -1715,13 +1721,17 @@ class TestResultsAPI: ... def create_measurement( - self, test_measurement: TestMeasurementCreate | dict, update_step: bool = False + self, + test_measurement: TestMeasurementCreate | dict, + update_step: bool = False, + log_file: str | Path | None = None, ) -> TestMeasurement: """Create a new test measurement. Args: test_measurement: The test measurement to create (can be TestMeasurement or TestMeasurementCreate). update_step: Whether to update the step to failed if the measurement is being created is failed. + log_file: If set, log the request to this file and return a simulated response. Returns: The created TestMeasurement. @@ -1729,23 +1739,27 @@ class TestResultsAPI: ... def create_measurements( - self, test_measurements: list[TestMeasurementCreate] + self, test_measurements: list[TestMeasurementCreate], log_file: str | Path | None = None ) -> tuple[int, list[str]]: """Create multiple test measurements in a single request. Args: test_measurements: The test measurements to create. + log_file: If set, log the request to this file and return a simulated response. Returns: A tuple of (measurements_created_count, measurement_ids). """ ... - def create_step(self, test_step: TestStepCreate | dict) -> TestStep: + def create_step( + self, test_step: TestStepCreate | dict, log_file: str | Path | None = None + ) -> TestStep: """Create a new test step. Args: test_step: The test step to create (can be TestStep or TestStepCreate). + log_file: If set, log the request to this file and return a simulated response. Returns: The created TestStep. @@ -1949,6 +1963,26 @@ class TestResultsAPI: """ ... + def replay_log_file(self, log_file: str | Path, *, incremental: bool = False) -> ReplayResult: + """Replay a log file, creating real API objects from the logged simulation data. + + Two modes are available: + + * **batch** (default): Parse the entire log, reconstruct objects via + simulation, then create them all via the API in one pass. + * **incremental**: Walk the log line-by-line, issuing the real API call + for each entry. The ``LogTracking`` header is updated after every + successful call so a subsequent invocation picks up where it left off. + + Args: + log_file: Path to the log file to replay. + incremental: If True, use incremental mode. + + Returns: + A ReplayResult containing the created report, steps, and measurements. + """ + ... + def unarchive(self, *, test_report: str | TestReport) -> TestReport: """Unarchive a test report. @@ -1957,12 +1991,18 @@ class TestResultsAPI: """ ... - def update(self, test_report: str | TestReport, update: TestReportUpdate | dict) -> TestReport: + def update( + self, + test_report: str | TestReport, + update: TestReportUpdate | dict, + log_file: str | Path | None = None, + ) -> TestReport: """Update a TestReport. Args: test_report: The TestReport or test report ID to update. update: Updates to apply to the TestReport. + log_file: If set, log the request to this file and return a simulated response. Returns: The updated TestReport. @@ -1974,6 +2014,7 @@ class TestResultsAPI: test_measurement: TestMeasurement, update: TestMeasurementUpdate | dict, update_step: bool = False, + log_file: str | Path | None = None, ) -> TestMeasurement: """Update a TestMeasurement. @@ -1981,18 +2022,25 @@ class TestResultsAPI: test_measurement: The TestMeasurement or measurement ID to update. update: Updates to apply to the TestMeasurement. update_step: Whether to update the step to failed if the measurement is being updated to failed. + log_file: If set, log the request to this file and return a simulated response. Returns: The updated TestMeasurement. """ ... - def update_step(self, test_step: str | TestStep, update: TestStepUpdate | dict) -> TestStep: + def update_step( + self, + test_step: str | TestStep, + update: TestStepUpdate | dict, + log_file: str | Path | None = None, + ) -> TestStep: """Update a TestStep. Args: test_step: The TestStep or test step ID to update. update: Updates to apply to the TestStep. + log_file: If set, log the request to this file and return a simulated response. Returns: The updated TestStep. diff --git a/python/lib/sift_client/resources/test_results.py b/python/lib/sift_client/resources/test_results.py index 5f3198d4d..4686a0251 100644 --- a/python/lib/sift_client/resources/test_results.py +++ b/python/lib/sift_client/resources/test_results.py @@ -4,7 +4,10 @@ from datetime import datetime from typing import TYPE_CHECKING, Any -from sift_client._internal.low_level_wrappers.test_results import TestResultsLowLevelClient +from sift_client._internal.low_level_wrappers.test_results import ( + ReplayResult, + TestResultsLowLevelClient, +) from sift_client._internal.low_level_wrappers.upload import UploadLowLevelClient from sift_client.resources._base import ResourceBase from sift_client.sift_types.test_report import ( @@ -66,11 +69,13 @@ async def import_(self, test_file: str | Path) -> TestReport: async def create( self, test_report: TestReportCreate | dict, + log_file: str | Path | None = None, ) -> TestReport: """Create a new test report. Args: test_report: The test report to create (can be TestReport or TestReportCreate). + log_file: If set, log the request to this file and return a simulated response. Returns: The created TestReport. @@ -79,6 +84,7 @@ async def create( test_report = TestReportCreate.model_validate(test_report) created_report = await self._low_level_client.create_test_report( test_report=test_report, + log_file=log_file, ) return self._apply_client_to_instance(created_report) @@ -224,13 +230,17 @@ async def find(self, **kwargs) -> TestReport | None: return None async def update( - self, test_report: str | TestReport, update: TestReportUpdate | dict + self, + test_report: str | TestReport, + update: TestReportUpdate | dict, + log_file: str | Path | None = None, ) -> TestReport: """Update a TestReport. Args: test_report: The TestReport or test report ID to update. update: Updates to apply to the TestReport. + log_file: If set, log the request to this file and return a simulated response. Returns: The updated TestReport. @@ -242,7 +252,10 @@ async def update( update = TestReportUpdate.model_validate(update) update.resource_id = test_report_id - updated_test_report = await self._low_level_client.update_test_report(update) + existing = test_report if isinstance(test_report, TestReport) else None + updated_test_report = await self._low_level_client.update_test_report( + update, log_file=log_file, existing=existing + ) return self._apply_client_to_instance(updated_test_report) async def archive(self, *, test_report: str | TestReport) -> TestReport: @@ -272,18 +285,25 @@ async def delete(self, *, test_report: str | TestReport) -> None: raise TypeError(f"test_report_id must be a string not {type(test_report_id)}") await self._low_level_client.delete_test_report(test_report_id=test_report_id) - async def create_step(self, test_step: TestStepCreate | dict) -> TestStep: + async def create_step( + self, + test_step: TestStepCreate | dict, + log_file: str | Path | None = None, + ) -> TestStep: """Create a new test step. Args: test_step: The test step to create (can be TestStep or TestStepCreate). + log_file: If set, log the request to this file and return a simulated response. Returns: The created TestStep. """ if isinstance(test_step, dict): test_step = TestStepCreate.model_validate(test_step) - test_step_result = await self._low_level_client.create_test_step(test_step) + test_step_result = await self._low_level_client.create_test_step( + test_step, log_file=log_file + ) return self._apply_client_to_instance(test_step_result) async def list_steps( @@ -384,13 +404,17 @@ async def get_step(self, test_step: str | TestStep) -> TestStep: return self._apply_client_to_instance(test_step) async def update_step( - self, test_step: str | TestStep, update: TestStepUpdate | dict + self, + test_step: str | TestStep, + update: TestStepUpdate | dict, + log_file: str | Path | None = None, ) -> TestStep: """Update a TestStep. Args: test_step: The TestStep or test step ID to update. update: Updates to apply to the TestStep. + log_file: If set, log the request to this file and return a simulated response. Returns: The updated TestStep. @@ -401,7 +425,10 @@ async def update_step( update = TestStepUpdate.model_validate(update) update.resource_id = test_step_id - updated_test_step = await self._low_level_client.update_test_step(update) + existing = test_step if isinstance(test_step, TestStep) else None + updated_test_step = await self._low_level_client.update_test_step( + update, log_file=log_file, existing=existing + ) return self._apply_client_to_instance(updated_test_step) async def delete_step(self, *, test_step: str | TestStep) -> None: @@ -416,13 +443,17 @@ async def delete_step(self, *, test_step: str | TestStep) -> None: await self._low_level_client.delete_test_step(test_step_id=test_step_id) async def create_measurement( - self, test_measurement: TestMeasurementCreate | dict, update_step: bool = False + self, + test_measurement: TestMeasurementCreate | dict, + update_step: bool = False, + log_file: str | Path | None = None, ) -> TestMeasurement: """Create a new test measurement. Args: test_measurement: The test measurement to create (can be TestMeasurement or TestMeasurementCreate). update_step: Whether to update the step to failed if the measurement is being created is failed. + log_file: If set, log the request to this file and return a simulated response. Returns: The created TestMeasurement. @@ -430,27 +461,32 @@ async def create_measurement( if isinstance(test_measurement, dict): test_measurement = TestMeasurementCreate.model_validate(test_measurement) test_measurement_result = await self._low_level_client.create_test_measurement( - test_measurement + test_measurement, log_file=log_file ) measurement = self._apply_client_to_instance(test_measurement_result) - if update_step: + if update_step and log_file is None: step = await self.get_step(test_step=test_measurement_result.test_step_id) if step.status == TestStatus.PASSED and not measurement.passed: await self.update_step(test_step=step, update={"status": TestStatus.FAILED}) return measurement async def create_measurements( - self, test_measurements: list[TestMeasurementCreate] + self, + test_measurements: list[TestMeasurementCreate], + log_file: str | Path | None = None, ) -> tuple[int, list[str]]: """Create multiple test measurements in a single request. Args: test_measurements: The test measurements to create. + log_file: If set, log the request to this file and return a simulated response. Returns: A tuple of (measurements_created_count, measurement_ids). """ - return await self._low_level_client.create_test_measurements(test_measurements) + return await self._low_level_client.create_test_measurements( + test_measurements, log_file=log_file + ) async def list_measurements( self, @@ -538,6 +574,7 @@ async def update_measurement( test_measurement: TestMeasurement, update: TestMeasurementUpdate | dict, update_step: bool = False, + log_file: str | Path | None = None, ) -> TestMeasurement: """Update a TestMeasurement. @@ -545,6 +582,7 @@ async def update_measurement( test_measurement: The TestMeasurement or measurement ID to update. update: Updates to apply to the TestMeasurement. update_step: Whether to update the step to failed if the measurement is being updated to failed. + log_file: If set, log the request to this file and return a simulated response. Returns: The updated TestMeasurement. @@ -553,10 +591,11 @@ async def update_measurement( update = TestMeasurementUpdate.model_validate(update) update.resource_id = test_measurement.id_ - updated_test_measurement = await self._low_level_client.update_test_measurement(update) + updated_test_measurement = await self._low_level_client.update_test_measurement( + update, log_file=log_file, existing=test_measurement + ) updated_test_measurement = self._apply_client_to_instance(updated_test_measurement) - # If measurement is being updated to failed, see if step is passed and update it to failed if so - if update_step and update.passed is not None and not update.passed: + if update_step and log_file is None and update.passed is not None and not update.passed: step = await self.get_step(test_step=updated_test_measurement.test_step_id) if step.status == TestStatus.PASSED: await self.update_step(test_step=step, update={"status": TestStatus.FAILED}) @@ -576,3 +615,32 @@ async def delete_measurement(self, *, test_measurement: str | TestMeasurement) - if not isinstance(measurement_id, str): raise TypeError(f"measurement_id must be a string not {type(measurement_id)}") await self._low_level_client.delete_test_measurement(measurement_id=measurement_id) + + async def replay_log_file( + self, + log_file: str | Path, + *, + incremental: bool = False, + ) -> ReplayResult: + """Replay a log file, creating real API objects from the logged simulation data. + + Two modes are available: + + * **batch** (default): Parse the entire log, reconstruct objects via + simulation, then create them all via the API in one pass. + * **incremental**: Walk the log line-by-line, issuing the real API call + for each entry. The ``LogTracking`` header is updated after every + successful call so a subsequent invocation picks up where it left off. + + Args: + log_file: Path to the log file to replay. + incremental: If True, use incremental mode. + + Returns: + A ReplayResult containing the created report, steps, and measurements. + """ + result = await self._low_level_client.replay_log_file(log_file, incremental=incremental) + result.report = self._apply_client_to_instance(result.report) + result.steps = self._apply_client_to_instances(result.steps) + result.measurements = self._apply_client_to_instances(result.measurements) + return result diff --git a/python/lib/sift_client/scripts/__init__.py b/python/lib/sift_client/scripts/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/python/lib/sift_client/scripts/replay_test_result_log.py b/python/lib/sift_client/scripts/replay_test_result_log.py new file mode 100644 index 000000000..be97bde53 --- /dev/null +++ b/python/lib/sift_client/scripts/replay_test_result_log.py @@ -0,0 +1,89 @@ +"""Replay a test result log file, creating real API objects from a simulation log.""" + +from __future__ import annotations + +import argparse +import logging +import os +import select +import sys +import tempfile + +from sift_client import SiftClient, SiftConnectionConfig + +logger = logging.getLogger(__name__) + + +def _print_result(result) -> None: + print(f"Report: {result.report.name} (id={result.report.id_})") + print(f"Steps: {len(result.steps)}") + for step in result.steps: + print(f" - {step.step_path} [{step.status}]") + print(f"Measurements: {len(result.measurements)}") + for m in result.measurements: + print(f" - {m.name}: passed={m.passed}") + + +def main() -> None: + """Replay a test result simulation log file against the Sift API.""" + parser = argparse.ArgumentParser( + description="Replay a test result simulation log file against the Sift API.", + ) + parser.add_argument("log_file", help="Path to the .jsonl log file to replay.") + parser.add_argument( + "--incremental", + action="store_true", + help="Replay line-by-line, tracking progress so reruns pick up where they left off.", + ) + parser.add_argument("--grpc-url", default=os.getenv("SIFT_GRPC_URI", "localhost:50051")) + parser.add_argument("--rest-url", default=os.getenv("SIFT_REST_URI", "localhost:8080")) + parser.add_argument("--api-key", default=os.getenv("SIFT_API_KEY", "")) + args = parser.parse_args() + + use_ssl = "localhost" not in args.grpc_url and "localhost" not in args.rest_url + + client = SiftClient( + connection_config=SiftConnectionConfig( + api_key=args.api_key, + grpc_url=args.grpc_url, + rest_url=args.rest_url, + use_ssl=use_ssl, + ) + ) + + try: + if args.incremental: + result = _incremental_loop(client, args.log_file) + else: + result = client.test_results.replay_log_file(args.log_file) + fp = os.path.abspath(args.log_file) + if fp.startswith(tempfile.gettempdir()): + os.remove(fp) + if result: + _print_result(result) + except Exception as e: + logger.error(e) + logger.error( + f"Error replaying log file: {args.log_file}.\n" + f" Can replay with `replay-test-result-log {args.log_file}`." + ) + raise + + +def _incremental_loop(client: SiftClient, log_file: str): + """Replay incrementally in a loop until stdin is closed (EOF).""" + result = None + while True: + received_signal, _, _ = select.select([sys.stdin], [], [], 1.0) + result = client.test_results.replay_log_file(log_file, incremental=True) + if received_signal: + break + logger.info(f"Replay completed: {result}") + fp = os.path.abspath(log_file) + if fp.startswith(tempfile.gettempdir()): + os.remove(fp) + return result + + +if __name__ == "__main__": + main() diff --git a/python/lib/sift_client/sift_types/test_report.py b/python/lib/sift_client/sift_types/test_report.py index f5b4600e2..37225d222 100644 --- a/python/lib/sift_client/sift_types/test_report.py +++ b/python/lib/sift_client/sift_types/test_report.py @@ -37,6 +37,8 @@ from sift_client.util.metadata import metadata_dict_to_proto, metadata_proto_to_dict if TYPE_CHECKING: + from pathlib import Path + from sift_client.client import SiftClient @@ -195,11 +197,17 @@ def _to_proto(self) -> TestStepProto: return proto - def update(self, update: TestStepUpdate | dict) -> TestStep: + def update( + self, + update: TestStepUpdate | dict, + log_file: str | Path | None = None, + ) -> TestStep: """Update the TestStep.""" if not self.client: raise ValueError("Client not set") - updated_test_step = self.client.test_results.update_step(test_step=self, update=update) + updated_test_step = self.client.test_results.update_step( + test_step=self, update=update, log_file=log_file + ) self._update(updated_test_step) return self @@ -405,19 +413,23 @@ def _to_proto(self) -> TestMeasurementProto: return proto def update( - self, update: TestMeasurementUpdate | dict, update_step: bool = False + self, + update: TestMeasurementUpdate | dict, + update_step: bool = False, + log_file: str | Path | None = None, ) -> TestMeasurement: """Update the TestMeasurement. Args: update: The update to apply to the TestMeasurement. update_step: Whether to update the TestStep's status to failed if the TestMeasurement is being updated to failed. + log_file: If set, log the request to this file instead of making an API call. Returns: The updated TestMeasurement. """ updated_test_measurement = self.client.test_results.update_measurement( - test_measurement=self, update=update, update_step=update_step + test_measurement=self, update=update, update_step=update_step, log_file=log_file ) self._update(updated_test_measurement) return self @@ -590,9 +602,15 @@ def _to_proto(self) -> TestReportProto: return proto - def update(self, update: TestReportUpdate | dict) -> TestReport: + def update( + self, + update: TestReportUpdate | dict, + log_file: str | Path | None = None, + ) -> TestReport: """Update the TestReport.""" - updated_test_report = self.client.test_results.update(test_report=self, update=update) + updated_test_report = self.client.test_results.update( + test_report=self, update=update, log_file=log_file + ) self._update(updated_test_report) return self diff --git a/python/lib/sift_client/transport/grpc_transport.py b/python/lib/sift_client/transport/grpc_transport.py index 95817a010..232751485 100644 --- a/python/lib/sift_client/transport/grpc_transport.py +++ b/python/lib/sift_client/transport/grpc_transport.py @@ -9,11 +9,12 @@ import asyncio import atexit import logging +import os import threading from typing import Any from urllib.parse import urlparse -from sift_client._internal.grpc.transport import ( +from sift_client._internal.grpc_transport.transport import ( SiftChannelConfig, use_sift_async_channel, ) @@ -98,6 +99,8 @@ def __init__(self, config: GrpcConfig): Args: config: The gRPC client configuration. """ + os.environ.setdefault("GRPC_ENABLE_FORK_SUPPORT", "0") + self._config = config # map each asyncio loop to its async channel and stub dict self._channels_async: dict[asyncio.AbstractEventLoop, Any] = {} diff --git a/python/lib/sift_client/util/test_results/__init__.py b/python/lib/sift_client/util/test_results/__init__.py index a01f76492..1f2361a8a 100644 --- a/python/lib/sift_client/util/test_results/__init__.py +++ b/python/lib/sift_client/util/test_results/__init__.py @@ -58,6 +58,9 @@ def main(self): - If you want each module(file) to be marked as a step w/ each test as a substep, import the `module_substep` fixture as well. - The `report_context` fixture requires a fixture `sift_client` returning an `SiftClient` instance to be passed in. + +Note: FedRAMP users: report_context will log test results to a temp file to avoid API calls during test execution. If this is a shared environment, you should import the `report_context_no_logging` fixture instead. + ###### Example at top of your test file or in your conftest.py file: ```python @@ -102,6 +105,7 @@ def test_example(report_context, step): pytest_runtest_makereport, report_context, report_context_check_connection, + report_context_no_logging, step, step_check_connection, ) @@ -115,6 +119,7 @@ def test_example(report_context, step): "pytest_runtest_makereport", "report_context", "report_context_check_connection", + "report_context_no_logging", "step", "step_check_connection", ] diff --git a/python/lib/sift_client/util/test_results/context_manager.py b/python/lib/sift_client/util/test_results/context_manager.py index 937f21971..807b5b64d 100644 --- a/python/lib/sift_client/util/test_results/context_manager.py +++ b/python/lib/sift_client/util/test_results/context_manager.py @@ -1,11 +1,15 @@ from __future__ import annotations import getpass +import logging import os import socket +import subprocess +import tempfile import traceback from contextlib import AbstractContextManager from datetime import datetime, timezone +from pathlib import Path from typing import TYPE_CHECKING import numpy as np @@ -31,11 +35,15 @@ from sift_client.client import SiftClient +logger = logging.getLogger(__name__) + class ReportContext(AbstractContextManager): """Context manager for a new TestReport. See usage example in __init__.py.""" report: TestReport + client: SiftClient + log_file: Path | None step_is_open: bool step_stack: list[TestStep] step_number_at_depth: dict[int, int] @@ -49,6 +57,7 @@ def __init__( test_system_name: str | None = None, system_operator: str | None = None, test_case: str | None = None, + log_file: str | Path | bool | None = None, ): """Initialize a new report context. @@ -58,13 +67,25 @@ def __init__( test_system_name: The name of the test system. Will default to the hostname if not provided. system_operator: The operator of the test system. Will default to the current user if not provided. test_case: The name of the test case. Will default to the basename of the file containing the test if not provided. + log_file: If True, create a temp log file. If a path, use that path. + All create/update operations will be logged to this file. """ + self.client = client self.step_is_open = False self.step_stack = [] self.step_number_at_depth = {} self.open_step_results = {} self.any_failures = False + if log_file is True: + tmp = tempfile.NamedTemporaryFile(suffix=".jsonl", delete=False) + self.log_file = Path(tmp.name) + logger.info(f"Created temporary log file: {self.log_file}") + elif log_file: + self.log_file = Path(log_file) + else: + self.log_file = None + # Create the report. test_case = test_case if test_case else os.path.basename(__file__) test_system_name = test_system_name if test_system_name else socket.gethostname() @@ -78,9 +99,30 @@ def __init__( status=TestStatus.IN_PROGRESS, system_operator=system_operator, ) - self.report = client.test_results.create(create) + self.report = client.test_results.create(create, log_file=self.log_file) + + def _open_replay_proc(self): + if self.log_file is not None: + # To avoid GRPC forking errors, temporarily redirect stderr at the fd level before forking, so the child inherits /dev/null on fd 2 when the atfork handler fires. + saved_stderr = os.dup(2) + devnull_fd = os.open(os.devnull, os.O_WRONLY) + os.dup2(devnull_fd, 2) + os.close(devnull_fd) + try: + self._replay_proc = subprocess.Popen( + ["replay-test-result-log", "--incremental", str(self.log_file)], + stdin=subprocess.PIPE, + stdout=subprocess.DEVNULL, + stderr=subprocess.DEVNULL, + ) + finally: + os.dup2(saved_stderr, 2) + os.close(saved_stderr) + else: + self._replay_proc = None def __enter__(self): + self._open_replay_proc() return self def __exit__(self, exc_type, exc_value, traceback): @@ -91,7 +133,16 @@ def __exit__(self, exc_type, exc_value, traceback): update["status"] = TestStatus.FAILED else: update["status"] = TestStatus.PASSED - self.report.update(update) + self.report.update(update, log_file=self.log_file) + + if self._replay_proc is not None: + try: + self._replay_proc.communicate(timeout=10) + except subprocess.TimeoutExpired: + logger.warning("Replay process did not exit in 10s, killing it") + self._replay_proc.kill() + self._replay_proc.wait() + return True def new_step( @@ -126,7 +177,7 @@ def create_step(self, name: str, description: str | None = None) -> TestStep: step_path = self.get_next_step_path() parent_step = self.step_stack[-1] if self.step_stack else None - step = self.report.client.test_results.create_step( + step = self.client.test_results.create_step( TestStepCreate( test_report_id=str(self.report.id_), name=name, @@ -137,7 +188,8 @@ def create_step(self, name: str, description: str | None = None) -> TestStep: end_time=datetime.now(timezone.utc), description=description, parent_step_id=parent_step.id_ if parent_step else None, - ) + ), + log_file=self.log_file, ) # Update the step tracking structures. @@ -217,7 +269,7 @@ def __init__( assertion_as_fail_not_error: Mark steps with assertion errors as failed instead of error+traceback (some users want assertions to work as simple failures especially when using pytest). """ self.report_context = report_context - self.client = report_context.report.client + self.client = report_context.client self.current_step = self.report_context.create_step(name, description) self.assertion_as_fail_not_error = assertion_as_fail_not_error @@ -275,7 +327,8 @@ def update_step_from_result( "status": status, "end_time": datetime.now(timezone.utc), "error_info": error_info, - } + }, + log_file=self.report_context.log_file, ) return result @@ -322,7 +375,9 @@ def measure( unit=unit, ) evaluate_measurement_bounds(create, value, bounds) - measurement = self.client.test_results.create_measurement(create) + measurement = self.client.test_results.create_measurement( + create, log_file=self.report_context.log_file + ) self.report_context.record_step_outcome(measurement.passed, self.current_step) return measurement.passed diff --git a/python/lib/sift_client/util/test_results/pytest_util.py b/python/lib/sift_client/util/test_results/pytest_util.py index c2bd3f9bc..9dd91e778 100644 --- a/python/lib/sift_client/util/test_results/pytest_util.py +++ b/python/lib/sift_client/util/test_results/pytest_util.py @@ -25,12 +25,16 @@ def pytest_runtest_makereport(item: pytest.Item, call: pytest.CallInfo[Any]): # Skipped steps won't invoke the method/fixtures at all, so we need to manually record a step. if REPORT_CONTEXT: with REPORT_CONTEXT.new_step(name=item.name) as new_step: - new_step.current_step.update({"status": TestStatus.SKIPPED}) + new_step.current_step.update( + {"status": TestStatus.SKIPPED}, log_file=REPORT_CONTEXT.log_file + ) setattr(item, "rep_" + report.when, call) def _report_context_impl( - sift_client: SiftClient, request: pytest.FixtureRequest + sift_client: SiftClient, + request: pytest.FixtureRequest, + log_file: str | Path | bool | None = True, ) -> Generator[ReportContext | None, None, None]: test_path = Path(request.config.invocation_params.args[0]) base_name = ( @@ -43,6 +47,7 @@ def _report_context_impl( sift_client, name=f"{base_name} {datetime.now(timezone.utc).isoformat()}", test_case=str(test_case), + log_file=log_file, ) as context: # Set a global so we can access this in pytest hooks. global REPORT_CONTEXT @@ -58,6 +63,14 @@ def report_context( yield from _report_context_impl(sift_client, request) +@pytest.fixture(scope="session", autouse=True) +def report_context_no_logging( + sift_client: SiftClient, request: pytest.FixtureRequest +) -> Generator[ReportContext | None, None, None]: + """Create a report context for the session without log file.""" + yield from _report_context_impl(sift_client, request, log_file=None) + + def _step_impl( report_context: ReportContext, request: pytest.FixtureRequest ) -> Generator[NewStep | None, None, None]: diff --git a/python/pyproject.toml b/python/pyproject.toml index ed611987b..e52aa6280 100644 --- a/python/pyproject.toml +++ b/python/pyproject.toml @@ -4,7 +4,7 @@ build-backend = "setuptools.build_meta" [project] name = "sift_stack_py" -version = "0.13.0" +version = "0.14.0" description = "Python client library for the Sift API" requires-python = ">=3.8" readme = { file = "README.md", content-type = "text/markdown" } @@ -49,6 +49,9 @@ Documentation = "https://docs.siftstack.com/sift_py/sift_py.html" Repository = "https://github.com/sift-stack/sift/tree/main/python" Changelog = "https://github.com/sift-stack/sift/tree/main/python/CHANGELOG.md" +[project.scripts] +replay-test-result-log = "sift_client.scripts.replay_test_result_log:main" + [project.optional-dependencies] # AUTO GENERATED EXTRAS — EDIT [tool.sift.extras] ONLY all = [