From d36ac676a762de2e85a7d8f4ad0027682de6b4a1 Mon Sep 17 00:00:00 2001
From: Ignacio Van Droogenbroeck
Date: Sat, 14 Feb 2026 21:33:00 -0300
Subject: [PATCH] fix(ingestion): pad missing columns with None in buffered merge (#202)
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

When merging columnar batches with different field sets, missing columns
were skipped entirely — creating misaligned arrays where some columns had
fewer elements than the time column. This caused data corruption when the
server received uneven column lengths.

Fix: pad missing columns with None values so all columns have equal
length. MessagePack serializes None as nil, which the Arc server now
handles correctly with validity bitmaps.

Related: Basekick-Labs/arc#202
---
 src/arc_client/ingestion/async_buffered.py | 11 ++++++++++-
 src/arc_client/ingestion/buffered.py       | 12 ++++++++++--
 2 files changed, 20 insertions(+), 3 deletions(-)

diff --git a/src/arc_client/ingestion/async_buffered.py b/src/arc_client/ingestion/async_buffered.py
index e532b77..5f0c4bc 100644
--- a/src/arc_client/ingestion/async_buffered.py
+++ b/src/arc_client/ingestion/async_buffered.py
@@ -122,7 +122,11 @@ async def _flush_all_unlocked(self) -> None:
             await self._flush_measurement_unlocked(measurement)
 
     def _merge_columnar(self, batches: list[dict[str, list[Any]]]) -> dict[str, list[Any]]:
-        """Merge multiple columnar batches into one."""
+        """Merge multiple columnar batches into one.
+
+        When batches have different column sets (sparse columns), missing
+        positions are filled with None so all columns have equal length.
+        """
         if not batches:
             return {}
 
@@ -133,12 +137,17 @@ def _merge_columnar(self, batches: list[dict[str, list[Any]]]) -> dict[str, list
         for batch in batches:
             all_columns.update(batch.keys())
 
+        # Merge each column, padding missing columns with None
         merged: dict[str, list[Any]] = {}
         for col_name in all_columns:
             merged[col_name] = []
             for batch in batches:
                 if col_name in batch:
                     merged[col_name].extend(batch[col_name])
+                else:
+                    # Column missing from this batch — pad with None
+                    batch_len = len(batch.get("time", next(iter(batch.values()))))
+                    merged[col_name].extend([None] * batch_len)
 
         return merged
 
diff --git a/src/arc_client/ingestion/buffered.py b/src/arc_client/ingestion/buffered.py
index 9c6876e..df3f9dc 100644
--- a/src/arc_client/ingestion/buffered.py
+++ b/src/arc_client/ingestion/buffered.py
@@ -167,7 +167,11 @@ def _flush_all(self) -> None:
             self._flush_measurement(measurement)
 
     def _merge_columnar(self, batches: list[dict[str, list[Any]]]) -> dict[str, list[Any]]:
-        """Merge multiple columnar batches into one."""
+        """Merge multiple columnar batches into one.
+
+        When batches have different column sets (sparse columns), missing
+        positions are filled with None so all columns have equal length.
+        """
         if not batches:
             return {}
 
@@ -179,13 +183,17 @@ def _merge_columnar(self, batches: list[dict[str, list[Any]]]) -> dict[str, list
         for batch in batches:
             all_columns.update(batch.keys())
 
-        # Merge each column
+        # Merge each column, padding missing columns with None
         merged: dict[str, list[Any]] = {}
         for col_name in all_columns:
             merged[col_name] = []
             for batch in batches:
                 if col_name in batch:
                     merged[col_name].extend(batch[col_name])
+                else:
+                    # Column missing from this batch — pad with None
+                    batch_len = len(batch.get("time", next(iter(batch.values()))))
+                    merged[col_name].extend([None] * batch_len)
 
         return merged
 