Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
95 changes: 72 additions & 23 deletions Lite/Services/ArchiveService.cs
Original file line number Diff line number Diff line change
Expand Up @@ -305,18 +305,8 @@ private void CompactParquetFiles()
}
}

/* Compact each group that has more than one file (or any non-monthly files) */
using var con = new DuckDBConnection("DataSource=:memory:");
con.Open();

/* Cap memory to avoid multi-GB spikes decompressing large parquet archives.
DuckDB will spill excess to its temp directory automatically. */
using (var pragma = con.CreateCommand())
{
pragma.CommandText = "SET memory_limit = '2GB'; SET preserve_insertion_order = false;";
pragma.ExecuteNonQuery();
}

/* Compact each group that has more than one file (or any non-monthly files).
Each group gets its own DuckDB connection so memory is fully released between groups. */
var totalMerged = 0;
var totalRemoved = 0;

Expand Down Expand Up @@ -346,16 +336,16 @@ DuckDB will spill excess to its temp directory automatically. */
var sourcePaths = files
.Select(f => Path.Combine(_archivePath, f).Replace("\\", "/"))
.ToList();
var pathList = string.Join(", ", sourcePaths.Select(p => $"'{p}'"));

/* Build SELECT with column exclusions for specific tables.
Only exclude columns that actually exist in the source files
(they may have been stripped in a previous compaction). */
/* Determine column exclusions up front using all source files */
var selectClause = "*";
if (CompactionExcludeColumns.TryGetValue(table, out var excludeCols))
{
using var schemaCmd = con.CreateCommand();
schemaCmd.CommandText = $"SELECT column_name FROM (DESCRIBE SELECT * FROM read_parquet([{pathList}], union_by_name=true))";
using var schemaCon = new DuckDBConnection("DataSource=:memory:");
schemaCon.Open();
var allPathList = string.Join(", ", sourcePaths.Select(p => $"'{p}'"));
using var schemaCmd = schemaCon.CreateCommand();
schemaCmd.CommandText = $"SELECT column_name FROM (DESCRIBE SELECT * FROM read_parquet([{allPathList}], union_by_name=true))";
using var reader = schemaCmd.ExecuteReader();
var existingCols = new HashSet<string>(StringComparer.OrdinalIgnoreCase);
while (reader.Read()) existingCols.Add(reader.GetString(0));
Expand All @@ -367,10 +357,65 @@ Only exclude columns that actually exist in the source files
}
}

using var cmd = con.CreateCommand();
cmd.CommandText = $"COPY (SELECT {selectClause} FROM read_parquet([{pathList}], union_by_name=true)) " +
$"TO '{tempPath}' (FORMAT PARQUET, COMPRESSION ZSTD, ROW_GROUP_SIZE 122880)";
cmd.ExecuteNonQuery();
if (sourcePaths.Count <= 2)
{
/* Small group — single-pass merge */
using var con = new DuckDBConnection("DataSource=:memory:");
con.Open();
using (var pragma = con.CreateCommand())
{
pragma.CommandText = "SET memory_limit = '4GB'; SET preserve_insertion_order = false;";
pragma.ExecuteNonQuery();
}

var pathList = string.Join(", ", sourcePaths.Select(p => $"'{p}'"));
using var cmd = con.CreateCommand();
cmd.CommandText = $"COPY (SELECT {selectClause} FROM read_parquet([{pathList}], union_by_name=true)) " +
$"TO '{tempPath}' (FORMAT PARQUET, COMPRESSION ZSTD, ROW_GROUP_SIZE 122880)";
cmd.ExecuteNonQuery();
}
else
{
/* Large group — incremental merge (pairs) to keep peak memory low.
Sort smallest-first so early merges are cheap. */
var sorted = sourcePaths
.OrderBy(p => new FileInfo(p.Replace("/", "\\")).Length)
.ToList();

var currentPath = sorted[0];
var intermediateFiles = new List<string>();

for (var i = 1; i < sorted.Count; i++)
{
var stepOutput = i < sorted.Count - 1
? targetPath + $".step{i}.tmp"
: tempPath;

using var con = new DuckDBConnection("DataSource=:memory:");
con.Open();
using (var pragma = con.CreateCommand())
{
pragma.CommandText = "SET memory_limit = '4GB'; SET preserve_insertion_order = false;";
pragma.ExecuteNonQuery();
}

var pairList = $"'{currentPath}', '{sorted[i]}'";
using var cmd = con.CreateCommand();
cmd.CommandText = $"COPY (SELECT {selectClause} FROM read_parquet([{pairList}], union_by_name=true)) " +
$"TO '{stepOutput}' (FORMAT PARQUET, COMPRESSION ZSTD, ROW_GROUP_SIZE 122880)";
cmd.ExecuteNonQuery();

/* Clean up previous intermediate file */
if (intermediateFiles.Count > 0)
{
var prev = intermediateFiles[^1];
try { File.Delete(prev); } catch { /* best effort */ }
}

intermediateFiles.Add(stepOutput);
currentPath = stepOutput;
}
}

/* Remove originals */
var removed = 0;
Expand Down Expand Up @@ -404,11 +449,15 @@ Only exclude columns that actually exist in the source files
{
_logger?.LogError(ex, "Failed to compact {Month}/{Table} ({Count} files)", month, table, files.Count);

/* Clean up temp file on failure */
/* Clean up temp and intermediate files on failure */
if (File.Exists(tempPath))
{
try { File.Delete(tempPath); } catch { /* best effort */ }
}
foreach (var stepFile in Directory.GetFiles(_archivePath, $"{targetMonth}_{table}.parquet.step*.tmp"))
{
try { File.Delete(stepFile); } catch { /* best effort */ }
}
}
}

Expand Down
Loading