diff --git a/di/compression/compression.md b/di/compression/compression.md
new file mode 100644
index 0000000..de89626
--- /dev/null
+++ b/di/compression/compression.md
@@ -0,0 +1,91 @@
# Compression

`compression.q` applies compression to any kdb+ database. It handles all partition types (date, month, year, and int), can deal with top-level splayed tables, and will also decompress files as required.

> **Note:** Please use with caution.

---

## :sparkles: Features

- Apply compression to any kdb+ database
- Compression uses the `-19!` internal function
- Configured using a specified CSV driver file
- Compression algorithm (none, kdb+ IPC, gzip; snappy is also accepted on kdb+ 3.4 and later), compression blocksize, and compression level (for gzip) can all be configured
- Flexibility to compress different tables/columns with different compression parameters
- Preview available before compression, showing which files will be compressed and how
- Summary statistics for each file returned after compression/decompression completes

---

## :memo: Dependencies

- KX log module

---

## :gear: compressioncsv Schema

A blank `compressioncsv` table is created when the module is loaded.
The `loadcsv` and `showcomp` functions accept a `` `:/path/to/csv `` argument specifying the driver CSV location, and load that config into `compressioncsv`.

| Column     | Type     | Description                                                  |
|------------|----------|--------------------------------------------------------------|
| table      | `symbol` | table name for table-specific compression                    |
| minage     | `int`    | minimum age of a file in days before compression is applied  |
| column     | `symbol` | column name for column-specific compression                  |
| calgo      | `int`    | compression algorithm - 0, 1, or 2 (3 accepted on kdb+ 3.4+) |
| cblocksize | `int`    | compression blocksize - between 12 and 20                    |
| clevel     | `int`    | compression level - 0 to 9 - for gzip (2) only               |

## :label: Example Config

```
table,minage,column,calgo,cblocksize,clevel
default,10,default,2,17,6
quotes,10,time,2,17,5
quotes,10,src,2,17,4
depth,10,default,1,17,8
```

- Tables in the database but not in the config are automatically compressed using the `default` parameters
- For tables with specific columns listed, the remaining columns are compressed with the table's defaults (if a `default` column row is specified for a table, all columns in that table use it)
- Algo 0 decompresses the file; if the file is not compressed it is ignored
- The config file can be a single row to compress everything older than a given age with the same parameters, as shown below:
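For example, a minimal one-row config (illustrative values) that gzips every file older than 30 days:

```
table,minage,column,calgo,cblocksize,clevel
default,30,default,2,17,6
```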

---

## :wrench: Functions

| Function            | Description                                                                                    |
|---------------------|------------------------------------------------------------------------------------------------|
|`showcomp`           | Load the specified compression config and show compression details for files to be compressed |
|`getcompressioncsv`  | Getter returning the loaded `compressioncsv` config                                            |
|`compressmaxage`     | Compress files according to the config, up to the specified max age                            |
|`docompression`      | Compress all files in the hdb according to the `compressioncsv` config                         |
|`getstatstab`        | Getter returning the summary stats after compression                                           |

---

## :test_tube: Example

```q
// Include compression module in a process
.cmp: use `di.compression

// View dictionary of functions
.cmp

// Show table of files to be compressed and how, before execution
.cmp.showcomp[`:/path/to/hdb;`:/path/to/csv;maxagefilestocompress]

// Compress all files up to a max age
.cmp.compressmaxage[`:/path/to/hdb;`:/path/to/csv;maxagefilestocompress]

// Compress all eligible files with no max age limit
.cmp.docompression[`:/path/to/hdb;`:/path/to/csv]

// Retrieve summary statistics for compression
.cmp.getstatstab[]
```
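The preview returned by `showcomp` is a table with one row per column file. An illustrative result (hypothetical paths, ages, and sizes):

```q
// fullpath                              partition  table  column age compressage calgo cblocksize clevel currentsize
// `:/path/to/hdb/2024.01.02/quotes/time 2024.01.02 quotes time   42  10          2     17         5      8388608
// `:/path/to/hdb/2024.01.02/quotes/src  2024.01.02 quotes src    42  10          2     17         4      4194304
```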
diff --git a/di/compression/compression.q b/di/compression/compression.q
new file mode 100644
index 0000000..edc1454
--- /dev/null
+++ b/di/compression/compression.q
@@ -0,0 +1,192 @@
checkcsv:{[csvtab;path]
  / validate config csv loaded by loadcsv function
  / include snappy (3) for version 3.4 or after
  allowedalgos:0 1 2,$[.z.K>=3.4;3;()];
  if[not all colscheck:`table`minage`column`calgo`cblocksize`clevel in cols csvtab;
    .log.error[err:path,": Compression config has incorrect column layout; missing column(s): ",(", " sv string `table`minage`column`calgo`cblocksize`clevel where not colscheck),". Should be `table`minage`column`calgo`cblocksize`clevel."];'err];
  if[count checkalgo:exec i from csvtab where not calgo in allowedalgos;
    .log.error[err:path,": Compression config has incorrect compression algo in row(s): ",(" " sv string checkalgo),". Should be ",(", " sv string -1_allowedalgos)," or ",(string last allowedalgos),"."];'err];
  if[count checkblock:exec i from csvtab where calgo in 1 2, not cblocksize in 12+til 9;
    .log.error[err:path,": Compression config has incorrect compression blocksize at row(s): ",(" " sv string checkblock),". Should be between 12 and 20."];'err];
  if[count checklevel:exec i from csvtab where calgo in 2, not clevel in til 10;
    .log.error[err:path,": Compression config has incorrect compression level at row(s): ",(" " sv string checklevel),". Should be between 0 and 9."];'err];
  if[.z.o like "w*";
    if[count rowwin:where (csvtab[`cblocksize]<16)&csvtab[`calgo]>0;
      .log.error[err:path,": Compression config has incorrect compression blocksize for windows at row(s): ",(" " sv string rowwin),". Must be greater than or equal to 16."];'err]];
  if[any nulls:any each null (csvtab`column;csvtab`table;csvtab`minage;csvtab`clevel);
    .log.error[err:path,": Compression config has empty cells in column(s): ",", " sv string `column`table`minage`clevel where nulls];'err];
  };

/ Empty compressioncsv table defined for edge case where a bad config is loaded on first attempt
compressioncsv:([] table:`$();minage:`int$();column:`$();calgo:`int$();cblocksize:`int$();clevel:`int$());

loadcsv:{[inputcsv]
  / accepts hsym path as argument
  / loads and checks compression config
  loadedcsv:@[{.log.info["Opening ",x];("SISIII";enlist ",") 0:"S"$x};string inputcsv;{.log.error["failed to open ",x," : ",y];'y}[string inputcsv]];
  res:.[checkcsv;(loadedcsv;string inputcsv);{.log.error["failed to load csv due to error: ",x];:0b}];
  if[res~0b;:(::)];
  compressioncsv::loadedcsv;
  };

getcompressioncsv:{[] .z.m.compressioncsv};

/ recursively collect file paths, skipping .d files, scripts, and # hash files
traverse:{$[(0=count k)or x~k:key x; x; .z.s each ` sv' x,/:k where not any k like/:(".d";"*.q";"*.k";"*#")]};

hdbstructure:{
  / table of every column file under the root
  t:([]fullpath:(raze/) traverse x);
  / calculate the length of the input path
  base:count "/" vs string x;
  / split out the full path
  t:update splitcount:count each split from update split:"/" vs' string fullpath,column:`,table:`,partition:(count t)#enlist"" from t;
  / partitioned tables
  t:update partition:split[;base],table:`$split[;base+1],column:`$split[;base+2] from t where splitcount=base+3;
  / splayed tables at the top level
  t:update table:`$split[;base],column:`$split[;base+1] from t where splitcount=base+2;
  / cast the partition type
  t:update partition:{$[not all null r:"D"$'x;r;not all null r:"M"$'x;r;"I"$'x]}[partition] from t;
  / work out the age of each partition
  $[14h=type t`partition; t:update age:.z.D-partition from t;
    13h=type t`partition; t:update age:(`month$.z.D)-partition from t;
    / otherwise it is ints; if all the values are within 1000 and 3000, assume they are years
    t:update age:{$[all x within 1000 3000;(`year$.z.D)-x;(count x)#0Ni]}[partition] from t];
  delete splitcount,split from t
  };
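/ Illustration only (hypothetical path; not executed on load): for a small
/ date-partitioned db built with
/   `:/tmp/hdbdemo/2024.01.01/t/ set ([]a:1 2 3;b:4 5 6)
/ hdbstructure[`:/tmp/hdbdemo] returns one row per column file, e.g.
/   fullpath                      partition  table column age
/   `:/tmp/hdbdemo/2024.01.01/t/a 2024.01.01 t     a      <days since 2024.01.01>
/ age is what showcomp later compares against the config's minage/compressage.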
showcomp:{[hdbpath;csvpath;maxage]
  / load config from csvpath and display summary of files to be compressed and how
  loadcsv[$[10h=type csvpath;hsym `$csvpath;hsym csvpath]];
  .log.info["compression: scanning hdb directory structure"];
  / build paths table and fill age; follow par.txt if present
  $[count key ` sv hdbpath,`$"par.txt";
    pathstab:update 0W^age from (,/) hdbstructure'[hsym each `$read0 ` sv hdbpath,`$"par.txt"];
    pathstab:update 0W^age from hdbstructure hsym hdbpath];
  / delete anything which isn't a table
  pathstab:delete from pathstab where table in `;
  / tables that are in the hdb but not specified in the csv are compressed with `default params
  comptab:2!delete minage from update compressage:minage from .z.m.compressioncsv;
  / specified columns and tables
  a:select from comptab where not table=`default,not column=`default;
  / default columns, specified tables
  b:select from comptab where not table=`default,column=`default;
  / defaults
  c:select from comptab where table=`default,column=`default;
  / join the defaults onto the entire table
  t:pathstab,'(count pathstab)#value c;
  / join on table-level defaults
  t:t lj 1!delete column from b;
  / join on table- and column-specific information
  t:t lj a;
  / in case no default is specified, drop rows where no params were joined on
  t:delete from t where null calgo,null cblocksize,null clevel;
  .log.info["compression: getting current size of each file up to a maximum age of ",string maxage];
  update currentsize:hcount each fullpath from select from t where age within (compressage;maxage)
  };

compressfromtable:{[table]
  statstab::([] file:`$();algo:`int$();compressedLength:`long$();uncompressedLength:`long$());
  / record pre-compression metrics on each file, used afterwards for comparison
  table:update compressionvaluepre:{(-21!x)`compressedLength}'[fullpath] from table;
  / if single threaded, compress sequentially; if multithreaded, compress in parallel then clean up after
  $[0=system"s";
    singlethreadcompress[table];
    multithreadcompress[table]];
  / update the stats table after the compression
  {statstabupdate[x`fullpath;x`calgo;x`currentsize;x`compressionvaluepre]} each table
  };

statstabupdate:{[file;algo;sizeuncomp;compressionvaluepre]
  / only record files whose compressed length actually changed
  if[not compressionvaluepre~(-21!file)`compressedLength;
    statstab,:$[not 0=algo;
      (file;algo;(-21!file)`compressedLength;sizeuncomp);
      (file;algo;compressionvaluepre;sizeuncomp)]
    ]
  };

singlethreadcompress:{[table]
  .log.info["compression: Single threaded process, compression applied sequentially"];
  {compress[x`fullpath;x`calgo;x`cblocksize;x`clevel;x`currentsize];
   cleancompressed[x`fullpath;x`calgo]} each table;
  };

multithreadcompress:{[table]
  .log.info["compression: Multithreaded process, compression applied in parallel"];
  {compress[x`fullpath;x`calgo;x`cblocksize;x`clevel;x`currentsize]} peach table;
  {cleancompressed[x`fullpath;x`calgo]} each table;
  };

compressmaxage:{[hdbpath;csvpath;maxage]
  / run the compression with a max age parameter applied
  compressfromtable[showcomp[hdbpath;csvpath;maxage]];
  summarystats[];
  };

/ compression without a max age
docompression:compressmaxage[;;0W];

/ getter for post compression summary table
getstatstab:{[] .z.m.statstab};

summarystats:{
  /- table with compressionratio for each file
  statstab::`compressionratio xdesc update compressionratio:?[algo=0;neg uncompressedLength%compressedLength;uncompressedLength%compressedLength] from statstab;
  compressedfiles:select from statstab where not algo=0;
  uncompressedfiles:select from statstab where algo=0;
  /- summary figures in MB (2 xexp 20 bytes)
  memorysavings:((sum compressedfiles`uncompressedLength)-sum compressedfiles`compressedLength)%2 xexp 20;
  totalcompratio:(sum compressedfiles`uncompressedLength)%sum compressedfiles`compressedLength;
  memoryusage:((sum uncompressedfiles`uncompressedLength)-sum uncompressedfiles`compressedLength)%2 xexp 20;
  totaldecompratio:neg (sum uncompressedfiles`compressedLength)%sum uncompressedfiles`uncompressedLength;
  .log.info["compression: Memory savings from compression: ",.Q.f[2;memorysavings],"MB. Total compression ratio: ",.Q.f[2;totalcompratio],"."];
  .log.info["compression: Additional memory used from de-compression: ",.Q.f[2;memoryusage],"MB. Total de-compression ratio: ",.Q.f[2;totaldecompratio],"."];
  .log.info["compression: Check getstatstab[] for info on each file."];
  };

compress:{[filetoCompress;algo;blocksize;level;sizeuncomp]
  compressedFile:hsym `$(string filetoCompress),"_kdbtempzip";
  / compress or decompress as appropriate
  cmp:$[algo=0;"de";""];
  $[((0=count -21!filetoCompress)&not 0=algo)|((not 0=count -21!filetoCompress)&0=algo);
    [.log.info["compression: ",cmp,"compressing file ",(string filetoCompress)," with algo: ",(string algo),", blocksize: ",(string blocksize),", and level: ",(string level),"."];
     / perform the compression/decompression into the temp file
     -19!(filetoCompress;compressedFile;blocksize;algo;level)];
    / if already compressed/decompressed, log that and skip
    .log.info["compression: file ",(string filetoCompress)," is already ",cmp,"compressed. Skipping this file"]
    ]
  };
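/ For reference, a sketch of the primitives wrapped above (run in a scratch
/ q session, not on load; the /tmp paths are hypothetical):
/   `:/tmp/v set til 1000000              / write an uncompressed vector
/   -19!(`:/tmp/v;`:/tmp/v_zip;17;2;6)    / gzip copy: logical blocksize 2^17, level 6
/   -21!`:/tmp/v_zip                      / stats dict incl. compressedLength, uncompressedLength
/   -21!`:/tmp/v                          / empty for an uncompressed file
/ cleancompressed below only swaps the temp file over the original once its
/ contents are verified to match.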
cleancompressed:{[filetoCompress;algo]
  compressedFile:hsym `$(string filetoCompress),"_kdbtempzip";
  cmp:$[algo=0;"de";""];
  / verify the compressed temp file exists
  if[()~key compressedFile;
    .log.info["compression: No compressed file present for the following file - ",string filetoCompress];
    :()];
  / verify the compressed temp file's contents match the original
  if[not ((get compressedFile)~sf:get filetoCompress)&(count -21!compressedFile) or algo=0;
    .log.info["compression: ",cmp,"compressed file ",string[compressedFile]," doesn't match original. Deleting new file"];
    hdel compressedFile;
    :()];
  / given the above two checks are satisfied, delete the original and rename the temp file to the original name
  .log.info["compression: File ",cmp,"compressed ",string[filetoCompress]," successfully; matches original. Deleting original."];
  system "r ",(last ":" vs string compressedFile)," ",last ":" vs string filetoCompress;
  / move the hash files too
  hashfilecheck[compressedFile;filetoCompress;sf];
  };


hashfilecheck:{[compressedFile;filetoCompress;sf]
  / if running 3.6 or higher, account for the anymap type for nested lists;
  / check for a double hash file if the nested data contains a symbol vector/atom
  $[3.6<=.z.K;
    if[77=type sf;
      system "r ",(last ":" vs string compressedFile),"# ",(last ":" vs string filetoCompress),"#";
      .[{system "r ",(last ":" vs string x),"## ",(last ":" vs string y),"##"};
        (compressedFile;filetoCompress);
        {[err].log.info["compression: File does not have enumeration domain"]}]];
    / if running below 3.6, nested list types will be 77h+t and will not have a double hash file
    if[78<=type sf;
      system "r ",(last ":" vs string compressedFile),"# ",(last ":" vs string filetoCompress),"#"]
    ]
  };
diff --git a/di/compression/init.q b/di/compression/init.q
new file mode 100644
index 0000000..4289c5e
--- /dev/null
+++ b/di/compression/init.q
@@ -0,0 +1,8 @@
/ Load core functionality into root module namespace
\l ::compression.q

/ Load KX log module - needed for .log.info and .log.error
.logger:use`kx.log
.log:.logger.createLog[]

export:([showcomp;getcompressioncsv;compressmaxage;docompression;getstatstab])
diff --git a/di/compression/test.csv b/di/compression/test.csv
new file mode 100644
index 0000000..e69de29