From 255562c21fdb5aa7d740e404213a7abf18e8f3f3 Mon Sep 17 00:00:00 2001 From: Andreas Motl Date: Tue, 1 Jul 2025 05:22:13 +0200 Subject: [PATCH] DMS: Update to commons-codec 0.0.24 --- CHANGES.md | 1 + lorrystream/process/kinesis_cratedb_lambda.py | 6 ++++-- pyproject.toml | 2 +- tests/test_process.py | 6 ++++-- 4 files changed, 10 insertions(+), 5 deletions(-) diff --git a/CHANGES.md b/CHANGES.md index e07e660..937ac6c 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -1,6 +1,7 @@ # Changelog ## in progress +- DMS: Updated to commons-codec 0.0.24 ## 2025-06-23 v0.0.8 - DMS: Improved stack configuration and replication settings diff --git a/lorrystream/process/kinesis_cratedb_lambda.py b/lorrystream/process/kinesis_cratedb_lambda.py index c79effc..154752d 100644 --- a/lorrystream/process/kinesis_cratedb_lambda.py +++ b/lorrystream/process/kinesis_cratedb_lambda.py @@ -36,8 +36,8 @@ import sqlalchemy as sa from commons_codec.exception import UnknownOperationError -from commons_codec.model import ColumnTypeMapStore -from commons_codec.transform.aws_dms import DMSTranslatorCrateDB +from commons_codec.model import ColumnMappingStrategy, ColumnTypeMapStore +from commons_codec.transform.aws_dms import DMSTranslatorCrateDB, DMSTranslatorCrateDBRecordFactory from commons_codec.transform.dynamodb import DynamoDBCDCTranslator from sqlalchemy.util import asbool @@ -76,7 +76,9 @@ # TODO: Automatically create destination table. # TODO: Propagate mapping definitions and other settings. +# TODO: Propagate column mapping strategy. if MESSAGE_FORMAT == "dms": + DMSTranslatorCrateDBRecordFactory.DEFAULT_MAPPING_STRATEGY = ColumnMappingStrategy.UNIVERSAL cdc = DMSTranslatorCrateDB(column_types=column_types) elif MESSAGE_FORMAT == "dynamodb": cdc = DynamoDBCDCTranslator(table_name=SINK_TABLE) diff --git a/pyproject.toml b/pyproject.toml index fec4f5e..9d6d415 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -88,7 +88,7 @@ dependencies = [ "click<9", "colorama<1", "colorlog", - "commons-codec==0.0.22", + "commons-codec>=0.0.24", "dask", "funcy", "influxdb", diff --git a/tests/test_process.py b/tests/test_process.py index 76e0a7d..7c2a558 100644 --- a/tests/test_process.py +++ b/tests/test_process.py @@ -120,7 +120,7 @@ def test_kinesis_dms_cratedb_lambda_basic(mocker, cratedb, reset_handler): column_types = ColumnTypeMapStore().add( table=TableAddress(schema="public", table="foo"), column="attributes", - type_=ColumnType.MAP, + type_=ColumnType.OBJECT, ) # Configure environment variables. @@ -142,5 +142,7 @@ def test_kinesis_dms_cratedb_lambda_basic(mocker, cratedb, reset_handler): records = cratedb.database.run_sql('SELECT * FROM "public"."foo";', records=True) assert records[0] == { - "data": {"id": 46, "name": "Jane", "age": 31, "attributes": {"baz": "qux"}}, + "pk": {"id": 46}, + "data": {"name": "Jane", "age": 31, "attributes": {"baz": "qux"}}, + "aux": {}, }