diff --git a/Lite/Services/ArchiveService.cs b/Lite/Services/ArchiveService.cs index 9f5f0bbc..d286589b 100644 --- a/Lite/Services/ArchiveService.cs +++ b/Lite/Services/ArchiveService.cs @@ -305,18 +305,8 @@ private void CompactParquetFiles() } } - /* Compact each group that has more than one file (or any non-monthly files) */ - using var con = new DuckDBConnection("DataSource=:memory:"); - con.Open(); - - /* Cap memory to avoid multi-GB spikes decompressing large parquet archives. - DuckDB will spill excess to its temp directory automatically. */ - using (var pragma = con.CreateCommand()) - { - pragma.CommandText = "SET memory_limit = '2GB'; SET preserve_insertion_order = false;"; - pragma.ExecuteNonQuery(); - } - + /* Compact each group that has more than one file (or any non-monthly files). + Each group gets its own DuckDB connection so memory is fully released between groups. */ var totalMerged = 0; var totalRemoved = 0; @@ -346,16 +336,16 @@ DuckDB will spill excess to its temp directory automatically. */ var sourcePaths = files .Select(f => Path.Combine(_archivePath, f).Replace("\\", "/")) .ToList(); - var pathList = string.Join(", ", sourcePaths.Select(p => $"'{p}'")); - /* Build SELECT with column exclusions for specific tables. - Only exclude columns that actually exist in the source files - (they may have been stripped in a previous compaction). */ + /* Determine column exclusions up front using all source files */ var selectClause = "*"; if (CompactionExcludeColumns.TryGetValue(table, out var excludeCols)) { - using var schemaCmd = con.CreateCommand(); - schemaCmd.CommandText = $"SELECT column_name FROM (DESCRIBE SELECT * FROM read_parquet([{pathList}], union_by_name=true))"; + using var schemaCon = new DuckDBConnection("DataSource=:memory:"); + schemaCon.Open(); + var allPathList = string.Join(", ", sourcePaths.Select(p => $"'{p}'")); + using var schemaCmd = schemaCon.CreateCommand(); + schemaCmd.CommandText = $"SELECT column_name FROM (DESCRIBE SELECT * FROM read_parquet([{allPathList}], union_by_name=true))"; using var reader = schemaCmd.ExecuteReader(); var existingCols = new HashSet(StringComparer.OrdinalIgnoreCase); while (reader.Read()) existingCols.Add(reader.GetString(0)); @@ -367,10 +357,65 @@ Only exclude columns that actually exist in the source files } } - using var cmd = con.CreateCommand(); - cmd.CommandText = $"COPY (SELECT {selectClause} FROM read_parquet([{pathList}], union_by_name=true)) " + - $"TO '{tempPath}' (FORMAT PARQUET, COMPRESSION ZSTD, ROW_GROUP_SIZE 122880)"; - cmd.ExecuteNonQuery(); + if (sourcePaths.Count <= 2) + { + /* Small group — single-pass merge */ + using var con = new DuckDBConnection("DataSource=:memory:"); + con.Open(); + using (var pragma = con.CreateCommand()) + { + pragma.CommandText = "SET memory_limit = '4GB'; SET preserve_insertion_order = false;"; + pragma.ExecuteNonQuery(); + } + + var pathList = string.Join(", ", sourcePaths.Select(p => $"'{p}'")); + using var cmd = con.CreateCommand(); + cmd.CommandText = $"COPY (SELECT {selectClause} FROM read_parquet([{pathList}], union_by_name=true)) " + + $"TO '{tempPath}' (FORMAT PARQUET, COMPRESSION ZSTD, ROW_GROUP_SIZE 122880)"; + cmd.ExecuteNonQuery(); + } + else + { + /* Large group — incremental merge (pairs) to keep peak memory low. + Sort smallest-first so early merges are cheap. */ + var sorted = sourcePaths + .OrderBy(p => new FileInfo(p.Replace("/", "\\")).Length) + .ToList(); + + var currentPath = sorted[0]; + var intermediateFiles = new List(); + + for (var i = 1; i < sorted.Count; i++) + { + var stepOutput = i < sorted.Count - 1 + ? targetPath + $".step{i}.tmp" + : tempPath; + + using var con = new DuckDBConnection("DataSource=:memory:"); + con.Open(); + using (var pragma = con.CreateCommand()) + { + pragma.CommandText = "SET memory_limit = '4GB'; SET preserve_insertion_order = false;"; + pragma.ExecuteNonQuery(); + } + + var pairList = $"'{currentPath}', '{sorted[i]}'"; + using var cmd = con.CreateCommand(); + cmd.CommandText = $"COPY (SELECT {selectClause} FROM read_parquet([{pairList}], union_by_name=true)) " + + $"TO '{stepOutput}' (FORMAT PARQUET, COMPRESSION ZSTD, ROW_GROUP_SIZE 122880)"; + cmd.ExecuteNonQuery(); + + /* Clean up previous intermediate file */ + if (intermediateFiles.Count > 0) + { + var prev = intermediateFiles[^1]; + try { File.Delete(prev); } catch { /* best effort */ } + } + + intermediateFiles.Add(stepOutput); + currentPath = stepOutput; + } + } /* Remove originals */ var removed = 0; @@ -404,11 +449,15 @@ Only exclude columns that actually exist in the source files { _logger?.LogError(ex, "Failed to compact {Month}/{Table} ({Count} files)", month, table, files.Count); - /* Clean up temp file on failure */ + /* Clean up temp and intermediate files on failure */ if (File.Exists(tempPath)) { try { File.Delete(tempPath); } catch { /* best effort */ } } + foreach (var stepFile in Directory.GetFiles(_archivePath, $"{targetMonth}_{table}.parquet.step*.tmp")) + { + try { File.Delete(stepFile); } catch { /* best effort */ } + } } }