Add parallel compression for multithreaded process #711
jonathonmcmurray
left a comment
a few comments that are largely unrelated to your actual change but while we're modifying this file may as well clean it up a bit
code/common/compress.q
Outdated
/-log to the table if the algo wasn't 0
statstab,:$[not 0=algo;(filetoCompress;algo;(-21!filetoCompress)`compressedLength;sizeuncomp);(filetoCompress;algo;comprL;sizeuncomp)]];
I think we should do this logging in the compressmaxage function rather than the cleancompressed - I think we then can reduce the args to cleancompressed and skip calcing comprL.
code/common/compress.q
Outdated
statstab,:$[not 0=algo;(filetoCompress;algo;(-21!filetoCompress)`compressedLength;sizeuncomp);(filetoCompress;algo;comprL;sizeuncomp)]];
[$[not count -21!compressedFile;
[.lg.o[`compression; "Failed to compress file ",string[filetoCompress]];hdel compressedFile];
[.lg.o[`compression;cmp,"compressed ","file ",string[compressedFile]," doesn't match original. Deleting new file"];hdel compressedFile]]]]]}
the number of ] on this line is horrifying - I think we should be able to refactor this function a bit to avoid the [...] block syntax inside conditionals and make this a bit easier to follow
e.g. something like
if[()~ key compressedFile;
.lg.o[`compression; "No compressed file present for the following file - ",string[filetoCompress]];
:();
];
if[not ((get compressedFile)~sf:get filetoCompress) & (count -21!compressedFile) or algo=0;
.lg.o[`compression;cmp,"compressed ","file ",string[compressedFile]," doesn't match original. Deleting new file"];
hdel compressedFile;
:();
];
// rest of code to rename file etc.
code/common/compress.q
Outdated
$[0= system"s";
[.lg.o[`compression; "Single threaded process, compress applied sequentially"];
{compress[x `fullpath;x `calgo;x `cblocksize;x `clevel; x `currentsize];cleancompressed[x `fullpath;x `calgo;x `cblocksize;x `clevel; x `currentsize]} each table];
[.lg.o[`compression; "Multithreaded process, compress applied in parallel "];
{compress[x `fullpath;x `calgo;x `cblocksize;x `clevel; x `currentsize]} peach table;
{cleancompressed[x `fullpath;x `calgo;x `cblocksize;x `clevel; x `currentsize]} each table;]]}
I think it'd be cleaner if we made this into two functions that can be called depending on whether proc is single threaded or not
 $[0= system"s";singlethreadcompress;multithreadcompress]table;
 }
singlethreadcompress:{[table]
 .lg.o[`compression; "Single threaded process, compress applied sequentially"];
 {compress[x `fullpath;x `calgo;x `cblocksize;x `clevel; x `currentsize];
 cleancompressed[x `fullpath;x `calgo;x `cblocksize;x `clevel; x `currentsize]} each table;
 }
multithreadcompress:{[table]
 .lg.o[`compression; "Multithreaded process, compress applied in parallel "];
 {compress[x `fullpath;x `calgo;x `cblocksize;x `clevel; x `currentsize]} peach table;
 {cleancompressed[x `fullpath;x `calgo;x `cblocksize;x `clevel; x `currentsize]} each table;
 }
or something like this
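As a rough sketch only, the repeated argument lists in the suggestion above could be trimmed by indexing each row with a list of column names and applying the function with `.`. Here `cmpargs` is a hypothetical helper name, the stub `compress` and `cleancompressed` only stand in for the real functions so the snippet loads on its own, and `t` is made-up sample data. Keeping `cleancompressed` on `each` assumes it amends shared state such as `statstab`, which secondary threads are not allowed to do.

```q
/ stubs standing in for the real functions (illustration only)
compress:{[f;a;b;l;s] f}
cleancompressed:{[f;a;b;l;s] f}

/ hypothetical helper: pull the five arguments out of one row (a dictionary)
cmpargs:{x`fullpath`calgo`cblocksize`clevel`currentsize}

singlethreadcompress:{[table]
  {compress . cmpargs x;cleancompressed . cmpargs x} each table;
  }

multithreadcompress:{[table]
  / compress in parallel, then clean up sequentially on the main thread
  {compress . cmpargs x} peach table;
  {cleancompressed . cmpargs x} each table;
  }

/ made-up sample input with the column names used in the PR
t:([]fullpath:`:a`:b;calgo:2 2;cblocksize:16 16;clevel:9 9;currentsize:100 200)
$[0=system"s";singlethreadcompress;multithreadcompress] t;
```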
Clean up the code and make it more modular, depending on the thread count
Addition of parallel compression where possible in multithreaded processes.
Split the compression function into two components:
Performance Statistics
Compression:
Decompression
Verification of changes
Context on data
Table 1 - 2025.08.07 Partition - Quote - 2.5m rows - 9 columns
Table 2 - 2025.08.08 Partition - Quote - 2.5m rows - 9 columns
Table 3 - 2025.08.18 Partition - Trade - 0.6m rows - 8 columns
Single Threaded Process Verification
ksomerville@homer:~/repo/TorQ$ q torq.q -proctype test -procname mytest -debug
Verify the process is single threaded:
q)\s
0i
Check the current compression stats. `-21!` returns nothing, so the file is not currently compressed:
q)-21!`:/home/ksomerville/testDB/hdb/2025.08.07/quote/bid
()
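For reference, a minimal sketch of what `-21!` reports before and after compression, using a hypothetical scratch file under `/tmp` rather than the test HDB. It assumes zlib is available to q, since algorithm 2 is gzip. Note that `set` takes the parameters in the order (file; logical block size; algorithm; level), which is not the order in which (2;16;9) is written in this walkthrough.

```q
/ hypothetical scratch file, not part of the test HDB
f:`:/tmp/cmpdemo/bid

/ plain write: -21! gives an empty result, as in the session above
f set 100000#1.5
-21!f

/ compressed write: (file; logical block size; algorithm; level)
(f;16;2;9) set 100000#1.5

/ now -21! returns a dictionary of compression stats
-21!f
(-21!f)`compressedLength`uncompressedLength
```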
Modify the compression config file to use compression settings of (2;16;9):
`:/home/ksomerville/deploy/TorQApp/latest/appconfig/compressionconfig.csv
q)read0 `:/home/ksomerville/deploy/TorQApp/latest/appconfig/compressionconfig.csv
Run the compression
q).cmp.docompression[hdbpath:`:/home/ksomerville/testDB/hdb;csvpath:`:/home/ksomerville/deploy/TorQApp/latest/appconfig/compressionconfig.csv]
Verify compression has been applied:
q)-21!`:/home/ksomerville/testDB/hdb/2025.08.07/quote/bid
Run Decompression
Change the algo in the compression config to 0:
q)read0 `:/home/ksomerville/deploy/TorQApp/latest/appconfig/compressionconfig.csv
Run decompression:
q).cmp.docompression[hdbpath:`:/home/ksomerville/testDB/hdb;csvpath:`:/home/ksomerville/deploy/TorQApp/latest/appconfig/compressionconfig.csv]
Verify the file is no longer compressed:
q)-21!`:/home/ksomerville/testDB/hdb/2025.08.07/quote/bid
()
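The check above looks at a single column file. A possible way to check every column file in the partition at once is sketched below; `iscompressed` and `fs` are hypothetical names, and the directory is the one used in this session.

```q
/ hypothetical helper: true when -21! returns any stats for the file
iscompressed:{0<count -21!x}

/ table directory from the session above
dir:`:/home/ksomerville/testDB/hdb/2025.08.07/quote

/ files in the splayed table directory, ignoring the .d column-order file
fs:key[dir] except `.d

/ map each file name to a compressed flag
fs!iscompressed each .Q.dd[dir] each fs
```

After decompression every flag should be `0b`; after compression the flags show which column files were picked up by the config.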
Multithreaded Process
Start the test process with 8 secondary threads (-s 8):
ksomerville@homer:~/repo/TorQ$ q torq.q -proctype test -procname mytest -debug -s 8
Verify the process is multithreaded:
q)\s
8i
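As a small illustration of what the thread setting changes, the sketch below runs the same job with `each` and `peach` and compares the results. `work` and `sizes` are hypothetical stand-ins for the per-file compression calls. With `-s 8` the `peach` call is spread across secondary threads; with no secondary threads it runs sequentially like `each`.

```q
/ secondary threads available to this process: 0i when started without -s
n:system"s"

/ hypothetical stand-in for a per-file job; deterministic so results can be compared
work:{sum sqrt til x}
sizes:8#1000000

/ sequential and parallel runs of the same job
r1:work each sizes
r2:work peach sizes

/ both runs should produce the same list
r1~r2
```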
Show that the files are uncompressed (not touched from above):
q)-21!`:/home/ksomerville/testDB/hdb/2025.08.07/quote/bid
()
Set the compression settings back to (2;16;9):
q) read0 `:/home/ksomerville/deploy/TorQApp/latest/appconfig/compressionconfig.csv
Run the Compression:
q).cmp.docompression[hdbpath:`:/home/ksomerville/testDB/hdb;csvpath:`:/home/ksomerville/deploy/TorQApp/latest/appconfig/compressionconfig.csv]
Verify compression applied:
q) -21!`:/home/ksomerville/testDB/hdb/2025.08.07/quote/bid
To decompress the data, change the compression algo to 0:
q) read0 `:/home/ksomerville/deploy/TorQApp/latest/appconfig/compressionconfig.csv
Run the Decompression:
q).cmp.docompression[hdbpath:`:/home/ksomerville/testDB/hdb;csvpath:`:/home/ksomerville/deploy/TorQApp/latest/appconfig/compressionconfig.csv]
Verify the data isn't compressed:
q)-21!`:/home/ksomerville/testDB/hdb/2025.08.07/quote/bid
()