From 4202b5f01a7556db8486e88715b26a8630613428 Mon Sep 17 00:00:00 2001
From: Boyce-pj
Date: Mon, 5 Jan 2026 21:16:34 +0000
Subject: [PATCH 1/3] Adding compression functions to new compression module

---
 di/compression/compression.md |   0
 di/compression/compression.q  | 220 ++++++++++++++++++++++++++++++++++
 di/compression/init.q         |  25 ++++
 di/compression/test.csv       |   0
 4 files changed, 245 insertions(+)
 create mode 100644 di/compression/compression.md
 create mode 100644 di/compression/compression.q
 create mode 100644 di/compression/init.q
 create mode 100644 di/compression/test.csv

diff --git a/di/compression/compression.md b/di/compression/compression.md
new file mode 100644
index 0000000..e69de29
diff --git a/di/compression/compression.q b/di/compression/compression.q
new file mode 100644
index 0000000..1d0db16
--- /dev/null
+++ b/di/compression/compression.q
@@ -0,0 +1,220 @@
+/
+Data Intellect (info@dataintellect.com)
+
+USAGE OF COMPRESSION:
+
+NOTE: Please use with caution.
+To SHOW a table of files to be compressed and how before execution, use:
+
+-with a specified csv driver file:
+.cmp.showcomp[`:/path/to/hdb;`:/path/to/csv; maxagefilestocompress]
+
+OR
+-with compressionconfig.csv file located in the config folder (TORQ/src/config/compressionconfig.csv):
+.cmp.showcomp[`:/path/to/hdb;.cmp.inputcsv; maxagefilestocompress]
+
+To then COMPRESS all files:
+
+.cmp.compressmaxage[`:/path/to/hdb;`:/path/to/csv; maxagefilestocompress]
+OR
+.cmp.compressmaxage[`:/path/to/hdb;.cmp.inputcsv; maxagefilestocompress]
+
+If you don't care about the maximum age of the files and just want to COMPRESS up to the oldest files in the db then use:
+
+.cmp.docompression[`:/path/to/hdb;`:/path/to/csv]
+OR
+.cmp.docompression[`:/path/to/hdb;.cmp.inputcsv]
+
+csv should have the following format:
+
+table,minage,column,calgo,cblocksize,clevel
+default,10,default, 2, 17,6
+quotes, 10,time, 2, 17, 5
+quotes,10,src,2,17,4
+depth, 10,default, 1, 17, 8
+
+-tables in the db but not in the config tab are automatically compressed using default params
+-tabs with cols specified will have other columns compressed with default (if default specified for cols of tab, all cols are comp in that tab)
+-algo 0 decompresses the file, or if not compressed ignores
+-config file could just be one row to compress everything older than age with the same params:
+
+table,minage,column,calgo,cblocksize,clevel
+default,10,default,2,17,6
+
+The gzip algo (2) is not necessarily available on Windows and Unix systems.
+See: code.kx.com/wiki/Cookbook/FileCompression for more details
+
+For WINDOWS users:
+
+The minimum block size for compression on Windows is 16.
+
+\
+
+/inputcsv:@[value;`inputcsv;first .proc.getconfigfile["compressionconfig.csv"]];
+
+/if[-11h=type inputcsv;inputcsv:string inputcsv];
+
+checkcsv:{[csvtab;path]
+  // include snappy (3) for version 3.4 or after
+  allowedalgos:0 1 2,$[.z.K>=3.4;3;()];
+  if[0b~all colscheck:`table`minage`column`calgo`cblocksize`clevel in (cols csvtab);
+  .log.error[err:path,": Compression config has incorrect column layout at column(s): ", (" " sv string `table`minage`column`calgo`cblocksize`clevel where not colscheck), ". Should be `table`minage`column`calgo`cblocksize`clevel."];'err];
+  if[count checkalgo:exec i from csvtab where not calgo in allowedalgos;
+  .log.error[err:path,": Compression config has incorrect compression algo in row(s): ",(" " sv string checkalgo),". Should be ",(", " sv string -1_allowedalgos)," or ",(string last allowedalgos),"."];'err];
+  if[count checkblock:exec i from csvtab where calgo in 1 2, not cblocksize in 12 + til 9;
+  .log.error[err:path,": Compression config has incorrect compression blocksize at row(s): ", (" " sv string checkblock), ". Should be between 12 and 20."];'err];
+  if[count checklevel: exec i from csvtab where calgo in 2, not clevel in til 10;
+  .log.error[err:path,": Compression config has incorrect compression level at row(s): ", (" " sv string checklevel), ". Should be between 0 and 9."];'err];
+  if[.z.o like "w*"; if[count rowwin:where ((csvtab[`cblocksize] < 16) & csvtab[`calgo] > 0);
+  .log.error[err:path,": Compression config has incorrect compression blocksize for Windows at row(s): ", (" " sv string rowwin), ". Must be at least 16."];'err]];
+  if[any nulls:any each null (csvtab[`column];csvtab[`table];csvtab[`minage];csvtab[`clevel]);
+  .log.error[err:path,": Compression config has empty cells in column(s): ", (" " sv string `column`table`minage`clevel where nulls)];'err];
+  }
+
+loadcsv:{[inputcsv]
+  .cmp.compressioncsv::@[{.log.info["Opening ", x];("SISIII"; enlist ",") 0:"S"$x}; (string inputcsv); {.log.error["failed to open ", (x)," : ",y];'y}[string inputcsv]];
+  .[checkcsv;(.cmp.compressioncsv;string inputcsv);{.log.error["failed to load csv due to error: ",x];delete compressioncsv from `.cmp}];
+  }
+
+traverse:{$[(0=count k)or x~k:key x; x; .z.s each ` sv' x,/:k where not any k like/:(".d";"*.q";"*.k";"*#")]}
+
+hdbstructure:{
+  t:([]fullpath:(raze/)traverse x); // orig traverse
+  // calculate the length of the input path
+  base:count "/" vs string x;
+  // split out the full path
+  t:update splitcount:count each split from update split:"/" vs' string fullpath,column:`,table:`,partition:(count t)#enlist"" from t;
+  // partitioned tables
+  t:update partition:split[;base],table:`$split[;base+1],column:`$split[;base+2] from t where splitcount=base+3;
+  // splayed
+  t:update table:`$split[;base],column:`$split[;base+1] from t where splitcount=base+2;
+  // cast the partition type
+  t:update partition:{$[not all null r:"D"$'x;r;not all null r:"M"$'x;r;"I"$'x]}[partition] from t;
+  /- work out the age of each partition
+  $[14h=type t`partition; t:update age:.z.D - partition from t;
+  13h=type t`partition; t:update age:(`month$.z.D) - partition from t;
+  // otherwise it is ints. If all the values are within 1000 and 3000
+  // then assume it is years
+  t:update age:{$[all x within 1000 3000; (`year$.z.D) - x;(count x)#0Ni]}[partition] from t];
+  delete splitcount,split from t}
+
+showcomp:{[hdbpath;csvpath;maxage]
+  /-load csv
+  loadcsv[$[10h = type csvpath;hsym `$csvpath;hsym csvpath]];
+  .log.info["compression: scanning hdb directory structure"];
+  /-build paths table and fill age
+  $[count key (` sv hdbpath,`$"par.txt");
+  pathstab:update 0W^age from (,/) hdbstructure'[hsym each `$(read0 ` sv hdbpath,`$"par.txt")];
+  pathstab:update 0W^age from hdbstructure[hsym hdbpath]];
+  /-delete anything which isn't a table
+  pathstab:delete from pathstab where table in `;
+  /-tables that are in the hdb but not specified in the csv - compress with `default params
+  comptab:2!delete minage from update compressage:minage from .cmp.compressioncsv;
+  /-specified columns and tables
+  a:select from comptab where not table=`default, not column=`default;
+  /-default columns, specified tables
+  b:select from comptab where not table=`default,column=`default;
+  /-defaults
+  c:select from comptab where table = `default, column =`default;
+  /-join on defaults to entire table
+  t: pathstab,'(count pathstab)#value c;
+  /- join on for specified tables
+  t: t lj 1!delete column from b;
+  /- join on table and column specified information
+  t: t lj a;
+  /- in case of no default specified, delete from the table where no data is joined on
+  t: delete from t where calgo=0Nj,cblocksize=0Nj,clevel=0Nj;
+  .log.info["compression: getting current size of each file up to a maximum age of ",string maxage];
+  update currentsize:hcount each fullpath from select from t where age within (compressage;maxage) }
+
+compressfromtable:{[table]
+  .cmp.statstab::([] file:`$(); algo:`int$(); compressedLength:`long$();uncompressedLength:`long$());
+  / Check if process is single threaded - if multi then compress in parallel then clean up after
+  // Add metrics on any files due to be compressed to be used afterwards for comparison
+  table:update compressionvaluepre:{(-21!x)`compressedLength}'[fullpath] from table;
+  $[0= system"s";
+  singlethreadcompress[table];
+  multithreadcompress[table]];
+  / Update the stats tab table after the compression
+  {statstabupdate[x`fullpath;x`calgo;x`currentsize;x`compressionvaluepre]} each table}
+
+statstabupdate:{[file;algo;sizeuncomp;compressionvaluepre]
+  if[not compressionvaluepre ~ (-21!file)`compressedLength;
+  .cmp.statstab,:$[not 0=algo;(file;algo;(-21!file)`compressedLength;sizeuncomp);(file;algo;compressionvaluepre;sizeuncomp)]]}
+
+singlethreadcompress:{[table]
+  .log.info["compression: Single threaded process, compress applied sequentially"];
+  {compress[x `fullpath;x `calgo;x `cblocksize;x `clevel; x `currentsize];
+  cleancompressed[x `fullpath;x `calgo]} each table;
+  }
+
+multithreadcompress:{[table]
+  .log.info["compression: Multithreaded process, compress applied in parallel "];
+  {compress[x `fullpath;x `calgo;x `cblocksize;x `clevel; x `currentsize]} peach table;
+  {cleancompressed[x `fullpath;x `calgo]} each table;
+  }
+
+/- call the compression with a max age parameter implemented
+compressmaxage:{[hdbpath;csvpath;maxage]
+  compressfromtable[showcomp[hdbpath;csvpath;maxage]];
+  summarystats[];
+  }
+
+docompression:compressmaxage[;;0W];
+
+summarystats:{
+  /- table with compressionratio for each file
+  .cmp.statstab::`compressionratio xdesc (update compressionratio:?[algo=0; neg uncompressedLength%compressedLength; uncompressedLength%compressedLength] from .cmp.statstab);
+  compressedfiles: select from .cmp.statstab where not algo = 0;
+  uncompressedfiles:select from .cmp.statstab where algo = 0;
+  /- summarytable
+  memorysavings: ((sum compressedfiles`uncompressedLength) - sum compressedfiles`compressedLength) % 2 xexp 20;
+  totalcompratio: (sum compressedfiles`uncompressedLength) % sum compressedfiles`compressedLength;
+  memoryusage:((sum uncompressedfiles`uncompressedLength) - sum uncompressedfiles`compressedLength) % 2 xexp 20;
+  totaldecompratio: neg (sum uncompressedfiles`compressedLength) % sum uncompressedfiles`uncompressedLength;
+  .log.info["compression: Memory savings from compression: ", .Q.f[2;memorysavings], "MB. Total compression ratio: ", .Q.f[2;totalcompratio],"."];
+  .log.info["compression: Additional memory used from de-compression: ",.Q.f[2;memoryusage], "MB. Total de-compression ratio: ", .Q.f[2;totaldecompratio],"."];
+  .log.info["compression: Check .cmp.statstab for info on each file."];}
+
+compress:{[filetoCompress;algo;blocksize;level;sizeuncomp]
+  compressedFile: hsym `$(string filetoCompress),"_kdbtempzip";
+  / compress or decompress as appropriate:
+  cmp:$[algo=0;"de";""];
+  $[((0 = count -21!filetoCompress) & not 0 = algo)|((not 0 = count -21!filetoCompress) & 0 = algo);
+  [.log.info["compression: ",cmp,"compressing ","file ", (string filetoCompress), " with algo: ", (string algo), ", blocksize: ", (string blocksize), ", and level: ", (string level), "."];
+  / perform the compression/decompression
+  -19!(filetoCompress;compressedFile;blocksize;algo;level);
+  ];
+  / if already compressed/decompressed, then log that and skip.
+  .log.info["compression: file ", (string filetoCompress), " is already ",cmp,"compressed",". Skipping this file"]]}
+
+cleancompressed:{[filetoCompress;algo]
+  compressedFile: hsym `$(string filetoCompress),"_kdbtempzip";
+  cmp:$[algo=0;"de";""];
+  // Verify compressed file exists
+  if[()~ key compressedFile;
+  .log.info["compression: No compressed file present for the following file - ",string[filetoCompress]];
+  :();
+  ];
+  // Verify compressed file's contents match original
+  if[not ((get compressedFile)~sf:get filetoCompress) & (count -21!compressedFile) or algo=0;
+  .log.info["compression: ",cmp,"compressed ","file ",string[compressedFile]," doesn't match original. Deleting new file"];
+  hdel compressedFile;
+  :();
+  ];
+  // Given the above two checks are satisfied, delete the original and rename the compressed file to the original name
+  .log.info["compression: File ",cmp,"compressed ",string[filetoCompress]," successfully; matches original. Deleting original."];
+  system "r ", (last ":" vs string compressedFile)," ", last ":" vs string filetoCompress;
+  / move the hash files too.
+  hashfilecheck[compressedFile;filetoCompress;sf];
+  }
+
+
+hashfilecheck:{[compressedFile;filetoCompress;sf]
+  / if running 3.6 or higher, account for anymap type for nested lists
+  / check for double hash file if nested data contains symbol vector/atom
+  $[3.6<=.z.K;
+  if[77 = type sf; system "r ", (last ":" vs string compressedFile),"# ", (last ":" vs string filetoCompress),"#";
+  .[{system "r ", (last ":" vs string x),"## ", (last ":" vs string y),"##"};(compressedFile;filetoCompress);.log.info["compression: File does not have enumeration domain"]]];
+  / if running below 3.6, nested list types will be 77h+t and will not have double hash file
+  if[78 <= type sf; system "r ", (last ":" vs string compressedFile),"# ", (last ":" vs string filetoCompress),"#"]]}
diff --git a/di/compression/init.q b/di/compression/init.q
new file mode 100644
index 0000000..aaf91b2
--- /dev/null
+++ b/di/compression/init.q
@@ -0,0 +1,25 @@
+/ Load core functionality into root module namespace
+\l ::compression.q
+
+/ Load KX log module
+.logger:use`kx.log
+.log:.logger.createLog[]
+.log.addfmt[`custom;"$l: $p PID[$i] $m\n"]; / Simplified logging - DELETE
+.log.setfmt[`custom]
+
+export:([
+  checkcsv:checkcsv;
+  loadcsv:loadcsv;
+  hdbstructure:hdbstructure;
+  showcomp:showcomp;
+  compressfromtable:compressfromtable;
+  statstabupdate:statstabupdate;
+  singlethreadcompress:singlethreadcompress;
+  multithreadcompress:multithreadcompress;
+  compressmaxage:compressmaxage;
+  docompression:docompression;
+  summarystats:summarystats;
+  compress:compress;
+  cleancompressed:cleancompressed;
+  hashfilecheck:hashfilecheck
+  ])
diff --git a/di/compression/test.csv b/di/compression/test.csv
new file mode 100644
index 0000000..e69de29

From c849ae977b4d590665ab5ff095ac96ed971bcf03 Mon Sep 17 00:00:00 2001
From: Boyce-pj
Date: Tue, 6 Jan 2026 21:36:07 +0000
Subject: [PATCH 2/3] Improving global variable assignment and reducing
 exposed functions to necessary only + documentation

---
 di/compression/compression.md | 89 +++++++++++++++++++++++++++++++++++
 di/compression/compression.q  | 30 +++++++-----
 di/compression/init.q         | 17 ++-----
 3 files changed, 110 insertions(+), 26 deletions(-)

diff --git a/di/compression/compression.md b/di/compression/compression.md
index e69de29..d9108b9 100644
--- a/di/compression/compression.md
+++ b/di/compression/compression.md
@@ -0,0 +1,89 @@
+# Compression
+
+`compression.q` applies compression to any kdb+ database. It handles all partition types (date, month, year, int), can deal with top-level splayed tables, and will also decompress files as required.
+
+> **Note:** Please use with caution
+
+---
+
+## :sparkles: Features
+
+- Apply compression to any kdb+ database
+- Compression uses the -19! operator
+- Configured using specified csv driver file
+- Compression algorithm (none, kdb+ IPC, gzip; snappy on kdb+ 3.4+), compression blocksize, and compression level (for gzip) can be configured
+- Flexibility to compress different tables/columns with different compression parameters
+- Preview available before compression to display files to be compressed and how
+- Summary statistics for each file returned after compression/decompression complete
+
+---
+
+## :memo: Dependencies
+
+- KX log module
+
+---
+
+## :gear: compressioncsv Schema
+
+A blank compressioncsv is created when the module is loaded.
+loadcsv or showcomp functions accept `:/path/to/csv argument to specify driver csv location and will load the config into compressioncsv
+
+| Column     | Type    | Description                                            |
+|------------|---------|--------------------------------------------------------|
+| table      | `symbol`| table names for specific compression                   |
+| minage     | `int`   | minimum age of file in days before compression applied |
+| column     | `symbol`| column name for column specific compression            |
+| calgo      | `int`   | compression algorithm - 0, 1, 2 (or 3, snappy, on kdb+ 3.4+) accepted |
+| cblocksize | `int`   | compression blocksize - between 12 and 20              |
+| clevel     | `int`   | compression level - 0 to 9 - for gzip (2) only         |
+
+## :label: Example Config
+
+table,minage,column,calgo,cblocksize,clevel
+default,10,default, 2, 17,6
+quotes, 10,time, 2, 17, 5
+quotes,10,src,2,17,4
+depth, 10,default, 1, 17, 8
+
+- tables in the db but not in the config tab are automatically compressed using default params
+- tabs with cols specified will have other columns compressed with default (if default specified for cols of tab, all cols are comp in that tab)
+- algo 0 decompresses the file, or if not compressed ignores
+- config file could just be one row to compress everything older than age with the same params:
+
+---
+
+## :wrench: Functions
+
+| Function           | Description                                                    |
+|--------------------|----------------------------------------------------------------|
+|`showcomp`          | Load specified compression config and show compression details for files to be compressed |
+|`getcompressioncsv` | Returns the loaded compressioncsv config                       |
+|`compressmaxage`    | Compress files according to config up to the specified max age |
+|`docompression`     | Compress all files in hdb according to compressioncsv config   |
+|`getstatstab`       | Returns summary statistics after compression completes         |
+
+---
+
+## :test_tube: Example
+
+```q
+// Include compression module in a process
+.cmp: use `di.compression
+
+// View dictionary of functions
+.cmp
+
+// Show table of files to be compressed and how before execution
+.cmp.showcomp[`:/path/to/hdb;`:/path/to/csv; maxagefilestocompress]
+
+// COMPRESS all files up to a max age:
+.cmp.compressmaxage[`:/path/to/hdb;`:/path/to/csv; maxagefilestocompress]
+
+// COMPRESS up to the oldest files in the db:
+.cmp.docompression[`:/path/to/hdb;`:/path/to/csv]
+
+// Retrieve summary statistics for compression
+.cmp.getstatstab[]
+
+```
diff --git a/di/compression/compression.q b/di/compression/compression.q
index 1d0db16..5e20bde 100644
--- a/di/compression/compression.q
+++ b/di/compression/compression.q
@@ -50,10 +50,6 @@ The minimum block size for compression on Windows is 16.
\ -/inputcsv:@[value;`inputcsv;first .proc.getconfigfile["compressionconfig.csv"]]; - -/if[-11h=type inputcsv;inputcsv:string inputcsv]; - checkcsv:{[csvtab;path] // include snappy (3) for version 3.4 or after allowedalgos:0 1 2,$[.z.K>=3.4;3;()]; @@ -71,11 +67,17 @@ checkcsv:{[csvtab;path] .log.error[err:path,": Compression config has empty cells in column(s): ", (" " sv string `column`table`minage`clevel where nulls)];'err]; } +compressioncsv:([] table:`$();minage:`int$();column:`$();calgo:`int$();cblocksize:`int$();clevel:`int$()) + loadcsv:{[inputcsv] - .cmp.compressioncsv::@[{.log.info["Opening ", x];("SISIII"; enlist ",") 0:"S"$x}; (string inputcsv); {.log.error["failed to open ", (x)," : ",y];'y}[string inputcsv]]; - .[checkcsv;(.cmp.compressioncsv;string inputcsv);{.log.error["failed to load csv due to error: ",x];delete compressioncsv from `.cmp}]; + loadedcsv:@[{.log.info["Opening ", x];("SISIII"; enlist ",") 0:"S"$x}; (string inputcsv); {.log.error["failed to open ", (x)," : ",y];'y}[string inputcsv]]; + res:.[checkcsv;(loadedcsv;string inputcsv);{.log.error["failed to load csv due to error: ",x];:0b}]; + if[res~0b;:(::)]; + compressioncsv::loadedcsv; } +getcompressioncsv:{[] .z.m.compressioncsv} + traverse:{$[(0=count k)or x~k:key x; x; .z.s each ` sv' x,/:k where not any k like/:(".d";"*.q";"*.k";"*#")]} hdbstructure:{ @@ -109,7 +111,7 @@ showcomp:{[hdbpath;csvpath;maxage] /-delete anything which isn't a table pathstab:delete from pathstab where table in `; /-tables that are in the hdb but not specified in the csv - compress with `default params - comptab:2!delete minage from update compressage:minage from .cmp.compressioncsv; + comptab:2!delete minage from update compressage:minage from .z.m.compressioncsv; /-specified columns and tables a:select from comptab where not table=`default, not column=`default; /-default columns, specified tables @@ -128,7 +130,7 @@ showcomp:{[hdbpath;csvpath;maxage] update currentsize:hcount each fullpath from select from t where age within (compressage;maxage) } compressfromtable:{[table] - .cmp.statstab::([] file:`$(); algo:`int$(); compressedLength:`long$();uncompressedLength:`long$()); + statstab::([] file:`$(); algo:`int$(); compressedLength:`long$();uncompressedLength:`long$()); / Check if process is single threaded - if multi then compress in parallel then clean up after // Add metrics on any files due to be compressed to be used afterwards for comparison table:update compressionvaluepre:{(-21!x)`compressedLength}'[fullpath] from table; @@ -140,7 +142,7 @@ compressfromtable:{[table] statstabupdate:{[file;algo;sizeuncomp;compressionvaluepre] if[not compressionvaluepre ~ (-21!file)`compressedLength; - .cmp.statstab,:$[not 0=algo;(file;algo;(-21!file)`compressedLength;sizeuncomp);(file;algo;compressionvaluepre;sizeuncomp)]]} + statstab,:$[not 0=algo;(file;algo;(-21!file)`compressedLength;sizeuncomp);(file;algo;compressionvaluepre;sizeuncomp)]]} singlethreadcompress:{[table] .log.info["compression: Single threaded process, compress applied sequentially"]; @@ -162,11 +164,13 @@ compressmaxage:{[hdbpath;csvpath;maxage] docompression:compressmaxage[;;0W]; +getstatstab:{[] .z.m.statstab} + summarystats:{ /- table with compressionratio for each file - .cmp.statstab::`compressionratio xdesc (update compressionratio:?[algo=0; neg uncompressedLength%compressedLength; uncompressedLength%compressedLength] from .cmp.statstab); - compressedfiles: select from .cmp.statstab where not algo = 0; - uncompressedfiles:select from .cmp.statstab where algo = 0; + 
statstab::`compressionratio xdesc (update compressionratio:?[algo=0; neg uncompressedLength%compressedLength; uncompressedLength%compressedLength] from statstab); + compressedfiles: select from statstab where not algo = 0; + uncompressedfiles:select from statstab where algo = 0; /- summarytable memorysavings: ((sum compressedfiles`uncompressedLength) - sum compressedfiles`compressedLength) % 2 xexp 20; totalcompratio: (sum compressedfiles`uncompressedLength) % sum compressedfiles`compressedLength; @@ -174,7 +178,7 @@ summarystats:{ totaldecompratio: neg (sum uncompressedfiles`compressedLength) % sum uncompressedfiles`uncompressedLength; .log.info["compression: Memory savings from compression: ", .Q.f[2;memorysavings], "MB. Total compression ratio: ", .Q.f[2;totalcompratio],"."]; .log.info["compression: Additional memory used from de-compression: ",.Q.f[2;memoryusage], "MB. Total de-compression ratio: ", .Q.f[2;totaldecompratio],"."]; - .log.info["compression: Check .cmp.statstab for info on each file."];} + .log.info["compression: Check getstatstab[] for info on each file."];} compress:{[filetoCompress;algo;blocksize;level;sizeuncomp] compressedFile: hsym `$(string filetoCompress),"_kdbtempzip"; diff --git a/di/compression/init.q b/di/compression/init.q index aaf91b2..759381a 100644 --- a/di/compression/init.q +++ b/di/compression/init.q @@ -1,25 +1,16 @@ / Load core functionality into root module namespace \l ::compression.q -/ Load KX log module +/ Load KX log module - needed for .log.info and .log.error .logger:use`kx.log .log:.logger.createLog[] -.log.addfmt[`custom;"$l: $p PID[$i] $m\n"]; / Simplified logging - DELETE +.log.addfmt[`custom;"$l: $p PID[$i] $m\n"]; .log.setfmt[`custom] export:([ - checkcsv:checkcsv; - loadcsv:loadcsv; - hdbstructure:hdbstructure; showcomp:showcomp; - compressfromtable:compressfromtable; - statstabupdate:statstabupdate; - singlethreadcompress:singlethreadcompress; - multithreadcompress:multithreadcompress; + getcompressioncsv:getcompressioncsv; compressmaxage:compressmaxage; docompression:docompression; - summarystats:summarystats; - compress:compress; - cleancompressed:cleancompressed; - hashfilecheck:hashfilecheck + getstatstab:getstatstab ]) From b525002577380aa8727d7f5ccdaf94ece98b618d Mon Sep 17 00:00:00 2001 From: Boyce-pj Date: Tue, 6 Jan 2026 22:09:16 +0000 Subject: [PATCH 3/3] Addressing some comments --- di/compression/compression.md | 2 + di/compression/compression.q | 354 ++++++++++++++++------------------ di/compression/init.q | 10 +- 3 files changed, 164 insertions(+), 202 deletions(-) diff --git a/di/compression/compression.md b/di/compression/compression.md index d9108b9..de89626 100644 --- a/di/compression/compression.md +++ b/di/compression/compression.md @@ -40,11 +40,13 @@ loadcsv or showcomp functions accept `:/path/to/csv argument to specify driver c ## :label: Example Config +``` table,minage,column,calgo,cblocksize,clevel default,10,default, 2, 17,6 quotes, 10,time, 2, 17, 5 quotes,10,src,2,17,4 depth, 10,default, 1, 17, 8 +``` - tables in the db but not in the config tab are automatically compressed using default params - tabs with cols specified will have other columns compressed with default (if default specified for cols of tab, all cols are comp in that tab) diff --git a/di/compression/compression.q b/di/compression/compression.q index 5e20bde..edc1454 100644 --- a/di/compression/compression.q +++ b/di/compression/compression.q @@ -1,224 +1,192 @@ -/ -Data Intellect (info@dataintellect.com) - -USAGE OF COMPRESSION: - -NOTE: 
Please use with caution.
-To SHOW a table of files to be compressed and how before execution, use:
-
--with a specified csv driver file:
-.cmp.showcomp[`:/path/to/hdb;`:/path/to/csv; maxagefilestocompress]
-
-OR
--with compressionconfig.csv file located in the config folder (TORQ/src/config/compressionconfig.csv):
-.cmp.showcomp[`:/path/to/hdb;.cmp.inputcsv; maxagefilestocompress]
-
-To then COMPRESS all files:
-
-.cmp.compressmaxage[`:/path/to/hdb;`:/path/to/csv; maxagefilestocompress]
-OR
-.cmp.compressmaxage[`:/path/to/hdb;.cmp.inputcsv; maxagefilestocompress]
-
-If you don't care about the maximum age of the files and just want to COMPRESS up to the oldest files in the db then use:
-
-.cmp.docompression[`:/path/to/hdb;`:/path/to/csv]
-OR
-.cmp.docompression[`:/path/to/hdb;.cmp.inputcsv]
-
-csv should have the following format:
-
-table,minage,column,calgo,cblocksize,clevel
-default,10,default, 2, 17,6
-quotes, 10,time, 2, 17, 5
-quotes,10,src,2,17,4
-depth, 10,default, 1, 17, 8
-
--tables in the db but not in the config tab are automatically compressed using default params
--tabs with cols specified will have other columns compressed with default (if default specified for cols of tab, all cols are comp in that tab)
--algo 0 decompresses the file, or if not compressed ignores
--config file could just be one row to compress everything older than age with the same params:
-
-table,minage,column,calgo,cblocksize,clevel
-default,10,default,2,17,6
-
-The gzip algo (2) is not necessarily available on Windows and Unix systems.
-See: code.kx.com/wiki/Cookbook/FileCompression for more details
-
-For WINDOWS users:
-
-The minimum block size for compression on Windows is 16.
-
-\
-
 checkcsv:{[csvtab;path]
-  // include snappy (3) for version 3.4 or after
-  allowedalgos:0 1 2,$[.z.K>=3.4;3;()];
-  if[0b~all colscheck:`table`minage`column`calgo`cblocksize`clevel in (cols csvtab);
-  .log.error[err:path,": Compression config has incorrect column layout at column(s): ", (" " sv string `table`minage`column`calgo`cblocksize`clevel where not colscheck), ". Should be `table`minage`column`calgo`cblocksize`clevel."];'err];
-  if[count checkalgo:exec i from csvtab where not calgo in allowedalgos;
-  .log.error[err:path,": Compression config has incorrect compression algo in row(s): ",(" " sv string checkalgo),". Should be ",(", " sv string -1_allowedalgos)," or ",(string last allowedalgos),"."];'err];
-  if[count checkblock:exec i from csvtab where calgo in 1 2, not cblocksize in 12 + til 9;
-  .log.error[err:path,": Compression config has incorrect compression blocksize at row(s): ", (" " sv string checkblock), ". Should be between 12 and 20."];'err];
-  if[count checklevel: exec i from csvtab where calgo in 2, not clevel in til 10;
-  .log.error[err:path,": Compression config has incorrect compression level at row(s): ", (" " sv string checklevel), ". Should be between 0 and 9."];'err];
-  if[.z.o like "w*"; if[count rowwin:where ((csvtab[`cblocksize] < 16) & csvtab[`calgo] > 0);
-  .log.error[err:path,": Compression config has incorrect compression blocksize for Windows at row(s): ", (" " sv string rowwin), ". Must be at least 16."];'err]];
-  if[any nulls:any each null (csvtab[`column];csvtab[`table];csvtab[`minage];csvtab[`clevel]);
-  .log.error[err:path,": Compression config has empty cells in column(s): ", (" " sv string `column`table`minage`clevel where nulls)];'err];
-  }
-
-compressioncsv:([] table:`$();minage:`int$();column:`$();calgo:`int$();cblocksize:`int$();clevel:`int$())
+    / validate config csv loaded by loadcsv function
+    / include snappy (3) for version 3.4 or after
+    allowedalgos:0 1 2,$[.z.K>=3.4;3;()];
+    if[0b~all colscheck:`table`minage`column`calgo`cblocksize`clevel in (cols csvtab);
+        .log.error[err:path,": Compression config has incorrect column layout at column(s): ", (" " sv string `table`minage`column`calgo`cblocksize`clevel where not colscheck), ". Should be `table`minage`column`calgo`cblocksize`clevel."];'err];
+    if[count checkalgo:exec i from csvtab where not calgo in allowedalgos;
+        .log.error[err:path,": Compression config has incorrect compression algo in row(s): ",(" " sv string checkalgo),". Should be ",(", " sv string -1_allowedalgos)," or ",(string last allowedalgos),"."];'err];
+    if[count checkblock:exec i from csvtab where calgo in 1 2, not cblocksize in 12 + til 9;
+        .log.error[err:path,": Compression config has incorrect compression blocksize at row(s): ", (" " sv string checkblock), ". Should be between 12 and 20."];'err];
+    if[count checklevel: exec i from csvtab where calgo in 2, not clevel in til 10;
+        .log.error[err:path,": Compression config has incorrect compression level at row(s): ", (" " sv string checklevel), ". Should be between 0 and 9."];'err];
+    if[.z.o like "w*"; if[count rowwin:where ((csvtab[`cblocksize] < 16) & csvtab[`calgo] > 0);
+        .log.error[err:path,": Compression config has incorrect compression blocksize for Windows at row(s): ", (" " sv string rowwin), ". Must be at least 16."];'err]];
+    if[any nulls:any each null (csvtab[`column];csvtab[`table];csvtab[`minage];csvtab[`clevel]);
+        .log.error[err:path,": Compression config has empty cells in column(s): ", (" " sv string `column`table`minage`clevel where nulls)];'err];
+    };
+
+/ Empty compressioncsv table defined for the edge case where a bad config is loaded on the first attempt
+compressioncsv:([] table:`$();minage:`int$();column:`$();calgo:`int$();cblocksize:`int$();clevel:`int$());
 
 loadcsv:{[inputcsv]
-  loadedcsv:@[{.log.info["Opening ", x];("SISIII"; enlist ",") 0:"S"$x}; (string inputcsv); {.log.error["failed to open ", (x)," : ",y];'y}[string inputcsv]];
-  res:.[checkcsv;(loadedcsv;string inputcsv);{.log.error["failed to load csv due to error: ",x];:0b}];
-  if[res~0b;:(::)];
-  compressioncsv::loadedcsv;
-  }
+    / accepts an hsym path as argument
+    / loads and checks the compression config
+    loadedcsv:@[{.log.info["Opening ", x];("SISIII"; enlist ",") 0:"S"$x}; (string inputcsv); {.log.error["failed to open ", (x)," : ",y];'y}[string inputcsv]];
+    res:.[checkcsv;(loadedcsv;string inputcsv);{.log.error["failed to load csv due to error: ",x];:0b}];
+    if[res~0b;:(::)];
+    compressioncsv::loadedcsv;
+    };
 
-getcompressioncsv:{[] .z.m.compressioncsv}
+getcompressioncsv:{[] .z.m.compressioncsv};
 
-traverse:{$[(0=count k)or x~k:key x; x; .z.s each ` sv' x,/:k where not any k like/:(".d";"*.q";"*.k";"*#")]}
+traverse:{$[(0=count k)or x~k:key x; x; .z.s each ` sv' x,/:k where not any k like/:(".d";"*.q";"*.k";"*#")]};
 
 hdbstructure:{
-  t:([]fullpath:(raze/)traverse x); // orig traverse
-  // calculate the length of the input path
-  base:count "/" vs string x;
-  // split out the full path
-  t:update splitcount:count each split from update split:"/" vs' string fullpath,column:`,table:`,partition:(count t)#enlist"" from t;
-  // partitioned tables
-  t:update partition:split[;base],table:`$split[;base+1],column:`$split[;base+2] from t where splitcount=base+3;
-  // splayed
-  t:update table:`$split[;base],column:`$split[;base+1] from t where splitcount=base+2;
-  // cast the partition type
-  t:update partition:{$[not all null r:"D"$'x;r;not all null r:"M"$'x;r;"I"$'x]}[partition] from t;
-  /- work out the age of each partition
-  $[14h=type t`partition; t:update age:.z.D - partition from t;
-  13h=type t`partition; t:update age:(`month$.z.D) - partition from t;
-  // otherwise it is ints. If all the values are within 1000 and 3000
-  // then assume it is years
-  t:update age:{$[all x within 1000 3000; (`year$.z.D) - x;(count x)#0Ni]}[partition] from t];
-  delete splitcount,split from t}
+    t:([]fullpath:(raze/)traverse x); / original traverse
+    / calculate the length of the input path
+    base:count "/" vs string x;
+    / split out the full path
+    t:update splitcount:count each split from update split:"/" vs' string fullpath,column:`,table:`,partition:(count t)#enlist"" from t;
+    / partitioned tables
+    t:update partition:split[;base],table:`$split[;base+1],column:`$split[;base+2] from t where splitcount=base+3;
+    / splayed
+    t:update table:`$split[;base],column:`$split[;base+1] from t where splitcount=base+2;
+    / cast the partition type
+    t:update partition:{$[not all null r:"D"$'x;r;not all null r:"M"$'x;r;"I"$'x]}[partition] from t;
+    / work out the age of each partition
+    $[14h=type t`partition; t:update age:.z.D - partition from t;
+    13h=type t`partition; t:update age:(`month$.z.D) - partition from t;
+    / otherwise it is ints. If all the values are within 1000 and 3000
+    / then assume it is years
+    t:update age:{$[all x within 1000 3000; (`year$.z.D) - x;(count x)#0Ni]}[partition] from t];
+    delete splitcount,split from t
+    };
 
 showcomp:{[hdbpath;csvpath;maxage]
-  /-load csv
-  loadcsv[$[10h = type csvpath;hsym `$csvpath;hsym csvpath]];
-  .log.info["compression: scanning hdb directory structure"];
-  /-build paths table and fill age
-  $[count key (` sv hdbpath,`$"par.txt");
-  pathstab:update 0W^age from (,/) hdbstructure'[hsym each `$(read0 ` sv hdbpath,`$"par.txt")];
-  pathstab:update 0W^age from hdbstructure[hsym hdbpath]];
-  /-delete anything which isn't a table
-  pathstab:delete from pathstab where table in `;
-  /-tables that are in the hdb but not specified in the csv - compress with `default params
-  comptab:2!delete minage from update compressage:minage from .z.m.compressioncsv;
-  /-specified columns and tables
-  a:select from comptab where not table=`default, not column=`default;
-  /-default columns, specified tables
-  b:select from comptab where not table=`default,column=`default;
-  /-defaults
-  c:select from comptab where table = `default, column =`default;
-  /-join on defaults to entire table
-  t: pathstab,'(count pathstab)#value c;
-  /- join on for specified tables
-  t: t lj 1!delete column from b;
-  /- join on table and column specified information
-  t: t lj a;
-  /- in case of no default specified, delete from the table where no data is joined on
-  t: delete from t where calgo=0Nj,cblocksize=0Nj,clevel=0Nj;
-  .log.info["compression: getting current size of each file up to a maximum age of ",string maxage];
-  update currentsize:hcount each fullpath from select from t where age within (compressage;maxage) }
+    / load config from csvpath and display summary of files to be compressed and how
+    / load csv
+    loadcsv[$[10h = type csvpath;hsym `$csvpath;hsym csvpath]];
+    .log.info["compression: scanning hdb directory structure"];
+    / build paths table and fill age
+    $[count key (` sv hdbpath,`$"par.txt");
+    pathstab:update 0W^age from (,/) hdbstructure'[hsym each `$(read0 ` sv hdbpath,`$"par.txt")];
+    pathstab:update 0W^age from hdbstructure[hsym hdbpath]];
+    / delete anything which isn't a table
+    pathstab:delete from pathstab where table in `;
+    / tables that are in the hdb but not specified in the csv - compress with `default params
+    comptab:2!delete minage from update compressage:minage from .z.m.compressioncsv;
+    / specified columns and tables
+    a:select from comptab where not table=`default, not column=`default;
+    / default columns, specified tables
+    b:select from comptab where not table=`default,column=`default;
+    / defaults
+    c:select from comptab where table = `default, column =`default;
+    / join on defaults to entire table
+    t: pathstab,'(count pathstab)#value c;
+    / join on for specified tables
+    t: t lj 1!delete column from b;
+    / join on table and column specified information
+    t: t lj a;
+    / in case of no default specified, delete from the table where no data is joined on
+    t: delete from t where calgo=0Nj,cblocksize=0Nj,clevel=0Nj;
+    .log.info["compression: getting current size of each file up to a maximum age of ",string maxage];
+    update currentsize:hcount each fullpath from select from t where age within (compressage;maxage)
+    };
 
 compressfromtable:{[table]
-  statstab::([] file:`$(); algo:`int$(); compressedLength:`long$();uncompressedLength:`long$());
-  / Check if process is single threaded - if multi then compress in parallel then clean up after
-  // Add metrics on any files due to be compressed to be used afterwards for comparison
-  table:update compressionvaluepre:{(-21!x)`compressedLength}'[fullpath] from table;
-  $[0= system"s";
-  singlethreadcompress[table];
-  multithreadcompress[table]];
-  / Update the stats tab table after the compression
-  {statstabupdate[x`fullpath;x`calgo;x`currentsize;x`compressionvaluepre]} each table}
+    statstab::([] file:`$(); algo:`int$(); compressedLength:`long$();uncompressedLength:`long$());
+    / Check if process is single threaded - if multi then compress in parallel then clean up after
+    / Add metrics on any files due to be compressed to be used afterwards for comparison
+    table:update compressionvaluepre:{(-21!x)`compressedLength}'[fullpath] from table;
+    $[0= system"s";
+    singlethreadcompress[table];
+    multithreadcompress[table]];
+    / Update the stats tab table after the compression
+    {statstabupdate[x`fullpath;x`calgo;x`currentsize;x`compressionvaluepre]} each table
+    };
 
 statstabupdate:{[file;algo;sizeuncomp;compressionvaluepre]
-  if[not compressionvaluepre ~ (-21!file)`compressedLength;
-  statstab,:$[not 0=algo;(file;algo;(-21!file)`compressedLength;sizeuncomp);(file;algo;compressionvaluepre;sizeuncomp)]]}
+    if[not compressionvaluepre ~ (-21!file)`compressedLength;
+        statstab,:
+        $[not 0=algo;
+            (file;algo;(-21!file)`compressedLength;sizeuncomp);
+            (file;algo;compressionvaluepre;sizeuncomp)]
+        ]
+    };
 
 singlethreadcompress:{[table]
-  .log.info["compression: Single threaded process, compress applied sequentially"];
-  {compress[x `fullpath;x `calgo;x `cblocksize;x `clevel; x `currentsize];
-  cleancompressed[x `fullpath;x `calgo]} each table;
-  }
+    .log.info["compression: Single threaded process, compress applied sequentially"];
+    {compress[x `fullpath;x `calgo;x `cblocksize;x `clevel; x `currentsize];
+    cleancompressed[x `fullpath;x `calgo]} each table;
+    };
 
 multithreadcompress:{[table]
-  .log.info["compression: Multithreaded process, compress applied in parallel "];
-  {compress[x `fullpath;x `calgo;x `cblocksize;x `clevel; x `currentsize]} peach table;
-  {cleancompressed[x `fullpath;x `calgo]} each table;
-  }
+    .log.info["compression: Multithreaded process, compress applied in parallel "];
+    {compress[x `fullpath;x `calgo;x `cblocksize;x `clevel; x `currentsize]} peach table;
+    {cleancompressed[x `fullpath;x `calgo]} each table;
+    };
 
-/- call the compression with a max age parameter implemented
 compressmaxage:{[hdbpath;csvpath;maxage]
+    / call the compression with a max age parameter implemented
   compressfromtable[showcomp[hdbpath;csvpath;maxage]];
   summarystats[];
-  }
+    };
 
+/ compression without a max age
 docompression:compressmaxage[;;0W];
 
-getstatstab:{[] .z.m.statstab}
+/ getter for the post-compression summary table
+getstatstab:{[] .z.m.statstab};
 
 summarystats:{
-  /- table with compressionratio for each file
-  statstab::`compressionratio xdesc (update compressionratio:?[algo=0; neg uncompressedLength%compressedLength; uncompressedLength%compressedLength] from statstab);
-  compressedfiles: select from statstab where not algo = 0;
-  uncompressedfiles:select from statstab where algo = 0;
-  /- summarytable
-  memorysavings: ((sum compressedfiles`uncompressedLength) - sum compressedfiles`compressedLength) % 2 xexp 20;
-  totalcompratio: (sum compressedfiles`uncompressedLength) % sum compressedfiles`compressedLength;
-  memoryusage:((sum uncompressedfiles`uncompressedLength) - sum uncompressedfiles`compressedLength) % 2 xexp 20;
-  totaldecompratio: neg (sum uncompressedfiles`compressedLength) % sum uncompressedfiles`uncompressedLength;
-  .log.info["compression: Memory savings from compression: ", .Q.f[2;memorysavings], "MB. Total compression ratio: ", .Q.f[2;totalcompratio],"."];
-  .log.info["compression: Additional memory used from de-compression: ",.Q.f[2;memoryusage], "MB. Total de-compression ratio: ", .Q.f[2;totaldecompratio],"."];
-  .log.info["compression: Check getstatstab[] for info on each file."];}
+    /- table with compressionratio for each file
+    statstab::`compressionratio xdesc (update compressionratio:?[algo=0; neg uncompressedLength%compressedLength; uncompressedLength%compressedLength] from statstab);
+    compressedfiles: select from statstab where not algo = 0;
+    uncompressedfiles:select from statstab where algo = 0;
+    /- summary table
+    memorysavings: ((sum compressedfiles`uncompressedLength) - sum compressedfiles`compressedLength) % 2 xexp 20;
+    totalcompratio: (sum compressedfiles`uncompressedLength) % sum compressedfiles`compressedLength;
+    memoryusage:((sum uncompressedfiles`uncompressedLength) - sum uncompressedfiles`compressedLength) % 2 xexp 20;
+    totaldecompratio: neg (sum uncompressedfiles`compressedLength) % sum uncompressedfiles`uncompressedLength;
+    .log.info["compression: Memory savings from compression: ", .Q.f[2;memorysavings], "MB. Total compression ratio: ", .Q.f[2;totalcompratio],"."];
+    .log.info["compression: Additional memory used from de-compression: ",.Q.f[2;memoryusage], "MB. Total de-compression ratio: ", .Q.f[2;totaldecompratio],"."];
+    .log.info["compression: Check getstatstab[] for info on each file."];
+    };
 
 compress:{[filetoCompress;algo;blocksize;level;sizeuncomp]
-  compressedFile: hsym `$(string filetoCompress),"_kdbtempzip";
-  / compress or decompress as appropriate:
-  cmp:$[algo=0;"de";""];
-  $[((0 = count -21!filetoCompress) & not 0 = algo)|((not 0 = count -21!filetoCompress) & 0 = algo);
-  [.log.info["compression: ",cmp,"compressing ","file ", (string filetoCompress), " with algo: ", (string algo), ", blocksize: ", (string blocksize), ", and level: ", (string level), "."];
-  / perform the compression/decompression
-  -19!(filetoCompress;compressedFile;blocksize;algo;level);
-  ];
-  / if already compressed/decompressed, then log that and skip.
-  .log.info["compression: file ", (string filetoCompress), " is already ",cmp,"compressed",". Skipping this file"]]}
+    compressedFile: hsym `$(string filetoCompress),"_kdbtempzip";
+    / compress or decompress as appropriate:
+    cmp:$[algo=0;"de";""];
+    $[((0 = count -21!filetoCompress) & not 0 = algo)|((not 0 = count -21!filetoCompress) & 0 = algo);
+        [.log.info["compression: ",cmp,"compressing ","file ", (string filetoCompress), " with algo: ", (string algo), ", blocksize: ", (string blocksize), ", and level: ", (string level), "."];
+        / perform the compression/decompression
+        -19!(filetoCompress;compressedFile;blocksize;algo;level);
+        ];
+        / if already compressed/decompressed, then log that and skip.
+        .log.info["compression: file ", (string filetoCompress), " is already ",cmp,"compressed",". Skipping this file"]
+    ]
+    };
 
 cleancompressed:{[filetoCompress;algo]
-  compressedFile: hsym `$(string filetoCompress),"_kdbtempzip";
-  cmp:$[algo=0;"de";""];
-  // Verify compressed file exists
-  if[()~ key compressedFile;
-  .log.info["compression: No compressed file present for the following file - ",string[filetoCompress]];
-  :();
-  ];
-  // Verify compressed file's contents match original
-  if[not ((get compressedFile)~sf:get filetoCompress) & (count -21!compressedFile) or algo=0;
-  .log.info["compression: ",cmp,"compressed ","file ",string[compressedFile]," doesn't match original. Deleting new file"];
-  hdel compressedFile;
-  :();
-  ];
-  // Given the above two checks are satisfied, delete the original and rename the compressed file to the original name
-  .log.info["compression: File ",cmp,"compressed ",string[filetoCompress]," successfully; matches original. Deleting original."];
-  system "r ", (last ":" vs string compressedFile)," ", last ":" vs string filetoCompress;
-  / move the hash files too.
-  hashfilecheck[compressedFile;filetoCompress;sf];
-  }
+    compressedFile: hsym `$(string filetoCompress),"_kdbtempzip";
+    cmp:$[algo=0;"de";""];
+    / Verify compressed file exists
+    if[()~ key compressedFile;
+        .log.info["compression: No compressed file present for the following file - ",string[filetoCompress]];
+        :();
+    ];
+    / Verify compressed file's contents match original
+    if[not ((get compressedFile)~sf:get filetoCompress) & (count -21!compressedFile) or algo=0;
+        .log.info["compression: ",cmp,"compressed ","file ",string[compressedFile]," doesn't match original. Deleting new file"];
+        hdel compressedFile;
+        :();
+    ];
+    / Given the above two checks are satisfied, delete the original and rename the compressed file to the original name
+    .log.info["compression: File ",cmp,"compressed ",string[filetoCompress]," successfully; matches original. 
Deleting original."]; + system "r ", (last ":" vs string compressedFile)," ", last ":" vs string filetoCompress; + / move the hash files too. + hashfilecheck[compressedFile;filetoCompress;sf]; + }; hashfilecheck:{[compressedFile;filetoCompress;sf] - / if running 3.6 or higher, account for anymap type for nested lists - / check for double hash file if nested data contains symbol vector/atom - $[3.6<=.z.K; - if[77 = type sf; system "r ", (last ":" vs string compressedFile),"# ", (last ":" vs string filetoCompress),"#"; - .[{system "r ", (last ":" vs string x),"## ", (last ":" vs string y),"##"};(compressedFile;filetoCompress);.log.info["compression: File does not have enumeration domain"]]]; - / if running below 3.6, nested list types will be 77h+t and will not have double hash file - if[78 <= type sf; system "r ", (last ":" vs string compressedFile),"# ", (last ":" vs string filetoCompress),"#"]]} + / if running 3.6 or higher, account for anymap type for nested lists + / check for double hash file if nested data contains symbol vector/atom + $[3.6<=.z.K; + if[77 = type sf; system "r ", (last ":" vs string compressedFile),"# ", (last ":" vs string filetoCompress),"#"; + .[{system "r ", (last ":" vs string x),"## ", (last ":" vs string y),"##"};(compressedFile;filetoCompress);.log.info["compression: File does not have enumeration domain"]]]; + / if running below 3.6, nested list types will be 77h+t and will not have double hash file + if[78 <= type sf; system "r ", (last ":" vs string compressedFile),"# ", (last ":" vs string filetoCompress),"#"] + ] + }; diff --git a/di/compression/init.q b/di/compression/init.q index 759381a..4289c5e 100644 --- a/di/compression/init.q +++ b/di/compression/init.q @@ -4,13 +4,5 @@ / Load KX log module - needed for .log.info and .log.error .logger:use`kx.log .log:.logger.createLog[] -.log.addfmt[`custom;"$l: $p PID[$i] $m\n"]; -.log.setfmt[`custom] -export:([ - showcomp:showcomp; - getcompressioncsv:getcompressioncsv; - compressmaxage:compressmaxage; - docompression:docompression; - getstatstab:getstatstab - ]) +export:([showcomp;getcompressioncsv;compressmaxage;docompression;getstatstab])
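
For local verification of the series end state, a minimal smoke test along the lines below can be run. It is a sketch only: the /tmp paths, the trade schema and the one-row config are illustrative assumptions, and loading via `use` follows the convention shown in compression.md.

```q
/ illustrative smoke test for di/compression - paths, schema and config values are assumptions
hdb:`:/tmp/cmptest

/ build one splayed trade table per date partition, 30, 20 and 10 days old
mkpart:{[d]
  t:([]time:09:00:00.000+til 100;sym:100?`AAPL`MSFT`GOOG;price:100?100f);
  (` sv (hdb;`$string d;`trade;`)) set .Q.en[hdb;t]}
mkpart each .z.D - 30 20 10

/ one-row driver config: gzip (algo 2) everything older than 15 days
cfg:`:/tmp/cmpconfig.csv
cfg 0: ("table,minage,column,calgo,cblocksize,clevel";"default,15,default,2,17,6")

.cmp:use`di.compression                        / load the module as documented in compression.md
show .cmp.showcomp[hdb;cfg;0W]                 / preview: only the 30- and 20-day partitions qualify
.cmp.docompression[hdb;cfg]                    / compress with no maximum age cap
show .cmp.getstatstab[]                        / per-file compression ratios
-21!` sv (hdb;`$string .z.D-30;`trade;`price)  / non-empty stats dictionary once compressed
```

The preview step is worth keeping: showcomp loads the config and returns exactly the file table that compressmaxage/docompression would act on, so it can be inspected before committing to a large HDB.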