diff --git a/src/arc_client/ingestion/async_buffered.py b/src/arc_client/ingestion/async_buffered.py
index e532b77..5f0c4bc 100644
--- a/src/arc_client/ingestion/async_buffered.py
+++ b/src/arc_client/ingestion/async_buffered.py
@@ -122,7 +122,11 @@ async def _flush_all_unlocked(self) -> None:
             await self._flush_measurement_unlocked(measurement)
 
     def _merge_columnar(self, batches: list[dict[str, list[Any]]]) -> dict[str, list[Any]]:
-        """Merge multiple columnar batches into one."""
+        """Merge multiple columnar batches into one.
+
+        When batches have different column sets (sparse columns), missing
+        positions are filled with None so all columns have equal length.
+        """
         if not batches:
             return {}
 
@@ -133,12 +137,19 @@ def _merge_columnar(self, batches: list[dict[str, list[Any]]]) -> dict[str, list
         for batch in batches:
             all_columns.update(batch.keys())
 
+        # Merge each column, padding missing columns with None
         merged: dict[str, list[Any]] = {}
         for col_name in all_columns:
             merged[col_name] = []
             for batch in batches:
                 if col_name in batch:
                     merged[col_name].extend(batch[col_name])
+                elif batch:
+                    # Column missing from this batch — pad with None.
+                    # Empty batches contribute no rows and would make
+                    # next() raise StopIteration, so they are skipped.
+                    batch_len = len(batch.get("time", next(iter(batch.values()))))
+                    merged[col_name].extend([None] * batch_len)
 
         return merged
 
diff --git a/src/arc_client/ingestion/buffered.py b/src/arc_client/ingestion/buffered.py
index 9c6876e..df3f9dc 100644
--- a/src/arc_client/ingestion/buffered.py
+++ b/src/arc_client/ingestion/buffered.py
@@ -167,7 +167,11 @@ def _flush_all(self) -> None:
             self._flush_measurement(measurement)
 
     def _merge_columnar(self, batches: list[dict[str, list[Any]]]) -> dict[str, list[Any]]:
-        """Merge multiple columnar batches into one."""
+        """Merge multiple columnar batches into one.
+
+        When batches have different column sets (sparse columns), missing
+        positions are filled with None so all columns have equal length.
+        """
         if not batches:
             return {}
 
@@ -179,13 +183,19 @@ def _merge_columnar(self, batches: list[dict[str, list[Any]]]) -> dict[str, list
         for batch in batches:
             all_columns.update(batch.keys())
 
-        # Merge each column
+        # Merge each column, padding missing columns with None
         merged: dict[str, list[Any]] = {}
         for col_name in all_columns:
             merged[col_name] = []
             for batch in batches:
                 if col_name in batch:
                     merged[col_name].extend(batch[col_name])
+                elif batch:
+                    # Column missing from this batch — pad with None.
+                    # Empty batches contribute no rows and would make
+                    # next() raise StopIteration, so they are skipped.
+                    batch_len = len(batch.get("time", next(iter(batch.values()))))
+                    merged[col_name].extend([None] * batch_len)
 
         return merged
 