From de1c235169e6746a3da09d68da6349c59d0d7a0e Mon Sep 17 00:00:00 2001 From: Kaelyn Somerville Date: Wed, 20 Aug 2025 11:21:13 +0100 Subject: [PATCH 1/2] Add parallel compression for multithreaded process --- code/common/compress.q | 41 +++++++++++++++++++++++++++++------------ 1 file changed, 29 insertions(+), 12 deletions(-) diff --git a/code/common/compress.q b/code/common/compress.q index 13560f343..da1d0b107 100644 --- a/code/common/compress.q +++ b/code/common/compress.q @@ -129,7 +129,15 @@ showcomp:{[hdbpath;csvpath;maxage] compressfromtable:{[table] statstab::([] file:`$(); algo:`int$(); compressedLength:`long$();uncompressedLength:`long$()); - {compress[x `fullpath;x `calgo;x `cblocksize;x `clevel; x `currentsize]} each table;} + / Check if process is single threaded - if multi then compress in parallel then clean up after + $[0= system"s"; + [.lg.o[`compression; "Single threaded process, compress applied sequentially"]; + {compress[x `fullpath;x `calgo;x `cblocksize;x `clevel; x `currentsize];cleancompressed[x `fullpath;x `calgo;x `cblocksize;x `clevel; x `currentsize]} each table]; + [.lg.o[`compression; "Multithreaded process, compress applied in parallel "]; + {compress[x `fullpath;x `calgo;x `cblocksize;x `clevel; x `currentsize]} peach table; + {cleancompressed[x `fullpath;x `calgo;x `cblocksize;x `clevel; x `currentsize]} each table;]]} + + /- call the compression with a max age paramter implemented compressmaxage:{[hdbpath;csvpath;maxage] @@ -162,21 +170,30 @@ compress:{[filetoCompress;algo;blocksize;level;sizeuncomp] / perform the compression/decompression if[0=algo;comprL:(-21!filetoCompress)`compressedLength]; -19!(filetoCompress;compressedFile;blocksize;algo;level); - / check the compressed/decomp file and move if appropriate; else delete compressed file and log error - $[((get compressedFile)~sf:get filetoCompress) & (count -21!compressedFile) or algo=0; - [.lg.o[`compression;"File ",cmp,"compressed ","successfully; matches orginal. 
Deleting original."]; - system "r ", (last ":" vs string compressedFile)," ", last ":" vs string filetoCompress; - / move the hash files too. - hashfilecheck[compressedFile;filetoCompress;sf]; - /-log to the table if the algo wasn't 0 - statstab,:$[not 0=algo;(filetoCompress;algo;(-21!filetoCompress)`compressedLength;sizeuncomp);(filetoCompress;algo;comprL;sizeuncomp)]]; - [$[not count -21!compressedFile; - [.lg.o[`compression; "Failed to compress file ",string[filetoCompress]];hdel compressedFile]; - [.lg.o[`compression;cmp,"compressed ","file ",string[compressedFile]," doesn't match original. Deleting new file"];hdel compressedFile]]]] ]; / if already compressed/decompressed, then log that and skip. .lg.o[`compression; "file ", (string filetoCompress), " is already ",cmp,"compressed",". Skipping this file"]]} +cleancompressed:{[filetoCompress;algo;blocksize;level;sizeuncomp] + / Run after "compress" to clean up the compression files, moves files and deletes where applicable + compressedFile: hsym `$(string filetoCompress),"_kdbtempzip"; + cmp:$[algo=0;"de";""]; + if[0=algo;comprL:(-21!filetoCompress)`compressedLength]; + / Check if the compressed file exists, if key returns empty list then file doesnt exist + $[()~ key compressedFile; + .lg.o[`compression; "No compressed file present for the following file - ",string[filetoCompress]]; + $[((get compressedFile)~sf:get filetoCompress) & (count -21!compressedFile) or algo=0; + [.lg.o[`compression;"File ",cmp,"compressed ",string[filetoCompress]," successfully; matches orginal. Deleting original."]; + system "r ", (last ":" vs string compressedFile)," ", last ":" vs string filetoCompress; + / move the hash files too. 
+ hashfilecheck[compressedFile;filetoCompress;sf]; + /-log to the table if the algo wasn't 0 + statstab,:$[not 0=algo;(filetoCompress;algo;(-21!filetoCompress)`compressedLength;sizeuncomp);(filetoCompress;algo;comprL;sizeuncomp)]]; + [$[not count -21!compressedFile; + [.lg.o[`compression; "Failed to compress file ",string[filetoCompress]];hdel compressedFile]; + [.lg.o[`compression;cmp,"compressed ","file ",string[compressedFile]," doesn't match original. Deleting new file"];hdel compressedFile]]]]]} + + hashfilecheck:{[compressedFile;filetoCompress;sf] / if running 3.6 or higher, account for anymap type for nested lists / check for double hash file if nested data contains symbol vector/atom From 1b8802421230d105b6697367a7cc8ccccd4aec80 Mon Sep 17 00:00:00 2001 From: Kaelyn Somerville Date: Thu, 21 Aug 2025 16:38:27 +0100 Subject: [PATCH 2/2] Amendments to the parallel_compression logic to clean the code and make it more modular depending on threads count --- code/common/compress.q | 62 ++++++++++++++++++++++++++---------------- 1 file changed, 39 insertions(+), 23 deletions(-) diff --git a/code/common/compress.q b/code/common/compress.q index da1d0b107..f7e2dc2be 100644 --- a/code/common/compress.q +++ b/code/common/compress.q @@ -130,14 +130,29 @@ showcomp:{[hdbpath;csvpath;maxage] compressfromtable:{[table] statstab::([] file:`$(); algo:`int$(); compressedLength:`long$();uncompressedLength:`long$()); / Check if process is single threaded - if multi then compress in parallel then clean up after + // Add metrics on any files due to be compressed to be used afterwards for comparison + table:update compressionvaluepre:{(-21!x)`compressedLength}'[fullpath] from table; $[0= system"s"; - [.lg.o[`compression; "Single threaded process, compress applied sequentially"]; - {compress[x `fullpath;x `calgo;x `cblocksize;x `clevel; x `currentsize];cleancompressed[x `fullpath;x `calgo;x `cblocksize;x `clevel; x `currentsize]} each table]; - [.lg.o[`compression; "Multithreaded 
process, compress applied in parallel "]; - {compress[x `fullpath;x `calgo;x `cblocksize;x `clevel; x `currentsize]} peach table; - {cleancompressed[x `fullpath;x `calgo;x `cblocksize;x `clevel; x `currentsize]} each table;]]} - - + singlethreadcompress[table]; + multithreadcompress[table]]; + / Update the stats tab table after the compression + {statstabupdate[x`fullpath;x`calgo;x`currentsize;x`compressionvaluepre]} each table} + +statstabupdate:{[file;algo;sizeuncomp;compressionvaluepre] + if[not compressionvaluepre ~ (-21!file)`compressedLength; + statstab,:$[not 0=algo;(file;algo;(-21!file)`compressedLength;sizeuncomp);(file;algo;compressionvaluepre;sizeuncomp)]]} + +singlethreadcompress:{[table] + .lg.o[`compression; "Single threaded process, compress applied sequentially"]; + {compress[x `fullpath;x `calgo;x `cblocksize;x `clevel; x `currentsize]; + cleancompressed[x `fullpath;x `calgo]} each table; + } + +multithreadcompress:{[table] + .lg.o[`compression; "Multithreaded process, compress applied in parallel "]; + {compress[x `fullpath;x `calgo;x `cblocksize;x `clevel; x `currentsize]} peach table; + {cleancompressed[x `fullpath;x `calgo]} each table; + } /- call the compression with a max age paramter implemented compressmaxage:{[hdbpath;csvpath;maxage] @@ -168,30 +183,31 @@ compress:{[filetoCompress;algo;blocksize;level;sizeuncomp] $[((0 = count -21!filetoCompress) & not 0 = algo)|((not 0 = count -21!filetoCompress) & 0 = algo); [.lg.o[`compression;cmp,"compressing ","file ", (string filetoCompress), " with algo: ", (string algo), ", blocksize: ", (string blocksize), ", and level: ", (string level), "."]; / perform the compression/decompression - if[0=algo;comprL:(-21!filetoCompress)`compressedLength]; -19!(filetoCompress;compressedFile;blocksize;algo;level); ]; / if already compressed/decompressed, then log that and skip. .lg.o[`compression; "file ", (string filetoCompress), " is already ",cmp,"compressed",". 
Skipping this file"]]} -cleancompressed:{[filetoCompress;algo;blocksize;level;sizeuncomp] - / Run after "compress" to clean up the compression files, moves files and deletes where applicable +cleancompressed:{[filetoCompress;algo] compressedFile: hsym `$(string filetoCompress),"_kdbtempzip"; cmp:$[algo=0;"de";""]; - if[0=algo;comprL:(-21!filetoCompress)`compressedLength]; - / Check if the compressed file exists, if key returns empty list then file doesnt exist - $[()~ key compressedFile; + // Verify compressed file exists + if[()~ key compressedFile; .lg.o[`compression; "No compressed file present for the following file - ",string[filetoCompress]]; - $[((get compressedFile)~sf:get filetoCompress) & (count -21!compressedFile) or algo=0; - [.lg.o[`compression;"File ",cmp,"compressed ",string[filetoCompress]," successfully; matches orginal. Deleting original."]; - system "r ", (last ":" vs string compressedFile)," ", last ":" vs string filetoCompress; - / move the hash files too. - hashfilecheck[compressedFile;filetoCompress;sf]; - /-log to the table if the algo wasn't 0 - statstab,:$[not 0=algo;(filetoCompress;algo;(-21!filetoCompress)`compressedLength;sizeuncomp);(filetoCompress;algo;comprL;sizeuncomp)]]; - [$[not count -21!compressedFile; - [.lg.o[`compression; "Failed to compress file ",string[filetoCompress]];hdel compressedFile]; - [.lg.o[`compression;cmp,"compressed ","file ",string[compressedFile]," doesn't match original. Deleting new file"];hdel compressedFile]]]]]} + :(); + ]; + // Verify compressed file's contents match original + if[not ((get compressedFile)~sf:get filetoCompress) & (count -21!compressedFile) or algo=0; + .lg.o[`compression;cmp,"compressed ","file ",string[compressedFile]," doesn't match original. 
Deleting new file"]; + hdel compressedFile; + :(); + ]; + // Given above two checks satisfied run the delete of old and rename compressed to original name + .lg.o[`compression;"File ",cmp,"compressed ",string[filetoCompress]," successfully; matches orginal. Deleting original."]; + system "r ", (last ":" vs string compressedFile)," ", last ":" vs string filetoCompress; + / move the hash files too. + hashfilecheck[compressedFile;filetoCompress;sf]; + } hashfilecheck:{[compressedFile;filetoCompress;sf]