diff --git a/dataloader/dataloader.md b/dataloader/dataloader.md new file mode 100644 index 0000000..05ec50b --- /dev/null +++ b/dataloader/dataloader.md @@ -0,0 +1,138 @@ +# `dataloader.q` – Loading delimited data and creating databases for kdb+ + + +This package is used for automated, customisable data loading and database creation and is a generalisation of http://code.kx.com/wiki/Cookbook/LoadingFromLargeFiles. +The package employs a chunk-based loading strategy to minimise memory usage when processing large datasets. Rather than loading entire datasets into memory before writing to disk, data is processed incrementally in manageable chunks. + +The memory footprint is determined by the maximum of: +- Memory required to load and save one data chunk +- Memory required to sort the final resultant table + +This approach enables processing of large data volumes with a relatively small memory footprint. Example use cases include: +- Large file processing: loading very large files in small, sequential chunks +- Cross-partition loading: efficiently handling data distributed across multiple small files (e.g. separate monthly files for AAPL and MSFT data) + +The chunked architecture keeps memory usage bounded regardless of total dataset size, making the package suitable for datasets that would otherwise exceed available system memory. +When all the data has been written, the on-disk data is re-sorted and the attributes are applied. + +--- + +## :sparkles: Features + +- Load delimited data from a disk directory in customisable chunks +- Persist data to disk in a partitioned format +- Dynamically sort and apply attributes to tables in the resulting database +- Configure compression of the database + +--- + +## :gear: Initialisation + +After loading the package into the session, the `loadallfiles` function is ready to run. By default, tables will be written with the `p` attribute applied to the `sym` column and sorted by it. 
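+The chunk-based loading described above is built on kdb+'s file-streaming utility `.Q.fsn`, which reads a file in chunks of at most a given number of bytes and passes each chunk to a callback as a list of lines. A minimal illustration of the idea, separate from the package API (the file path is hypothetical):
+
+```q
+/ Count the rows of a large CSV without holding the whole file in memory.
+/ `:bigfile.csv is a hypothetical path; 100*2 xexp 20 bytes (~100 MB) is the
+/ package's default chunk size.
+n:0
+.Q.fsn[{n::n+count x};`:bigfile.csv;`int$100*2 xexp 20]
+n   / running total of lines seen across all chunks
+```
+
+The package wraps this same pattern: its per-chunk callback parses, enumerates and writes each chunk to the database instead of just counting lines.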
+ +### :mag_right: Custom sorting parameters + +By default, the sorting parameters for all tables are: +``` +tabname att column sort +----------------------- +default p sym 1 +``` + +That is, for every table the `p` attribute will be applied to the `sym` column and the table will be sorted by it. If the table being loaded requires different attributes applied on different columns, custom sorting parameters can be added using the `addsortparams` function. This takes 4 inputs: tabname, att, column and sort. These arguments determine how the tables in the resulting database should be sorted and where attributes should be applied when they are persisted. Calling the function adds parameters for the specified table, or updates them if parameters for that table already exist. + +You may apply default sorting and attributes to all tables loaded in by the package by passing a `tabname` with a value of `default` and specifying your default sorting and attribute parameters. Passing in `default` will overwrite the current default parameters. + +If no sorting or attributes are required, pass `default` as the tabname, a null symbol (a lone backtick) for both att and column, and `0b` for sort. Examples are shown below: +```q +dataloader:use`dataloader +dataloader.addsortparams[`default;`;`;0b] / Overwrite default to apply no sorting or attributes +dataloader.addsortparams[`default;`p;`sym;1b] / Overwrite default to sort all tables loaded in by the sym column and apply the parted attribute +dataloader.addsortparams[`default`trade`quote;`p`s`;`sym`time`;110b] / Apply default to all tables, however sort trade by time and apply `s, and if quote is read in by the function then do not sort or apply attributes +``` +The arguments are outlined below. 
+ +| Input | Type | Description | +|-----------|-----------------------|----------------------------------------------------------------------------------| +| `tabname` | symbol / symbol list | Name of table | +| `att` | symbol / symbol list | Attributes corresponding to the table names | +| `column` | symbol / symbol list | Columns to sort and apply attributes to | +| `sort` | boolean / boolean list | Determines if the corresponding table will be sorted (1b: sorted; 0b: not sorted) | + +--- + +### :rocket: Functions + +`loadallfiles` is the primary function used to load all data and create the database. It takes two arguments: a dictionary of loading parameters and a directory containing the files to read. The function reads each specified delimited file from the chosen directory in chunks, applies any required processing, persists the data to disk in a kdb+ partitioned format, compresses the files if directed, and finally sorts the tables and applies attributes. + + +## :mag_right: Params in depth +The dictionary should/can have the following fields: + +| Parameter | Required | Type | Description | +|-------------------|----------|-----------|---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| +| `headers` | Y | symbol list | Names of the header columns in the file | +| `types` | Y | char list | Data types to read from the file | +| `separator` | Y | char | Delimiting character. Enlist it if the first line of the file is header data | +| `tablename` | Y | symbol | Name of table to write data to | +| `dbdir` | Y | symbol | Directory to write data to | +| `symdir` | N | symbol | Directory to enumerate against | +| `enumname` | N | symbol | Name of symfile to enumerate against. Default is `` `sym `` | +| `partitiontype` | N | symbol | Partitioning to use. Must be one of `date`, `month`, `year` or `int`. 
Default is `date` | +| `partitioncol` | N | symbol | Column used to extract partition information. Default is `time` | +| `dataprocessfunc` | N | function | Dyadic function to process data after it has been read in. First argument is the load parameters dictionary, second argument is the data which has been read in. Default is `{[x;y] y}` | +| `chunksize` | N | int | Data size in bytes to read in one chunk. Default is `100 MB` | +| `compression` | N | int list | Compression parameters to use e.g. `17 2 6`. Default is an empty list for no compression | +| `gc` | N | boolean | Whether to run garbage collection at appropriate points. Default is `0b` (false) | +| `filepattern` | N | char list | Pattern(s) used to load only matching files e.g. `"*.csv"`, `("*.csv";"*.txt")` | + +The second parameter is a directory handle, e.g. +```q +`:dir +``` + +--- + +### :test_tube: Example + +```q +dataloader:use`dataloader + +// If using custom sorting parameters, check they are as expected +dataloader.sortparams[] +tabname att column sort +----------------------- +default p sym 1 + +// Read in data and create db +dataloader.loadallfiles[`headers`types`separator`tablename`dbdir!(`sym`time`price`volume`mktflag`cond`exclude;"SPFICHB";",";`trade;`:hdb);`:TRADE/toload] + +//load in db +\l hdb + +//check table and sorting +select from trade + +date sym time price volume mktflag cond exclude +------------------------------------------------------------------------------- +2025.07.15 AAPL 2025.07.15D01:17:08.000000000 266 3980 B 10 1 +2025.07.15 AAPL 2025.07.15D01:44:42.000000000 278 31 B 12 1 +2025.07.15 AAPL 2025.07.15D02:05:37.000000000 34 8699 S 21 0 +2025.07.15 AAPL 2025.07.15D02:06:02.000000000 97 1769 B 29 1 +2025.07.15 AAPL 2025.07.15D02:14:24.000000000 106 8138 B 2 1 +2025.07.15 AAPL 2025.07.15D02:40:33.000000000 61 2611 B 36 1 +2025.07.15 AAPL 2025.07.15D03:29:37.000000000 31 4240 B 15 1 + +// Ensure attributes are applied +meta trade +c | t f a +-------| ----- +date | d +sym | s p +time | p 
+price | f +volume | i +mktflag| c +cond | h +exclude| b +``` diff --git a/dataloader/dataloader.q b/dataloader/dataloader.q new file mode 100644 index 0000000..903c970 --- /dev/null +++ b/dataloader/dataloader.q @@ -0,0 +1,59 @@ +/ generic dataloader library + +/ loads data in from a delimited file, applies the processing function, enumerates and writes to db. NOTE: it is not trivial to check that the user has input the headers correctly; assume they have +loaddata:{[loadparams;rawdata] + / check if first row matches headers provided, splitting on the configured separator + data:$[(`$first[loadparams`separator]vs rawdata 0)~loadparams`headers; + (loadparams`types`separator)0:rawdata; + flip loadparams[`headers]!(loadparams`types`separator)0:rawdata + ]; + if[not loadparams[`filename]in filesread;filesread,:loadparams`filename]; + data:0!loadparams[`dataprocessfunc].(loadparams;data); + / if enumname provided, use it, otherwise default to `sym + domain:(`sym;loadparams`enumname)`enumname in key loadparams; + data:.Q.ens[loadparams(`dbdir`symdir)`symdir in key loadparams;data;domain]; + wd:writedatapartition[data]. 
loadparams`dbdir`partitiontype`partitioncol`tablename; + / run the writedatapartition function for each partition + wd each distinct loadparams[`partitiontype]$data loadparams`partitioncol; + if[loadparams`gc;.Q.gc[]]; + }; + +/ write data for provided database and partition +writedatapartition:{[data;dbdir;partitiontype;partitioncol;tablename;partition] + towrite:data where partition=partitiontype$data partitioncol; + writepath:` sv .Q.par[dbdir;partition;tablename],`; + .[upsert;(writepath;towrite);{'"failed to save table: ",x}]; + .z.m.partitions[writepath]:(tablename;partition); + }; + +/ applies compression, sorting and attributes as selected +finish:{[loadparams] + / temporarily set compression defaults + if[count loadparams`compression;.z.zd:loadparams`compression]; + {.z.m.util.sorttab[.z.m.sp](x;where .z.m.partitions[;0]=x)}each distinct value .z.m.partitions[;0]; + system"x .z.zd"; + if[loadparams`gc;.Q.gc[]]; + }; + +/ load all the files from a specified directory +loadallfiles:{[loadparams:.z.m.util.paramfilter;dir] + .z.m.partitions:()!(); + .z.m.filesread:(); + / get the contents of the directory based on optional filepattern, which may be a single pattern or a list of patterns + filelist:$[`filepattern in key loadparams; + key[dir:hsym dir]where any key[dir]like/:$[10h=type fp:loadparams`filepattern;enlist fp;fp]; + key dir:hsym dir]; + filelist:` sv'dir,'filelist; + {[loadparams;file].Q.fsn[loaddata loadparams,(enlist`filename)!enlist file;file;loadparams`chunksize]}[loadparams]each filelist; + .z.m.finish loadparams; + }; + +/ set default sorting parameters +sp:flip`tabname`att`column`sort!(1#`default;`p;`sym;1b); +sortparams:{[].z.m.sp}; + +/ add custom sorting parameters to the sortparams table +addsortparams:{[tabname;att;column;sort] + x:flip(flip sortparams[]),'(tabname;att;column;sort); + .z.m.sp:select from x where i=(last;i)fby tabname; + }; diff --git a/dataloader/init.q b/dataloader/init.q new file mode 100644 index 0000000..7cd5af9 --- /dev/null +++ b/dataloader/init.q @@ -0,0 +1,6 @@ +/ load core dataloader functions 
+\l ::dataloader.q +/ load util submodule +util:use`dataloader.util +/ expose public functions +export:([loadallfiles:loadallfiles;addsortparams:addsortparams;sortparams:sortparams]) diff --git a/dataloader/test.csv b/dataloader/test.csv new file mode 100644 index 0000000..9c967dc --- /dev/null +++ b/dataloader/test.csv @@ -0,0 +1,89 @@ +run,0,0,q,dataloader:use`dataloader,1,,Load in dataloader package +run,0,0,q,os:use`os,1,,Load in os package for system commands +run,0,0,q,os.cd hsym first`$system"echo $HOME/kdbx-packages/dataloader",1,,Cd to dataloader directory +run,0,0,q,{if[os.isdir x;os.deldir x]}`:test,1,,Delete test directory if it already existed +run,0,0,q,os.mkdir each`:test/data/hdb`:test/data/files`:test/data/symdir,1,,Create hdb/files/enum directory +run,0,0,q,hdbpath:hsym`$os.abspath`:test/data/hdb,1,,Hardcode hdb path +run,0,0,q,filespath:hsym`$os.abspath`:test/data/files,1,,Hardcode files path +run,0,0,q,symdirpath:hsym`$os.abspath`:test/data/symdir,1,,Hardcode enum files path + +run,0,0,q,tradetab:([]time:asc 2024.01.15D09:30+10?00:10:00;sym:10?`AAPL`GOOGL`MSFT`TSLA`NVDA`AMD;price:(5*10?10000)%100;size:5*10?100;side:10?`B`S;exchange:10?`NYSE`NASDAQ),1,,Create mock trade table +run,0,0,q,quotetab:([]time:asc 2024.01.15D09:30+10?00:10:00;sym:10?`AAPL`GOOGL`MSFT`TSLA`NVDA`AMD;bid:(5*10?10000)%100;ask:(5*10?10000)%100;bsize:5*10?100;asize:5*10?100;exchange:10?`NYSE`NASDAQ),1,,Create mock quote table +run,0,0,q,"tradedailytab:([]date:asc 2024.02.01,2024.01.15+9?10;time:{x+10?x}09:00:00.000;sym:10?`AAPL`GOOGL`MSFT`TSLA`NVDA`AMD;price:(5*10?10000)%100;size:5*10?100;exchange:10?`NYSE`NASDAQ)",1,,Create mock tradedaily table + +run,0,0,q,"(` sv filespath,`tradenh.csv)0:1_csv 0:tradetab",1,,Save trade csv without headers +run,0,0,q,"(` sv filespath,`trade.csv)0:csv 0:tradetab",1,,Save trade csv with headers +run,0,0,q,"(` sv filespath,`tradedaily.csv)0:csv 0:tradedailytab",1,,Save tradedaily csv +run,0,0,q,"(` sv filespath,`quotenh.csv)0:1_csv 
0:quotetab",1,,Save quote csv without headers +run,0,0,q,"(` sv filespath,`quote.csv)0:csv 0:quotetab",1,,Save quote csv with headers + +run,0,0,q,dataloader.addsortparams[`quote`tradedaily;`s`;`time`;10b],1,,Add custom sort parameters + +run,0,0,q,dataloader.loadallfiles[`headers`types`separator`tablename`dbdir`filepattern!(`time`sym`price`size`side`exchange;"PSFISS";csv;`trade;hdbpath;"tradenh.csv");filespath],1,,Test load of trade file with no headers +run,0,0,q,\l test/data/hdb,1,,Load database +true,0,0,q,`trade in tables[],1,,Test table loaded to db +true,0,0,q,`p=meta[trade][`sym;`a],1,,Check sym column is parted +true,0,0,q,`2024.01.15 in key`:.,1,,Check table saved to correct partition +true,0,0,q,`sym in key `:.,1,,Check sym file created and enumerated against + +run,0,0,q,dataloader.loadallfiles[`headers`types`separator`tablename`dbdir`filepattern!(`time`sym`price`size`side`exchange;"PSFISS";1#csv;`trade;hdbpath;"trade.csv");filespath],1,,Test load of trade file with headers +run,0,0,q,\l .,1,,Reload database +true,0,0,q,`trade in tables[],1,,Test table loaded to db +true,0,0,q,`p=meta[trade][`sym;`a],1,,Check sym column is parted +true,0,0,q,`2024.01.15 in key`:.,1,,Check table saved to correct partition +true,0,0,q,`sym in key `:.,1,,Check sym file created and enumerated against + +run,0,0,q,dataloader.loadallfiles[`headers`types`separator`tablename`dbdir`symdir`filepattern!(`time`sym`price`size`side`exchange;"PSFISS";1#csv;`trade;hdbpath;symdirpath;"trade.csv");filespath],1,,Test load of trade file with specified symdir +run,0,0,q,\l .,1,,Reload database +true,0,0,q,`trade in tables[],1,,Test table loaded to db +true,0,0,q,os.exists`:../symdir,1,,Test sym file enumerated to custom directory + +run,0,0,q,os.del`:sym,1,,Remove sym +run,0,0,q,os.deldir`:2024.01.15,1,,Remove partition to avoid errors 
+run,0,0,q,dataloader.loadallfiles[`headers`types`separator`tablename`dbdir`enumname`filepattern!(`time`sym`price`size`side`exchange;"PSFISS";1#csv;`trade;hdbpath;`mocksym;"trade.csv");filespath],1,,Test load of trade file with specified symfile name +run,0,0,q,\l .,1,,Reload database +true,0,0,q,`mocksym in key`:.,1,,Check that db sym file has custom name + +run,0,0,q,os.del`:mocksym,1,,Remove mocksym +run,0,0,q,os.deldir`:2024.01.15,1,,Remove partition to avoid errors +run,0,0,q,dataloader.loadallfiles[`headers`types`separator`tablename`dbdir`partitiontype`partitioncol`filepattern!(`date`time`sym`price`size`exchange;"DTSFIS";1#csv;`tradedaily;hdbpath;`month;`date;"tradedaily.csv");filespath],1,,Test custom partition overwrites +run,0,0,q,\l .,1,,Reload database +true,0,0,q,all`2024.01`2024.02 in key`:.,1,,Check partitions were created as month and all are there +true,0,0,q,all null exec a from 0!meta tradedaily,1,,Check no attributes applied to tradedaily table +run,0,0,q,os.deldir each`:2024.01`:2024.02,1,,Remove month partitions + +run,0,0,q,dataloader.loadallfiles[`headers`types`separator`tablename`dbdir`chunksize`filepattern!(`time`sym`bid`ask`bsize`asize`exchange;"PSFFIIS";csv;`quote;hdbpath;200i;"quotenh.csv");filespath],1,,Test load of quote file in chunks +run,0,0,q,\l .,1,,Reload database +true,0,0,q,`quote in tables[],1,,Test quote table loaded to db in chunks +true,0,0,q,10=count quote,1,,Test all of quote file loaded in chunks + +run,0,0,q,os.deldir`:2024.01.15,1,,Delete directory to handle new column +run,0,0,q,dataloader.loadallfiles[`headers`types`separator`tablename`dbdir`dataprocessfunc`filepattern!(`time`sym`bid`ask`bsize`asize`exchange;"PSFFIIS";1#csv;`quote;hdbpath;{[loaderparams;data]update mid:avg(bid;ask)from data};"quote.csv");filespath],1,,Test load of quote file with dataprocessfunc +run,0,0,q,\l .,1,,Reload database +true,0,0,q,`quote in tables[],1,,Check that table loaded to db +true,0,0,q,`mid in cols quote,1,,Check that column from update function is included in 
save down + +run,0,0,q,dataloader.loadallfiles[`headers`types`separator`tablename`dbdir`compression`filepattern!(`time`sym`price`size`side`exchange;"PSFISS";1#csv;`trade;hdbpath;(17i;2i;6i);"trade.csv");filespath],1,,Test loading with compression specified +run,0,0,q,\l .,1,,Reload database +true,0,0,q,`trade in tables[],1,,Test trade table loaded to db +true,0,0,q,(2i;17i;6i)~(-21!`:2024.01.15/trade/exchange)[`algorithm`logicalBlockSize`zipLevel],1,,Check compression for exchange column +true,0,0,q,(2i;17i;6i)~(-21!`:2024.01.15/trade/price)[`algorithm`logicalBlockSize`zipLevel],1,,Check compression for price column +true,0,0,q,(2i;17i;6i)~(-21!`:2024.01.15/trade/side)[`algorithm`logicalBlockSize`zipLevel],1,,Check compression for side column +true,0,0,q,(2i;17i;6i)~(-21!`:2024.01.15/trade/size)[`algorithm`logicalBlockSize`zipLevel],1,,Check compression for size column +true,0,0,q,(2i;17i;6i)~(-21!`:2024.01.15/trade/sym)[`algorithm`logicalBlockSize`zipLevel],1,,Check compression for sym column +true,0,0,q,(2i;17i;6i)~(-21!`:2024.01.15/trade/time)[`algorithm`logicalBlockSize`zipLevel],1,,Check compression for time column + +run,0,0,q,dataloader.loadallfiles[`headers`types`separator`tablename`dbdir`gc`filepattern!(`time`sym`price`size`side`exchange;"PSFISS";1#csv;`trade;hdbpath;1b;"trade.csv");filespath],1,,Test loading with garbage collection enabled +run,0,0,q,\l .,1,,Reload database +true,0,0,q,`trade in tables[],1,,Test trade table loaded to db + +run,0,0,q,os.deldir`:2024.01.15,1,,Remove partition to avoid errors +run,0,0,q,dataloader.loadallfiles[`headers`types`separator`tablename`dbdir`filepattern!(`time`sym`price`size`side`exchange;"PSFISS";1#csv;`trade;hdbpath;"trade.csv");filespath],1,,Test load of trade file +run,0,0,q,dataloader.loadallfiles[`headers`types`separator`tablename`dbdir`filepattern!(`time`sym`bid`ask`bsize`asize`exchange;"PSFFIIS";1#csv;`quote;hdbpath;"quote.csv");filespath],1,,Test load of quote file +run,0,0,q,\l .,1,,Reload database 
+true,0,0,q,`s=meta[quote][`time;`a],1,,Check quote table sorted on time +true,0,0,q,`trade in tables[],1,,Test trade table loaded to db +true,0,0,q,`quote in tables[],1,,Test quote table loaded to db + +run,0,0,q,os.cd hsym first`$system"echo $HOME/kdbx-packages/dataloader",1,,Cd to dataloader directory +run,0,0,q,os.deldir`:test/,1,,Delete the test directory diff --git a/dataloader/util.q b/dataloader/util.q new file mode 100644 index 0000000..c382e21 --- /dev/null +++ b/dataloader/util.q @@ -0,0 +1,48 @@ +/ function to apply attribute to a column +applyattr:{[dloc;colname;att] + .[{@[x;y;z#]};(dloc;colname;att); + {[dloc;colname;att;e] + '"unable to apply ",string[att]," attr to the ",string[colname]," column in this directory: ",string[dloc],". The error was: ",e; + }[dloc;colname;att] + ] + }; + +/ function used to sort and apply attributes to tables on disk based on format provided at initialisation of package +sorttab:{[sortparams;d] + if[1>sum exec sort from sortparams;:()]; + sp:$[count tabparams:select from sortparams where tabname=d[0]; + tabparams; + count defaultparams:select from sortparams where tabname=`default; + defaultparams + ]; + / loop through each directory and sort the data + {[sp;dloc] + if[count sortcols:exec column from sp where sort,not null column; + .[xasc;(sortcols;dloc);{[sortcols;dloc;e]'"failed to sort ",string[dloc]," by these columns: ",(", " sv string sortcols),". 
The error was: ",e}[sortcols;dloc]]]; + if[count attrcols:select column,att from sp where not null att; + applyattr[dloc]'[attrcols`column;attrcols`att]]; + }[sp]each distinct(),last d; + }; + +/ function checks that keys are correct and values have the right types for the loadallfiles argument +paramfilter:{[loadparams] + if[not 99h=type loadparams;'"loadallfiles requires a dictionary parameter"]; + req:`headers`types`tablename`dbdir`separator; + if[not all req in key loadparams; + '"loadallfiles requires a dictionary parameter with keys of ",(", " sv string req),"; missing ",", " sv string req except key loadparams]; + if[not count loadparams`symdir;loadparams[`symdir]:loadparams`dbdir]; + loadparams:(`dataprocessfunc`chunksize`partitioncol`partitiontype`compression`gc!({[x;y] y};`int$100*2 xexp 20;`time;`date;();0b)),loadparams; + reqtypes:`headers`types`tablename`dbdir`symdir`chunksize`partitioncol`partitiontype`gc!11 10 -11 -11 -11 -6 -11 -11 -1h; + if[count w:where not(type each loadparams key reqtypes)=reqtypes; + '"incorrect types supplied for ",(", " sv string w)," parameter(s). Required type(s) are ",", " sv string reqtypes w]; + if[not 10h=abs type loadparams`separator;'"separator must be a character or enlisted character"]; + if[not 99h