From 6b4b5ad5b5b804aa4e0644480b4da7e3b5817504 Mon Sep 17 00:00:00 2001 From: DHopkinson-DI Date: Mon, 21 Jul 2025 15:39:41 +0100 Subject: [PATCH 01/17] add dataloader.q and dataloader.md --- dataloader/dataloader.md | 125 ++++++++++++++++++++++++++++++++++++++ dataloader/dataloader.q | 127 +++++++++++++++++++++++++++++++++++++++ 2 files changed, 252 insertions(+) create mode 100644 dataloader/dataloader.md create mode 100644 dataloader/dataloader.q diff --git a/dataloader/dataloader.md b/dataloader/dataloader.md new file mode 100644 index 0000000..f646094 --- /dev/null +++ b/dataloader/dataloader.md @@ -0,0 +1,125 @@ +# `dataloader.q` – Loading delimited data and creating databases for kdb+ + + +This package is used for automated customisable dataloading and database creation and is a generalisation of http://code.kx.com/wiki/Cookbook/LoadingFromLargeFiles. +Load all delimeted files in a directory into memory in configurable chunk sizes then output the resulting tables to disk in kdb+ partiioned format. +When all the data is written, the on-disk data is re-sorted and the attributes are applied. + +--- + +## :sparkles: Features + +- Load delimited data from disk directory in customisable chunks +- Persist data to disk in partitioned format +- Dynamically sort and apply attributes to tables in the resulting database +- Configure compression of database + +--- + +## :gear: Iniitialisation + +After loading the package into the session the unary function `.loader.init` is called with a single argument to initialise the packages global variables and define any configurable sorting/attributes. + +### :mag_right: Params in Depth + +The sole argument to `.loader.init` should be a dictionary with the keys `tabname, att, column and sort`. The argument is used to determine how the tables in the resulting database should be sorted and where attributes applied when being persisted. 
+You may apply default sorting and attributes to all tables loaded in by the package by passing in the `tabname` with a value of `default` and specifying your default sorting and attribute parameters, +or you can apply specific sorting and attribute configurations to tables when loading them into the database including determining where they sshould not be applieds by using a specific table name. +If no sorting or attributes are required pass in the dictionary with an empty list .e.g +```q +.loader.init[`tabname`att`column`sort!()] // Apply no sorting or attributes +.loader.init[`tabname`att`column`sort!(`default;`p;`sym;1b)] // Sort all tables loaded in by the sym column and apply the parted attribute +.loader.init[`tabname`att`column`sort!(`default`trade`quote;`p`s`;`sym`time;110b)] // Apply default to all tables, however, sort trade by sym and apply `p and if quote is read in by the function then do not sort or apply attributes +``` +The dictionary arguments are outlined below. + +| Key | Values | Description | +|-----------|-----------------------|----------------------------------------------------------------------------------| +| `tabname` | symbol/ symbol list | Name of table | +| `att` | symbol/ symbol list | Attributes corresponding to the table names | +| `column` | symbol/ symbol list | Columns to sort and apply attributes to | +| `sort` | boolean /boolean list | Determines if the corresponding table will be sorted (1b: sorted; 0b:not sorted) | + + +--- + +### :rocket: Functions + +`.loader.loadallfiles` is the primary function used to load in all data and create the database. +The function takes two arguments, a dictionary of loading parameters and a directory containing files to read. 
+The function reads in all specified delimited files into memory from a chosen directory then proceeds to apply any required processing, +persists the table to disk in a kdb+ partitioned format, compresses the files if directed and finally sorting and applying attributes. + + +## :mag_right: Params in depth +The dictionary should/can have the following fields: + +| Parameter | Required | Type | Description | +|-------------------|----------|-----------|---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| +| `headers` | Y | symbol | Names of the header columns in the file | +| `types` | Y | char list | Data types to read from the file | +| `separator` | Y | char list | Delimiting character. Enlist it if first line of file is header data | +| `tablename` | Y | symbol | Name of table to write data to | +| `dbdir` | Y | symbol | Directory to write data to | +| `symdir` | N | symbol | Directory to enumerate against | +| `partitiontype` | N | symbol | Partitioning to use. Must be one of `date, month, year, int`. Default is `date` | +| `partitioncol` | N | symbol | Column to use to extract partition information.Default is `time` | +| `dataprocessfunc` | N | function | Diadic function to process data after it has been read in. First argument is load parameters dictionary, second argument is data which has been read in. Default is `{[x;y] y}` | +| `chunksize` | N | int | Data size in bytes to read in one chunk. Default is `100 MB` | +| `compression` | N | int list | Compression parameters to use e.g. `17 2 6`. Default is empty list for no compression | +| `gc` | N | boolean | Whether to run garbage collection at appropriate points. Default is `0b` (false) | +| `filepattern` | N | char list | Pattern used to only load certain files e.g. 
`".csv"`,`("*.csv","*.txt")` | + +The second parameter is a directory handle .e.g +```q +`:dir +``` + +--- + +### :test_tube: Example + +```q +\l dataloader.q + +// Initialise the package +.loader.init[`tabname`att`column`sort!(`default;`p;`sym;1b)] + +// Check table exists +select from .loader.sortparams +tabname att column sort +----------------------- +default p sym 1 + +// Read in data and create db +.loader.loadallfiles[`headers`types`separator`tablename`dbdir!(`sym`time`price`volume`mktflag`cond`exclude;"SPFICHB";",";`trade;`:hdb); `:TRADE/toload] + +//load in db +\l hdb +//check table and sorting +select from trade + +date sym time price volume mktflag cond exclude +------------------------------------------------------------------------------- +2025.07.15 AAPL 2025.07.15D01:17:08.000000000 266 3980 B 10 1 +2025.07.15 AAPL 2025.07.15D01:44:42.000000000 278 31 B 12 1 +2025.07.15 AAPL 2025.07.15D02:05:37.000000000 34 8699 S 21 0 +2025.07.15 AAPL 2025.07.15D02:06:02.000000000 97 1769 B 29 1 +2025.07.15 AAPL 2025.07.15D02:14:24.000000000 106 8138 B 2 1 +2025.07.15 AAPL 2025.07.15D02:40:33.000000000 61 2611 B 36 1 +2025.07.15 AAPL 2025.07.15D03:29:37.000000000 31 4240 B 15 1 + +// Ensure attributes are applied +meta trade + +c | t f a +-------| ----- +date | d +sym | s p +time | p +price | f +volume | i +mktflag| c +cond | h +exclude| b +``` \ No newline at end of file diff --git a/dataloader/dataloader.q b/dataloader/dataloader.q new file mode 100644 index 0000000..4493622 --- /dev/null +++ b/dataloader/dataloader.q @@ -0,0 +1,127 @@ +// ---- dataloader.q ---- A generic dataloader library +// generalisation of http://code.kx.com/wiki/Cookbook/LoadingFromLargeFiles +// will read in a directory of input files and write them out to an HDB +// files are read in chunks using .Q.fsn +// main function to call is loadallfiles +// loadallfiles takes a directory of files to read, and a dictionary + +// headers = names of headers in the file e.g. 
`sym`time`price`size`condition +// types = data types e.g. "SPFIC" +// separator = separator field e.g. ",". +// tablename = name of table to load to, e.g. `trade +// dbdir = database directory to write to e.g. `:hdb +// symdir [optional] = directory to enumerate against; default is to enumerate against dbdir +// dataprocessfunc [optional] = diadic function to use to further process data before saving. +// Parameters passed in are loadparams dict and data to be modified. Default is {[x;y] y} +// partitiontype [optional] = the partition type - one of `date`month`year`int. Default is `date +// partitioncol [optional] = the name of the column to cast to the partition type to work out which partition the data should go in. default is `time +// chunksize [optional] = size of data chunks in bytes to read at a time. default is 100MB +// compression [optional] = compression parameters to use. list of 3 integers e.g. 17 2 6. +// filepattern [optional] = specify pattern used to filter files +// These are only set when the data is sorted on disk (in the finish function) to save on writing the data compressed, reading in and uncompressing, sorting, and writing out compressed again +// gc [optional] = boolean flag to turn garbage collection on and off. Default is 0b + +// e.g. +// .loader.loadallfiles[`headers`types`separator`tablename`dbdir!(`sym`time`price`volume`mktflag`cond`exclude;"SPFICHB";",";`tdc;`:hdb); `:TDC/toload] + + +// Functions loads data in from delimited file, applies processing function, enumerates it then writes to db +.loader.loaddata:{[loadparams;rawdata] + data:$[loadparams[`filename] in .loader.filesread; // Check if we have already read some data from this file. 
First row may contain the header information in both cases we want to return a table with the same column names + flip loadparams[`headers]!(loadparams[`types];loadparams[`separator])0:rawdata; // If it hasn't been read then we have to just read it as a list of lists + [.loader.filesread,::loadparams[`filename]; // It hasn't been seen - the first row may or may not be column headers + $[all (`$"," vs rawdata[0;]) in loadparams[`headers]; + (loadparams[`types];enlist loadparams[`separator])0:rawdata; + flip loadparams[`headers]!(loadparams[`types];loadparams[`separator])0:rawdata]]]; + data:0!loadparams[`dataprocessfunc] . (loadparams;data); // Do some optional extra processing + data:$[`symdir in key loadparams; // Enumerate the table - best to do this once /.lg.o[`dataloader;"Enumerating"]; + .Q.en[loadparams[`symdir];data]; + .Q.en[loadparams[`dbdir];data]]; + .loader.writedatapartition[loadparams[`dbdir];;loadparams[`partitiontype];loadparams[`partitioncol];loadparams[`tablename];data] each distinct loadparams[`partitiontype]$data[loadparams`partitioncol]; + if[loadparams`gc; .Q.gc[]]; // Garbage collection + }; + +.loader.writedatapartition:{[dbdir;partition;partitiontype;partitioncol;tablename;data] + towrite:data where partition=partitiontype$data partitioncol; // Sub-select the data to write + writepath:` sv .Q.par[dbdir;partition;tablename],`; // Generate the write path + .[upsert;(writepath;towrite);{'"failed to save table: ",x}]; // Splay the table - use an error trap + .loader.partitions[writepath]:(tablename;partition); // Make sure the written path is in the partition dictionary + }; + +// Adds compression, sorting and attributes selected +.loader.finish:{[loadparams] + .[set;(`.z.zd;loadparams`compression);{'"Failed to set .z.d: ",x}]; // Set .z.zd + {.loader.util.sorttab (x;where .loader.partitions[;0]=x)} each distinct value .loader.partitions[;0]; // Re-sort and set attributes on each partition + system"x .z.zd"; // Unset .z.zd + if[loadparams`gc; 
.Q.gc[]]; // Garbage collection + }; + +// Load all the files from a specified directory +.loader.loadallfiles:{[loadparams;dir] + .loader.partitions::()!(); // Reset the partitions and files read variables + .loader.filesread::(); + if[not 99h=type loadparams; '".loader.loadallfiles requires a dictionary parameter"]; // Check the input + req:`headers`types`tablename`dbdir`separator; // Required fields + if[not all req in key loadparams; + '"loaddata requires a dictionary parameter with keys of ",(", " sv string req)," : missing ",", " sv string req except key loadparams]; + if[not count loadparams `symdir;loadparams[`symdir]:loadparams[`dbdir]]; + + loadparams:(`dataprocessfunc`chunksize`partitioncol`partitiontype`compression`gc!({[x;y] y};`int$100*2 xexp 20;`time;`date;();0b)),loadparams; // Join the loadparams with some default values + reqtypes:`headers`types`tablename`dbdir`symdir`chunksize`partitioncol`partitiontype`gc!`short$(11;10;-11;-11;-11;-6;-11;-11;-1); // Required types n + + if[count w:where not (type each loadparams key reqtypes)=reqtypes; // Check the types + '"incorrect types supplied for ",(", " sv string w)," parameter(s). 
Required type(s) are ",", " sv string reqtypes w]; + if[not 10h=abs type loadparams`separator;'"separator must be a character or enlisted character"]; + if[not 99h Date: Mon, 21 Jul 2025 16:20:41 +0100 Subject: [PATCH 02/17] update dataloader.q --- dataloader/dataloader.q | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/dataloader/dataloader.q b/dataloader/dataloader.q index 4493622..7b7fb77 100644 --- a/dataloader/dataloader.q +++ b/dataloader/dataloader.q @@ -50,7 +50,7 @@ // Adds compression, sorting and attributes selected .loader.finish:{[loadparams] - .[set;(`.z.zd;loadparams`compression);{'"Failed to set .z.d: ",x}]; // Set .z.zd + if[count loadparams `compression;.z.zd:loadparams`compression]; // Set .z.zd {.loader.util.sorttab (x;where .loader.partitions[;0]=x)} each distinct value .loader.partitions[;0]; // Re-sort and set attributes on each partition system"x .z.zd"; // Unset .z.zd if[loadparams`gc; .Q.gc[]]; // Garbage collection From 79155e748745d041f4276d1b23557d3402ef1745 Mon Sep 17 00:00:00 2001 From: DHopkinson-DI Date: Wed, 23 Jul 2025 16:46:58 +0100 Subject: [PATCH 03/17] filter logic, remove block syntax, update init --- dataloader/dataloader.md | 10 ++-- dataloader/dataloader.q | 102 ++++++++++++++++----------------------- 2 files changed, 46 insertions(+), 66 deletions(-) diff --git a/dataloader/dataloader.md b/dataloader/dataloader.md index f646094..c7d3ced 100644 --- a/dataloader/dataloader.md +++ b/dataloader/dataloader.md @@ -24,12 +24,12 @@ After loading the package into the session the unary function `.loader.init` is The sole argument to `.loader.init` should be a dictionary with the keys `tabname, att, column and sort`. The argument is used to determine how the tables in the resulting database should be sorted and where attributes applied when being persisted. 
You may apply default sorting and attributes to all tables loaded in by the package by passing in the `tabname` with a value of `default` and specifying your default sorting and attribute parameters, -or you can apply specific sorting and attribute configurations to tables when loading them into the database including determining where they sshould not be applieds by using a specific table name. -If no sorting or attributes are required pass in the dictionary with an empty list .e.g +or you can apply specific sorting and attribute configurations to tables when loading them into the database including determining where they should not be applied by using a specific table name. +If no sorting or attributes are required pass in the dictionary with a `tabname` with `default`, `att` and `column` with backticks and `sort` with `0b`, examples shown below: ```q -.loader.init[`tabname`att`column`sort!()] // Apply no sorting or attributes -.loader.init[`tabname`att`column`sort!(`default;`p;`sym;1b)] // Sort all tables loaded in by the sym column and apply the parted attribute -.loader.init[`tabname`att`column`sort!(`default`trade`quote;`p`s`;`sym`time;110b)] // Apply default to all tables, however, sort trade by sym and apply `p and if quote is read in by the function then do not sort or apply attributes +.loader.init[`tabname`att`column`sort!(`default;`;`;0b)] // Apply no sorting or attributes +.loader.init[`tabname`att`column`sort!(`default;`p;`sym;1b)] // Sort all tables loaded in by the sym column and apply the parted attribute +.loader.init[`tabname`att`column`sort!(`default`trade`quote;`p`s`;`sym`time`;110b)] // Apply default to all tables, however, sort trade by sym and apply `p and if quote is read in by the function then do not sort or apply attributes ``` The dictionary arguments are outlined below. 
diff --git a/dataloader/dataloader.q b/dataloader/dataloader.q index 7b7fb77..1d72c18 100644 --- a/dataloader/dataloader.q +++ b/dataloader/dataloader.q @@ -1,42 +1,16 @@ // ---- dataloader.q ---- A generic dataloader library -// generalisation of http://code.kx.com/wiki/Cookbook/LoadingFromLargeFiles -// will read in a directory of input files and write them out to an HDB -// files are read in chunks using .Q.fsn -// main function to call is loadallfiles -// loadallfiles takes a directory of files to read, and a dictionary - -// headers = names of headers in the file e.g. `sym`time`price`size`condition -// types = data types e.g. "SPFIC" -// separator = separator field e.g. ",". -// tablename = name of table to load to, e.g. `trade -// dbdir = database directory to write to e.g. `:hdb -// symdir [optional] = directory to enumerate against; default is to enumerate against dbdir -// dataprocessfunc [optional] = diadic function to use to further process data before saving. -// Parameters passed in are loadparams dict and data to be modified. Default is {[x;y] y} -// partitiontype [optional] = the partition type - one of `date`month`year`int. Default is `date -// partitioncol [optional] = the name of the column to cast to the partition type to work out which partition the data should go in. default is `time -// chunksize [optional] = size of data chunks in bytes to read at a time. default is 100MB -// compression [optional] = compression parameters to use. list of 3 integers e.g. 17 2 6. -// filepattern [optional] = specify pattern used to filter files -// These are only set when the data is sorted on disk (in the finish function) to save on writing the data compressed, reading in and uncompressing, sorting, and writing out compressed again -// gc [optional] = boolean flag to turn garbage collection on and off. Default is 0b - -// e.g. 
-// .loader.loadallfiles[`headers`types`separator`tablename`dbdir!(`sym`time`price`volume`mktflag`cond`exclude;"SPFICHB";",";`tdc;`:hdb); `:TDC/toload] - // Functions loads data in from delimited file, applies processing function, enumerates it then writes to db .loader.loaddata:{[loadparams;rawdata] data:$[loadparams[`filename] in .loader.filesread; // Check if we have already read some data from this file. First row may contain the header information in both cases we want to return a table with the same column names flip loadparams[`headers]!(loadparams[`types];loadparams[`separator])0:rawdata; // If it hasn't been read then we have to just read it as a list of lists - [.loader.filesread,::loadparams[`filename]; // It hasn't been seen - the first row may or may not be column headers - $[all (`$"," vs rawdata[0;]) in loadparams[`headers]; + $[all (`$"," vs rawdata[0;]) in loadparams[`headers]; // It hasn't been seen - the first row may or may not be column headers (loadparams[`types];enlist loadparams[`separator])0:rawdata; - flip loadparams[`headers]!(loadparams[`types];loadparams[`separator])0:rawdata]]]; + flip loadparams[`headers]!(loadparams[`types];loadparams[`separator])0:rawdata] + ]; + if[not loadparams[`filename] in .loader.filesread;.loader.filesread,::loadparams[`filename]]; data:0!loadparams[`dataprocessfunc] . 
(loadparams;data); // Do some optional extra processing - data:$[`symdir in key loadparams; // Enumerate the table - best to do this once /.lg.o[`dataloader;"Enumerating"]; - .Q.en[loadparams[`symdir];data]; - .Q.en[loadparams[`dbdir];data]]; + data:.Q.en[loadparams $[`symdir in key loadparams;`symdir;`dbdir];data]; // Enumerate the table - best to do this once .loader.writedatapartition[loadparams[`dbdir];;loadparams[`partitiontype];loadparams[`partitioncol];loadparams[`tablename];data] each distinct loadparams[`partitiontype]$data[loadparams`partitioncol]; if[loadparams`gc; .Q.gc[]]; // Garbage collection }; @@ -50,35 +24,16 @@ // Adds compression, sorting and attributes selected .loader.finish:{[loadparams] - if[count loadparams `compression;.z.zd:loadparams`compression]; // Set .z.zd + if[count loadparams `compression;.z.zd:loadparams`compression]; // Set .z.zd {.loader.util.sorttab (x;where .loader.partitions[;0]=x)} each distinct value .loader.partitions[;0]; // Re-sort and set attributes on each partition system"x .z.zd"; // Unset .z.zd if[loadparams`gc; .Q.gc[]]; // Garbage collection }; // Load all the files from a specified directory -.loader.loadallfiles:{[loadparams;dir] +.loader.loadallfiles:{[loadparams:.loader.util.paramfilter;dir] .loader.partitions::()!(); // Reset the partitions and files read variables .loader.filesread::(); - if[not 99h=type loadparams; '".loader.loadallfiles requires a dictionary parameter"]; // Check the input - req:`headers`types`tablename`dbdir`separator; // Required fields - if[not all req in key loadparams; - '"loaddata requires a dictionary parameter with keys of ",(", " sv string req)," : missing ",", " sv string req except key loadparams]; - if[not count loadparams `symdir;loadparams[`symdir]:loadparams[`dbdir]]; - - loadparams:(`dataprocessfunc`chunksize`partitioncol`partitiontype`compression`gc!({[x;y] y};`int$100*2 xexp 20;`time;`date;();0b)),loadparams; // Join the loadparams with some default values - 
reqtypes:`headers`types`tablename`dbdir`symdir`chunksize`partitioncol`partitiontype`gc!`short$(11;10;-11;-11;-11;-6;-11;-11;-1); // Required types n - - if[count w:where not (type each loadparams key reqtypes)=reqtypes; // Check the types - '"incorrect types supplied for ",(", " sv string w)," parameter(s). Required type(s) are ",", " sv string reqtypes w]; - if[not 10h=abs type loadparams`separator;'"separator must be a character or enlisted character"]; - if[not 99hsum exec sort from .loader.sortparams;:()]; sp:$[count tabparams:select from .loader.sortparams where tabname=d[0]; tabparams; count defaultparams:select from .loader.sortparams where tabname=`default; @@ -114,14 +69,39 @@ }[sp] each distinct (),last d; }; +// Function checks keys are correct and value have the right types for loader.loadallfiles argument +.loader.util.paramfilter:{[loadparams] + if[not 99h=type loadparams; '".loader.loadallfiles requires a dictionary parameter"]; // Check the input + req:`headers`types`tablename`dbdir`separator; // Required fields + if[not all req in key loadparams; + '"loaddata requires a dictionary parameter with keys of ",(", " sv string req)," : missing ",", " sv string req except key loadparams]; + if[not count loadparams `symdir;loadparams[`symdir]:loadparams[`dbdir]]; + loadparams:(`dataprocessfunc`chunksize`partitioncol`partitiontype`compression`gc!({[x;y] y};`int$100*2 xexp 20;`time;`date;();0b)),loadparams; // Join the loadparams with some default values + reqtypes:`headers`types`tablename`dbdir`symdir`chunksize`partitioncol`partitiontype`gc!`short$(11;10;-11;-11;-11;-6;-11;-11;-1); // Required types n + if[count w:where not (type each loadparams key reqtypes)=reqtypes; // Check the types + '"incorrect types supplied for ",(", " sv string w)," parameter(s). 
Required type(s) are ",", " sv string reqtypes w]; + if[not 10h=abs type loadparams`separator;'"separator must be a character or enlisted character"]; + if[not 99h Date: Mon, 18 Aug 2025 23:30:10 -0400 Subject: [PATCH 04/17] Update dataloader PR to align with style and comments. Add testing --- dataloader/dataloader.md | 9 +- dataloader/dataloader.q | 177 +++++++++++++++++++-------------------- dataloader/test.csv | 46 ++++++++++ dataloader/test.q | 50 +++++++++++ 4 files changed, 189 insertions(+), 93 deletions(-) create mode 100644 dataloader/test.csv create mode 100644 dataloader/test.q diff --git a/dataloader/dataloader.md b/dataloader/dataloader.md index c7d3ced..3f287fa 100644 --- a/dataloader/dataloader.md +++ b/dataloader/dataloader.md @@ -2,7 +2,7 @@ This package is used for automated customisable dataloading and database creation and is a generalisation of http://code.kx.com/wiki/Cookbook/LoadingFromLargeFiles. -Load all delimeted files in a directory into memory in configurable chunk sizes then output the resulting tables to disk in kdb+ partiioned format. +Load all delimeted files in a directory into memory in configurable chunk sizes then output the resulting tables to disk in kdb+ partitioned format. When all the data is written, the on-disk data is re-sorted and the attributes are applied. --- @@ -16,7 +16,7 @@ When all the data is written, the on-disk data is re-sorted and the attributes a --- -## :gear: Iniitialisation +## :gear: Initialisation After loading the package into the session the unary function `.loader.init` is called with a single argument to initialise the packages global variables and define any configurable sorting/attributes. 
@@ -62,12 +62,13 @@ The dictionary should/can have the following fields: | `tablename` | Y | symbol | Name of table to write data to | | `dbdir` | Y | symbol | Directory to write data to | | `symdir` | N | symbol | Directory to enumerate against | +| `enumname` | N | symbol | Name of symfile to enumerate against. Default is `sym | | `partitiontype` | N | symbol | Partitioning to use. Must be one of `date, month, year, int`. Default is `date` | | `partitioncol` | N | symbol | Column to use to extract partition information.Default is `time` | | `dataprocessfunc` | N | function | Diadic function to process data after it has been read in. First argument is load parameters dictionary, second argument is data which has been read in. Default is `{[x;y] y}` | -| `chunksize` | N | int | Data size in bytes to read in one chunk. Default is `100 MB` | +| `chunksize` | N | int | Data size in bytes to read in one chunk. Default is `100 MB` | | `compression` | N | int list | Compression parameters to use e.g. `17 2 6`. Default is empty list for no compression | -| `gc` | N | boolean | Whether to run garbage collection at appropriate points. Default is `0b` (false) | +| `gc` | N | boolean | Whether to run garbage collection at appropriate points. Default is `0b` (false) | | `filepattern` | N | char list | Pattern used to only load certain files e.g. `".csv"`,`("*.csv","*.txt")` | The second parameter is a directory handle .e.g diff --git a/dataloader/dataloader.q b/dataloader/dataloader.q index 1d72c18..22fb1fe 100644 --- a/dataloader/dataloader.q +++ b/dataloader/dataloader.q @@ -1,107 +1,106 @@ -// ---- dataloader.q ---- A generic dataloader library +/ generic dataloader library -// Functions loads data in from delimited file, applies processing function, enumerates it then writes to db .loader.loaddata:{[loadparams;rawdata] - data:$[loadparams[`filename] in .loader.filesread; // Check if we have already read some data from this file. 
First row may contain the header information in both cases we want to return a table with the same column names - flip loadparams[`headers]!(loadparams[`types];loadparams[`separator])0:rawdata; // If it hasn't been read then we have to just read it as a list of lists - $[all (`$"," vs rawdata[0;]) in loadparams[`headers]; // It hasn't been seen - the first row may or may not be column headers - (loadparams[`types];enlist loadparams[`separator])0:rawdata; - flip loadparams[`headers]!(loadparams[`types];loadparams[`separator])0:rawdata] - ]; - if[not loadparams[`filename] in .loader.filesread;.loader.filesread,::loadparams[`filename]]; - data:0!loadparams[`dataprocessfunc] . (loadparams;data); // Do some optional extra processing - data:.Q.en[loadparams $[`symdir in key loadparams;`symdir;`dbdir];data]; // Enumerate the table - best to do this once - .loader.writedatapartition[loadparams[`dbdir];;loadparams[`partitiontype];loadparams[`partitioncol];loadparams[`tablename];data] each distinct loadparams[`partitiontype]$data[loadparams`partitioncol]; - if[loadparams`gc; .Q.gc[]]; // Garbage collection - }; + / loads data in from delimeted file, applies processing function, enumerates and writes to db + data:$[loadparams[`filename] in .loader.filesread; / check if some data has already been read in + flip loadparams[`headers]!(loadparams[`types];loadparams[`separator])0:rawdata; + $[all (`$"," vs rawdata[0;]) in loadparams[`headers]; / it hasn't been seen, may be column headers + (loadparams[`types];loadparams[`separator])0:rawdata; + flip loadparams[`headers]!(loadparams[`types];loadparams[`separator])0:rawdata] + ]; + if[not loadparams[`filename] in .loader.filesread;.loader.filesread,::loadparams[`filename]]; + data:0!loadparams[`dataprocessfunc] . 
(loadparams;data); + data:.Q.ens[loadparams $[`symdir in key loadparams;`symdir;`dbdir];data;loadparams[`enumname]]; + .loader.writedatapartition[loadparams[`dbdir];;loadparams[`partitiontype];loadparams[`partitioncol];loadparams[`tablename];data] each distinct loadparams[`partitiontype]$data[loadparams`partitioncol]; + if[loadparams`gc; .Q.gc[]]; + }; .loader.writedatapartition:{[dbdir;partition;partitiontype;partitioncol;tablename;data] - towrite:data where partition=partitiontype$data partitioncol; // Sub-select the data to write - writepath:` sv .Q.par[dbdir;partition;tablename],`; // Generate the write path - .[upsert;(writepath;towrite);{'"failed to save table: ",x}]; // Splay the table - use an error trap - .loader.partitions[writepath]:(tablename;partition); // Make sure the written path is in the partition dictionary - }; + / write data for provdided database and partition + towrite:data where partition=partitiontype$data partitioncol; + writepath:` sv .Q.par[dbdir;partition;tablename],`; + .[upsert;(writepath;towrite);{'"failed to save table: ",x}]; + .loader.partitions[writepath]:(tablename;partition); + }; -// Adds compression, sorting and attributes selected .loader.finish:{[loadparams] - if[count loadparams `compression;.z.zd:loadparams`compression]; // Set .z.zd - {.loader.util.sorttab (x;where .loader.partitions[;0]=x)} each distinct value .loader.partitions[;0]; // Re-sort and set attributes on each partition - system"x .z.zd"; // Unset .z.zd - if[loadparams`gc; .Q.gc[]]; // Garbage collection - }; + / adds compression, sorting and attributes selected + if[count loadparams `compression;.z.zd:loadparams`compression]; / temporarily set compression defaults + {.loader.util.sorttab (x;where .loader.partitions[;0]=x)} each distinct value .loader.partitions[;0]; + system"x .z.zd"; + if[loadparams`gc; .Q.gc[]]; + }; -// Load all the files from a specified directory .loader.loadallfiles:{[loadparams:.loader.util.paramfilter;dir] - 
.loader.partitions::()!(); // Reset the partitions and files read variables - .loader.filesread::(); - filelist:$[`filepattern in key loadparams; - (key dir:hsym dir) where max like[key dir;] each loadparams[`filepattern]; - key dir:hsym dir]; // Get the contents of the directory based on optional filepattern - filelist:` sv' dir,'filelist; // Create the full path - {[loadparams;file] - .Q.fsn[.loader.loaddata[loadparams,(enlist`filename)!enlist file];file;loadparams`chunksize] - }[loadparams] each filelist; - .loader.finish[loadparams]; // Finish the load - }; + / load all the files from a specified directory + .loader.partitions::()!(); + .loader.filesread::(); + filelist:$[`filepattern in key loadparams; + (key dir:hsym dir) where max like[key dir;] each loadparams[`filepattern]; + key dir:hsym dir]; / get the contents of the directory based on optional filepattern + filelist:` sv' dir,'filelist; + {[loadparams;file] .Q.fsn[.loader.loaddata[loadparams,(enlist`filename)!enlist file];file;loadparams`chunksize]}[loadparams] each filelist; + .loader.finish[loadparams]; + }; -// Utility Functions .loader.util.applyattr:{[dloc;colname;att] - .[{@[x;y;z#]};(dloc;colname;att); // Attempt to apply the attribute to the column and log an error if it fails - {[dloc;colname;att;e] - '"unable to apply ",string[att]," attr to the ",string[colname]," column in the this directory : ",string[dloc],". The error was : ",e; - }[dloc;colname;att] - ] - }; + / utility function + .[{@[x;y;z#]};(dloc;colname;att); + {[dloc;colname;att;e] + '"unable to apply ",string[att]," attr to the ",string[colname]," column in the this directory : ",string[dloc],". The error was : ",e; + }[dloc;colname;att] + ] + }; -// Function used to sort and apply attributes to tables on disk based on format provided at initialisation of package. 
.loader.util.sorttab:{[d] - if[1>sum exec sort from .loader.sortparams;:()]; - sp:$[count tabparams:select from .loader.sortparams where tabname=d[0]; - tabparams; - count defaultparams:select from .loader.sortparams where tabname=`default; - defaultparams - ]; - {[sp;dloc] // Loop through each directory and sort the data - if[count sortcols: exec column from sp where sort, not null column; - .[xasc;(sortcols;dloc);{[sortcols;dloc;e] '"failed to sort ",string[dloc]," by these columns : ",(", " sv string sortcols),". The error was: ",e}[sortcols;dloc]]]; - if[count attrcols: select column, att from sp where not null att; - .loader.util.applyattr[dloc;;]'[attrcols`column;attrcols`att]]; // Apply attribute(s) - }[sp] each distinct (),last d; - }; + / function used to sort and apply attributes to tables on disk based on format provided at initialisation of package. + if[1>sum exec sort from .loader.sortparams;:()]; + sp:$[count tabparams:select from .loader.sortparams where tabname=d[0]; + tabparams; + count defaultparams:select from .loader.sortparams where tabname=`default; + defaultparams + ]; + {[sp;dloc] / loop through each directory and sort the data + if[count sortcols: exec column from sp where sort, not null column; + .[xasc;(sortcols;dloc);{[sortcols;dloc;e] '"failed to sort ",string[dloc]," by these columns : ",(", " sv string sortcols),". 
The error was: ",e}[sortcols;dloc]]]; + if[count attrcols: select column, att from sp where not null att; + .loader.util.applyattr[dloc;;]'[attrcols`column;attrcols`att]]; / apply attributes + }[sp] each distinct (),last d; + }; -// Function checks keys are correct and value have the right types for loader.loadallfiles argument .loader.util.paramfilter:{[loadparams] - if[not 99h=type loadparams; '".loader.loadallfiles requires a dictionary parameter"]; // Check the input - req:`headers`types`tablename`dbdir`separator; // Required fields - if[not all req in key loadparams; - '"loaddata requires a dictionary parameter with keys of ",(", " sv string req)," : missing ",", " sv string req except key loadparams]; - if[not count loadparams `symdir;loadparams[`symdir]:loadparams[`dbdir]]; - loadparams:(`dataprocessfunc`chunksize`partitioncol`partitiontype`compression`gc!({[x;y] y};`int$100*2 xexp 20;`time;`date;();0b)),loadparams; // Join the loadparams with some default values - reqtypes:`headers`types`tablename`dbdir`symdir`chunksize`partitioncol`partitiontype`gc!`short$(11;10;-11;-11;-11;-6;-11;-11;-1); // Required types n - if[count w:where not (type each loadparams key reqtypes)=reqtypes; // Check the types - '"incorrect types supplied for ",(", " sv string w)," parameter(s). 
Required type(s) are ",", " sv string reqtypes w]; - if[not 10h=abs type loadparams`separator;'"separator must be a character or enlisted character"]; - if[not 99h Date: Tue, 19 Aug 2025 19:11:47 -0400 Subject: [PATCH 05/17] Add additional test cases, update doc --- dataloader/dataloader.md | 12 +++++++++++- dataloader/test.csv | 34 ++++++++++++++++++++++++++++++++-- dataloader/test.q | 7 +++++++ 3 files changed, 50 insertions(+), 3 deletions(-) diff --git a/dataloader/dataloader.md b/dataloader/dataloader.md index 3f287fa..2800e64 100644 --- a/dataloader/dataloader.md +++ b/dataloader/dataloader.md @@ -2,7 +2,17 @@ This package is used for automated customisable dataloading and database creation and is a generalisation of http://code.kx.com/wiki/Cookbook/LoadingFromLargeFiles. -Load all delimeted files in a directory into memory in configurable chunk sizes then output the resulting tables to disk in kdb+ partitioned format. +Package employs a chunk-based loading strategy to minimize memory usage when processing large datasets. Rather than loading entire datasets into memory before writing to disk, data is processed incrementally in manageable chunks. + +The memory footprint is determined by the maximum of: +- Memory required to load and save one data chunk +- Memory required to sort the final resultant table + +This approach enables processing of large data volumes with a relatively small memory footprint. Example use cases include: +- Large File Processing: Loading very large files by processing them in small, sequential chunks +- Cross-Partition Loading: Efficiently handling distributed data across multiple small files (e.g., separate monthly files for AAPL and MSFT data) + +The chunked architecture ensures scalable performance regardless of total dataset size, making it suitable for processing datasets that would otherwise exceed available system memory. When all the data is written, the on-disk data is re-sorted and the attributes are applied. 
--- diff --git a/dataloader/test.csv b/dataloader/test.csv index d7bd1e1..8ecde05 100644 --- a/dataloader/test.csv +++ b/dataloader/test.csv @@ -1,7 +1,7 @@ true,0,0,q,(".loader.init requires a dictionary parameter")~@[.loader.util.sortfilter;`;{x}],1,,Test error trapping for wrong param type true,0,0,q,("Error ensure dictionary values are the correct types 11 11 11 1h or -11 -11 -11 -1h")~@[.loader.util.sortfilter;(`tabname`att`column`sort)!(1;2;3;4);{x}],1,,Test error trapping for param value types true,0,0,q,("Error ensure argument is a dictionary with keys `tabname`att`column`sort")~@[.loader.util.sortfilter;(`test1`test2`test3`test4)!(`test1;`test2;`test3;1b);{x}],1,,Test error trapping for from param key values -run,0,0,q,.loader.init[`tabname`att`column`sort!(`default;`p;`sym;1b)],1,,Init package with defaults for all tables +run,0,0,q,.loader.init[`tabname`att`column`sort!(`default`quote`tradedaily;`p`s`;`sym`time`;110b)],1,,Init package with defaults for all tables run,0,0,q,.test.dataloader.mockdirs[0b;`trade],1,,Create temporary test directory for checking dataloading with trade no headers run,0,0,q,.loader.loadallfiles[`headers`types`separator`tablename`dbdir!(`time`sym`price`size`side`exchange;"PSFISS";.test.dataloader.delimeter;`trade;`:test/data/hdb);`:test/data/files],1,,Test load of trade file with no headers @@ -35,12 +35,42 @@ run,0,0,q,.test.dataloader.mockdirs[1b;`tradedaily],1,,Create temporary test dir run,0,0,q,.loader.loadallfiles[`headers`types`separator`tablename`dbdir`partitiontype`partitioncol!(`date`time`sym`price`size`exchange;"DTSFIS";enlist .test.dataloader.delimeter;`tradedaily;`:test/data/hdb;`month;`date);`:test/data/files],1,,Test custom partition overwrites run,0,0,q,\l test/data/hdb,1,,Load database true,0,0,q,all (`2024.01`2024.02`2024.03`2024.04) in\: key `:.,1,,Check partitions were created as month and all are there +true,0,0,q,all null exec a from 0!meta tradedaily,1,,Check no attributes applied to tradedaily table + 
+run,0,0,q,.test.dataloader.mockdirs[0b;`quote],1,,Create temporary test directory for checking dataloading with quote and diadic function application +run,0,0,q,.loader.loadallfiles[`headers`types`separator`tablename`dbdir`dataprocessfunc!(`time`sym`bid`ask`bsize`asize`exchange;"PSFFIIS";.test.dataloader.delimeter;`quote;`:test/data/hdb;.test.dataloader.dataprocessfunc);`:test/data/files],1,,Test load of quote file +run,0,0,q,\l test/data/hdb,1,,Load database +true,0,0,q,`quote in tables[],1,,Check that table loaded to db +true,0,0,q,`mid in cols quote,1,,Check that column from update function is included in save down + +run,0,0,q,.test.dataloader.mockdirs[0b;`quote],1,,Create temporary test directory for checking dataloading with quote reading in with chunks +run,0,0,q,.loader.loadallfiles[`headers`types`separator`tablename`dbdir`chunksize!(`time`sym`bid`ask`bsize`asize`exchange;"PSFFIIS";.test.dataloader.delimeter;`quote;`:test/data/hdb;200i);`:test/data/files],1,,Test load of quote file +run,0,0,q,\l test/data/hdb,1,,Load database +true,0,0,q,`quote in tables[],1,,Test quote table loaded to db in chunks +true,0,0,q,10=count quote,1,,Test all of quote file loaded in chunks + +run,0,0,q,.test.dataloader.mockdirs[1b;`trade],1,,Create temporary test directory for checking compression loading +run,0,0,q,.loader.loadallfiles[`headers`types`separator`tablename`dbdir`compression!(`time`sym`price`size`side`exchange;"PSFISS";enlist .test.dataloader.delimeter;`trade;`:test/data/hdb;(17i;2i;6i));`:test/data/files],1,,Test loading with compression specified +run,0,0,q,\l test/data/hdb,1,,Load database +true,0,0,q,`trade in tables[],1,,Test trade table loaded to db +true,0,0,q,(2i;17i;6i)~(-21!`:2024.01.15/trade/exchange)[`algorithm`logicalBlockSize`zipLevel],1,,Check compression for exchange column +true,0,0,q,(2i;17i;6i)~(-21!`:2024.01.15/trade/price)[`algorithm`logicalBlockSize`zipLevel],1,,Check compression for price column 
+true,0,0,q,(2i;17i;6i)~(-21!`:2024.01.15/trade/side)[`algorithm`logicalBlockSize`zipLevel],1,,Check compression for side column +true,0,0,q,(2i;17i;6i)~(-21!`:2024.01.15/trade/size)[`algorithm`logicalBlockSize`zipLevel],1,,Check compression for size column +true,0,0,q,(2i;17i;6i)~(-21!`:2024.01.15/trade/sym)[`algorithm`logicalBlockSize`zipLevel],1,,Check compression for sym column +true,0,0,q,(2i;17i;6i)~(-21!`:2024.01.15/trade/time)[`algorithm`logicalBlockSize`zipLevel],1,,Check compression for time column + +run,0,0,q,.test.dataloader.mockdirs[1b;`trade],1,,Create temporary test directory for checking garbage collection loading +run,0,0,q,.loader.loadallfiles[`headers`types`separator`tablename`dbdir`gc!(`time`sym`price`size`side`exchange;"PSFISS";enlist .test.dataloader.delimeter;`trade;`:test/data/hdb;1b);`:test/data/files],1,,Test loading with garbage collection enabled +run,0,0,q,\l test/data/hdb,1,,Load database +true,0,0,q,`trade in tables[],1,,Test trade table loaded to db run,0,0,q,.test.dataloader.mockdirs[1b;`trade`quote],1,,Create temporary test directory for checking dataloading with trade and quote at the same time specifying with filepattern run,0,0,q,.loader.loadallfiles[`headers`types`separator`tablename`dbdir`filepattern!(`time`sym`price`size`side`exchange;"PSFISS";enlist .test.dataloader.delimeter;`trade;`:test/data/hdb;"trade.csv");`:test/data/files],1,,Test load of trade file run,0,0,q,.loader.loadallfiles[`headers`types`separator`tablename`dbdir`filepattern!(`time`sym`bid`ask`bsize`asize`exchange;"PSFFIIS";enlist .test.dataloader.delimeter;`quote;`:test/data/hdb;"quote.csv");`:test/data/files],1,,Test load of quote file run,0,0,q,\l test/data/hdb,1,,Load database +true,0,0,q,`s=(meta quote)[`time][`a],1,,Check quote table sorted on time true,0,0,q,`trade in tables[],1,,Test trade table loaded to db true,0,0,q,`quote in tables[],1,,Test quote table loaded to db -run,0,0,q,.test.dataloader.complete[],1,,Remove testing environment 
+run,0,0,q,.test.dataloader.complete[],1,,Remove testing evidence diff --git a/dataloader/test.q b/dataloader/test.q index f7e42cb..64039cf 100644 --- a/dataloader/test.q +++ b/dataloader/test.q @@ -47,4 +47,11 @@ .test.dataloader.mocksymdir:{system "mkdir test/data/symdir"}; +.test.dataloader.dataprocessfunc:{[loaderparams;data] + / testing function will calculate mid column from quote data + update mid:avg(bid;ask) from data + }; + .test.dataloader.delimeter:","; + +.test.dataloader.complete:{system"cd ../../..";system"rm -rf test/data"}; From efec38d2b50af2960dd29db1324e688812536ea1 Mon Sep 17 00:00:00 2001 From: cstirling-dataintellect Date: Thu, 25 Sep 2025 15:06:52 -0400 Subject: [PATCH 06/17] Remove namespacing To account for the new import method, have removed the top level (.loader) namespacing. Also eliminated the second level .util namespacing as seemed superfluous. Unclear around how setting globals within a namespace from within a function will be impacted by new import changes so have changed how globals are set within the init function. 
--- dataloader/dataloader.q | 56 ++++++++++++++++++++--------------------- 1 file changed, 28 insertions(+), 28 deletions(-) diff --git a/dataloader/dataloader.q b/dataloader/dataloader.q index 22fb1fe..66d70f4 100644 --- a/dataloader/dataloader.q +++ b/dataloader/dataloader.q @@ -1,49 +1,49 @@ / generic dataloader library -.loader.loaddata:{[loadparams;rawdata] +loaddata:{[loadparams;rawdata] / loads data in from delimeted file, applies processing function, enumerates and writes to db - data:$[loadparams[`filename] in .loader.filesread; / check if some data has already been read in + data:$[loadparams[`filename] in filesread; / check if some data has already been read in flip loadparams[`headers]!(loadparams[`types];loadparams[`separator])0:rawdata; $[all (`$"," vs rawdata[0;]) in loadparams[`headers]; / it hasn't been seen, may be column headers (loadparams[`types];loadparams[`separator])0:rawdata; flip loadparams[`headers]!(loadparams[`types];loadparams[`separator])0:rawdata] ]; - if[not loadparams[`filename] in .loader.filesread;.loader.filesread,::loadparams[`filename]]; + if[not loadparams[`filename] in filesread;filesread,::loadparams[`filename]]; data:0!loadparams[`dataprocessfunc] . 
(loadparams;data); data:.Q.ens[loadparams $[`symdir in key loadparams;`symdir;`dbdir];data;loadparams[`enumname]]; - .loader.writedatapartition[loadparams[`dbdir];;loadparams[`partitiontype];loadparams[`partitioncol];loadparams[`tablename];data] each distinct loadparams[`partitiontype]$data[loadparams`partitioncol]; + writedatapartition[loadparams[`dbdir];;loadparams[`partitiontype];loadparams[`partitioncol];loadparams[`tablename];data] each distinct loadparams[`partitiontype]$data[loadparams`partitioncol]; if[loadparams`gc; .Q.gc[]]; }; -.loader.writedatapartition:{[dbdir;partition;partitiontype;partitioncol;tablename;data] +writedatapartition:{[dbdir;partition;partitiontype;partitioncol;tablename;data] / write data for provdided database and partition towrite:data where partition=partitiontype$data partitioncol; writepath:` sv .Q.par[dbdir;partition;tablename],`; .[upsert;(writepath;towrite);{'"failed to save table: ",x}]; - .loader.partitions[writepath]:(tablename;partition); + partitions[writepath]:(tablename;partition); }; -.loader.finish:{[loadparams] +finish:{[loadparams] / adds compression, sorting and attributes selected if[count loadparams `compression;.z.zd:loadparams`compression]; / temporarily set compression defaults - {.loader.util.sorttab (x;where .loader.partitions[;0]=x)} each distinct value .loader.partitions[;0]; + {sorttab (x;where partitions[;0]=x)} each distinct value partitions[;0]; system"x .z.zd"; if[loadparams`gc; .Q.gc[]]; }; -.loader.loadallfiles:{[loadparams:.loader.util.paramfilter;dir] +loadallfiles:{[loadparams:paramfilter;dir] / load all the files from a specified directory - .loader.partitions::()!(); - .loader.filesread::(); + partitions::()!(); + filesread::(); filelist:$[`filepattern in key loadparams; (key dir:hsym dir) where max like[key dir;] each loadparams[`filepattern]; key dir:hsym dir]; / get the contents of the directory based on optional filepattern filelist:` sv' dir,'filelist; - {[loadparams;file] 
.Q.fsn[.loader.loaddata[loadparams,(enlist`filename)!enlist file];file;loadparams`chunksize]}[loadparams] each filelist; - .loader.finish[loadparams]; + {[loadparams;file] .Q.fsn[loaddata[loadparams,(enlist`filename)!enlist file];file;loadparams`chunksize]}[loadparams] each filelist; + finish[loadparams]; }; -.loader.util.applyattr:{[dloc;colname;att] +applyattr:{[dloc;colname;att] / utility function .[{@[x;y;z#]};(dloc;colname;att); {[dloc;colname;att;e] @@ -52,25 +52,25 @@ ] }; -.loader.util.sorttab:{[d] +sorttab:{[d] / function used to sort and apply attributes to tables on disk based on format provided at initialisation of package. - if[1>sum exec sort from .loader.sortparams;:()]; - sp:$[count tabparams:select from .loader.sortparams where tabname=d[0]; + if[1>sum exec sort from sortparams;:()]; + sp:$[count tabparams:select from sortparams where tabname=d[0]; tabparams; - count defaultparams:select from .loader.sortparams where tabname=`default; + count defaultparams:select from sortparams where tabname=`default; defaultparams ]; {[sp;dloc] / loop through each directory and sort the data if[count sortcols: exec column from sp where sort, not null column; .[xasc;(sortcols;dloc);{[sortcols;dloc;e] '"failed to sort ",string[dloc]," by these columns : ",(", " sv string sortcols),". 
The error was: ",e}[sortcols;dloc]]]; if[count attrcols: select column, att from sp where not null att; - .loader.util.applyattr[dloc;;]'[attrcols`column;attrcols`att]]; / apply attributes + applyattr[dloc;;]'[attrcols`column;attrcols`att]]; / apply attributes }[sp] each distinct (),last d; }; -.loader.util.paramfilter:{[loadparams] - / function checks keys are correct and value have the right types for .loader.loadallfiles argument - if[not 99h=type loadparams; '".loader.loadallfiles requires a dictionary parameter"]; +paramfilter:{[loadparams] + / function checks keys are correct and value have the right types for loadallfiles argument + if[not 99h=type loadparams; '"loadallfiles requires a dictionary parameter"]; req:`headers`types`tablename`dbdir`separator; if[not all req in key loadparams; '"loaddata requires a dictionary parameter with keys of ",(", " sv string req)," : missing ",", " sv string req except key loadparams]; @@ -90,17 +90,17 @@ loadparams }; -.loader.util.sortfilter:{[sortparams] +sortfilter:{[sortparams] / function checks dictionary argument for init function has correct headers and types - if[not 99h=type sortparams; '".loader.init requires a dictionary parameter"]; + if[not 99h=type sortparams; '"init requires a dictionary parameter"]; if[not (abs type each value sortparams)~11 11 11 1h;'"Error ensure dictionary values are the correct types 11 11 11 1h or -11 -11 -11 -1h"]; if[not `tabname`att`column`sort~key sortparams; '"Error ensure argument is a dictionary with keys `tabname`att`column`sort"]; flip (),/: sortparams }; -.loader.init:{[sortparams:.loader.util.sortfilter] +init:{[sp:sortfilter] / package init function - .loader.partitions:()!(); / maintain a dictionary of the db partitions written to by loader - .loader.filesread:(); / maintain a list of files which have been read - .loader.sortparams:sortparams; + `partitions set ()!(); / maintain a dictionary of the db partitions written to by loader + `filesread set (); / maintain a list 
of files which have been read + `sortparams set sortparams; }; From 8dec401538faf160747a72858a5dcedb13d46175 Mon Sep 17 00:00:00 2001 From: cstirling-dataintellect Date: Thu, 25 Sep 2025 15:14:37 -0400 Subject: [PATCH 07/17] Remove namespacing Account for removing of namespacing in q script; may need to change this again once mechanism for package importing becomes clearer --- dataloader/dataloader.md | 21 +++++++++++---------- 1 file changed, 11 insertions(+), 10 deletions(-) diff --git a/dataloader/dataloader.md b/dataloader/dataloader.md index 2800e64..b2d2029 100644 --- a/dataloader/dataloader.md +++ b/dataloader/dataloader.md @@ -28,18 +28,18 @@ When all the data is written, the on-disk data is re-sorted and the attributes a ## :gear: Initialisation -After loading the package into the session the unary function `.loader.init` is called with a single argument to initialise the packages global variables and define any configurable sorting/attributes. +After loading the package into the session the unary function `init` is called with a single argument to initialise the packages global variables and define any configurable sorting/attributes. ### :mag_right: Params in Depth -The sole argument to `.loader.init` should be a dictionary with the keys `tabname, att, column and sort`. The argument is used to determine how the tables in the resulting database should be sorted and where attributes applied when being persisted. +The sole argument to `init` should be a dictionary with the keys `tabname, att, column and sort`. The argument is used to determine how the tables in the resulting database should be sorted and where attributes applied when being persisted. 
You may apply default sorting and attributes to all tables loaded in by the package by passing in the `tabname` with a value of `default` and specifying your default sorting and attribute parameters, or you can apply specific sorting and attribute configurations to tables when loading them into the database including determining where they should not be applied by using a specific table name. If no sorting or attributes are required pass in the dictionary with a `tabname` with `default`, `att` and `column` with backticks and `sort` with `0b`, examples shown below: ```q -.loader.init[`tabname`att`column`sort!(`default;`;`;0b)] // Apply no sorting or attributes -.loader.init[`tabname`att`column`sort!(`default;`p;`sym;1b)] // Sort all tables loaded in by the sym column and apply the parted attribute -.loader.init[`tabname`att`column`sort!(`default`trade`quote;`p`s`;`sym`time`;110b)] // Apply default to all tables, however, sort trade by sym and apply `p and if quote is read in by the function then do not sort or apply attributes +init[`tabname`att`column`sort!(`default;`;`;0b)] // Apply no sorting or attributes +init[`tabname`att`column`sort!(`default;`p;`sym;1b)] // Sort all tables loaded in by the sym column and apply the parted attribute +init[`tabname`att`column`sort!(`default`trade`quote;`p`s`;`sym`time`;110b)] // Apply default to all tables, however, sort trade by sym and apply `p and if quote is read in by the function then do not sort or apply attributes ``` The dictionary arguments are outlined below. @@ -55,7 +55,7 @@ The dictionary arguments are outlined below. ### :rocket: Functions -`.loader.loadallfiles` is the primary function used to load in all data and create the database. +`loadallfiles` is the primary function used to load in all data and create the database. The function takes two arguments, a dictionary of loading parameters and a directory containing files to read. 
The function reads in all specified delimited files into memory from a chosen directory then proceeds to apply any required processing, persists the table to disk in a kdb+ partitioned format, compresses the files if directed and finally sorting and applying attributes. @@ -94,16 +94,16 @@ The second parameter is a directory handle .e.g \l dataloader.q // Initialise the package -.loader.init[`tabname`att`column`sort!(`default;`p;`sym;1b)] +init[`tabname`att`column`sort!(`default;`p;`sym;1b)] // Check table exists -select from .loader.sortparams +select from sortparams tabname att column sort ----------------------- default p sym 1 // Read in data and create db -.loader.loadallfiles[`headers`types`separator`tablename`dbdir!(`sym`time`price`volume`mktflag`cond`exclude;"SPFICHB";",";`trade;`:hdb); `:TRADE/toload] +loadallfiles[`headers`types`separator`tablename`dbdir!(`sym`time`price`volume`mktflag`cond`exclude;"SPFICHB";",";`trade;`:hdb); `:TRADE/toload] //load in db \l hdb @@ -133,4 +133,5 @@ volume | i mktflag| c cond | h exclude| b -``` \ No newline at end of file + +``` From 4fa96b774429eb188c1f93861b72b1a3fcb3a59f Mon Sep 17 00:00:00 2001 From: cstirling-dataintellect Date: Mon, 29 Sep 2025 17:04:07 -0400 Subject: [PATCH 08/17] Create init file for module File which creates private namespace of functionality and exposes public interface --- dataloader/init.q | 8 ++++++++ 1 file changed, 8 insertions(+) create mode 100644 dataloader/init.q diff --git a/dataloader/init.q b/dataloader/init.q new file mode 100644 index 0000000..68be29a --- /dev/null +++ b/dataloader/init.q @@ -0,0 +1,8 @@ +//Load core functionality into root module namespace +\l dataloader.q +//Create secondary namespace level and load relevant script into it +\d .z.m.util +\l ::util.q +//Return to root module namespace to simplify exposure of public functions +\d .z.m +export:([init:init;loadallfiles:loadallfiles]) From 92cb1ecb2698b6dd66baca4f636eb8b1042814fe Mon Sep 17 00:00:00 2001 From: 
cstirling-dataintellect Date: Mon, 29 Sep 2025 17:04:59 -0400 Subject: [PATCH 09/17] Reference private namespace for globals --- dataloader/dataloader.q | 71 +++++------------------------------------ 1 file changed, 8 insertions(+), 63 deletions(-) diff --git a/dataloader/dataloader.q b/dataloader/dataloader.q index 66d70f4..bcf7f6c 100644 --- a/dataloader/dataloader.q +++ b/dataloader/dataloader.q @@ -1,5 +1,3 @@ -/ generic dataloader library - loaddata:{[loadparams;rawdata] / loads data in from delimeted file, applies processing function, enumerates and writes to db data:$[loadparams[`filename] in filesread; / check if some data has already been read in @@ -20,20 +18,20 @@ writedatapartition:{[dbdir;partition;partitiontype;partitioncol;tablename;data] towrite:data where partition=partitiontype$data partitioncol; writepath:` sv .Q.par[dbdir;partition;tablename],`; .[upsert;(writepath;towrite);{'"failed to save table: ",x}]; - partitions[writepath]:(tablename;partition); + .z.m.partitions[writepath]:(tablename;partition); }; finish:{[loadparams] / adds compression, sorting and attributes selected if[count loadparams `compression;.z.zd:loadparams`compression]; / temporarily set compression defaults - {sorttab (x;where partitions[;0]=x)} each distinct value partitions[;0]; + {sorttab (x;where .z.m.partitions[;0]=x)} each distinct value .z.m.partitions[;0]; system"x .z.zd"; if[loadparams`gc; .Q.gc[]]; }; loadallfiles:{[loadparams:paramfilter;dir] / load all the files from a specified directory - partitions::()!(); + .z.m.partitions::()!(); filesread::(); filelist:$[`filepattern in key loadparams; (key dir:hsym dir) where max like[key dir;] each loadparams[`filepattern]; @@ -43,64 +41,11 @@ loadallfiles:{[loadparams:paramfilter;dir] finish[loadparams]; }; -applyattr:{[dloc;colname;att] - / utility function - .[{@[x;y;z#]};(dloc;colname;att); - {[dloc;colname;att;e] - '"unable to apply ",string[att]," attr to the ",string[colname]," column in the this directory : 
",string[dloc],". The error was : ",e; - }[dloc;colname;att] - ] - }; - -sorttab:{[d] - / function used to sort and apply attributes to tables on disk based on format provided at initialisation of package. - if[1>sum exec sort from sortparams;:()]; - sp:$[count tabparams:select from sortparams where tabname=d[0]; - tabparams; - count defaultparams:select from sortparams where tabname=`default; - defaultparams - ]; - {[sp;dloc] / loop through each directory and sort the data - if[count sortcols: exec column from sp where sort, not null column; - .[xasc;(sortcols;dloc);{[sortcols;dloc;e] '"failed to sort ",string[dloc]," by these columns : ",(", " sv string sortcols),". The error was: ",e}[sortcols;dloc]]]; - if[count attrcols: select column, att from sp where not null att; - applyattr[dloc;;]'[attrcols`column;attrcols`att]]; / apply attributes - }[sp] each distinct (),last d; - }; - -paramfilter:{[loadparams] - / function checks keys are correct and value have the right types for loadallfiles argument - if[not 99h=type loadparams; '"loadallfiles requires a dictionary parameter"]; - req:`headers`types`tablename`dbdir`separator; - if[not all req in key loadparams; - '"loaddata requires a dictionary parameter with keys of ",(", " sv string req)," : missing ",", " sv string req except key loadparams]; - if[not count loadparams `symdir;loadparams[`symdir]:loadparams[`dbdir]]; - loadparams:(`dataprocessfunc`chunksize`partitioncol`partitiontype`compression`gc!({[x;y] y};`int$100*2 xexp 20;`time;`date;();0b)),loadparams; / join loadparams with some default values - reqtypes:`headers`types`tablename`dbdir`symdir`chunksize`partitioncol`partitiontype`gc!`short$(11;10;-11;-11;-11;-6;-11;-11;-1); - if[count w:where not (type each loadparams key reqtypes)=reqtypes; - '"incorrect types supplied for ",(", " sv string w)," parameter(s). 
Required type(s) are ",", " sv string reqtypes w]; - if[not 10h=abs type loadparams`separator;'"separator must be a character or enlisted character"]; - if[not 99h Date: Mon, 29 Sep 2025 17:05:34 -0400 Subject: [PATCH 10/17] Add separate util script for module --- dataloader/util.q | 52 +++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 52 insertions(+) create mode 100644 dataloader/util.q diff --git a/dataloader/util.q b/dataloader/util.q new file mode 100644 index 0000000..aea7f24 --- /dev/null +++ b/dataloader/util.q @@ -0,0 +1,52 @@ +applyattr:{[dloc;colname;att] + .[{@[x;y;z#]};(dloc;colname;att); // Attempt to apply the attribute to the column and log an error if it fails + {[dloc;colname;att;e] + '"unable to apply ",string[att]," attr to the ",string[colname]," column in the this directory : ",string[dloc],". The error was : ",e; + }[dloc;colname;att] + ] + }; + +// Function used to sort and apply attributes to tables on disk based on format provided at initialisation of package. +sorttab:{[d] + if[1>sum exec sort from .z.m.sortparams;:()]; + sp:$[count tabparams:select from .z.m.sortparams where tabname=d[0]; + tabparams; + count defaultparams:select from .z.m.sortparams where tabname=`default; + defaultparams + ]; + {[sp;dloc] // Loop through each directory and sort the data + if[count sortcols: exec column from sp where sort, not null column; + .[xasc;(sortcols;dloc);{[sortcols;dloc;e] '"failed to sort ",string[dloc]," by these columns : ",(", " sv string sortcols),". 
The error was: ",e}[sortcols;dloc]]]; + if[count attrcols: select column, att from sp where not null att; + applyattr[dloc;;]'[attrcols`column;attrcols`att]]; // Apply attribute(s) + }[sp] each distinct (),last d; + }; + +// Function checks keys are correct and value have the right types for loader.loadallfiles argument +paramfilter:{[loadparams] + if[not 99h=type loadparams; '".loader.loadallfiles requires a dictionary parameter"]; // Check the input + req:`headers`types`tablename`dbdir`separator; // Required fields + if[not all req in key loadparams; + '"loaddata requires a dictionary parameter with keys of ",(", " sv string req)," : missing ",", " sv string req except key loadparams]; + if[not count loadparams `symdir;loadparams[`symdir]:loadparams[`dbdir]]; + loadparams:(`dataprocessfunc`chunksize`partitioncol`partitiontype`compression`gc!({[x;y] y};`int$100*2 xexp 20;`time;`date;();0b)),loadparams; // Join the loadparams with some default values + reqtypes:`headers`types`tablename`dbdir`symdir`chunksize`partitioncol`partitiontype`gc!`short$(11;10;-11;-11;-11;-6;-11;-11;-1); // Required types n + if[count w:where not (type each loadparams key reqtypes)=reqtypes; // Check the types + '"incorrect types supplied for ",(", " sv string w)," parameter(s). 
Required type(s) are ",", " sv string reqtypes w]; + if[not 10h=abs type loadparams`separator;'"separator must be a character or enlisted character"]; + if[not 99h Date: Fri, 24 Oct 2025 03:35:14 -0400 Subject: [PATCH 11/17] Update dataloader.q --- dataloader/dataloader.q | 13 +++++++------ 1 file changed, 7 insertions(+), 6 deletions(-) diff --git a/dataloader/dataloader.q b/dataloader/dataloader.q index bcf7f6c..d121fb3 100644 --- a/dataloader/dataloader.q +++ b/dataloader/dataloader.q @@ -1,3 +1,5 @@ +/ generic dataloader library + loaddata:{[loadparams;rawdata] / loads data in from delimeted file, applies processing function, enumerates and writes to db data:$[loadparams[`filename] in filesread; / check if some data has already been read in @@ -7,6 +9,7 @@ loaddata:{[loadparams;rawdata] flip loadparams[`headers]!(loadparams[`types];loadparams[`separator])0:rawdata] ]; if[not loadparams[`filename] in filesread;filesread,::loadparams[`filename]]; + .c.t:(loadparams;filesread); data:0!loadparams[`dataprocessfunc] . 
(loadparams;data); data:.Q.ens[loadparams $[`symdir in key loadparams;`symdir;`dbdir];data;loadparams[`enumname]]; writedatapartition[loadparams[`dbdir];;loadparams[`partitiontype];loadparams[`partitioncol];loadparams[`tablename];data] each distinct loadparams[`partitiontype]$data[loadparams`partitioncol]; @@ -24,12 +27,12 @@ writedatapartition:{[dbdir;partition;partitiontype;partitioncol;tablename;data] finish:{[loadparams] / adds compression, sorting and attributes selected if[count loadparams `compression;.z.zd:loadparams`compression]; / temporarily set compression defaults - {sorttab (x;where .z.m.partitions[;0]=x)} each distinct value .z.m.partitions[;0]; + {.m.dataloader.util.sorttab (x;where .z.m.partitions[;0]=x)} each distinct value .z.m.partitions[;0]; system"x .z.zd"; if[loadparams`gc; .Q.gc[]]; }; -loadallfiles:{[loadparams:paramfilter;dir] +loadallfiles:{[loadparams:.m.dataloader.util.paramfilter;dir] / load all the files from a specified directory .z.m.partitions::()!(); filesread::(); @@ -38,14 +41,12 @@ loadallfiles:{[loadparams:paramfilter;dir] key dir:hsym dir]; / get the contents of the directory based on optional filepattern filelist:` sv' dir,'filelist; {[loadparams;file] .Q.fsn[loaddata[loadparams,(enlist`filename)!enlist file];file;loadparams`chunksize]}[loadparams] each filelist; - finish[loadparams]; + .z.m.finish[loadparams]; }; -init:{[sp:sortfilter] +init:{[sortparams:.m.dataloader.util.sortfilter] / package init function .z.m.partitions:()!(); / maintain a dictionary of the db partitions written to by loader .z.m.filesread:(); / maintain a list of files which have been read .z.m.sortparams:sortparams; }; - - From 5dfc67fd46048c62538ab1760e606565cabbda0c Mon Sep 17 00:00:00 2001 From: cstirling-dataintellect Date: Fri, 24 Oct 2025 03:35:56 -0400 Subject: [PATCH 12/17] Update init.q --- dataloader/init.q | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/dataloader/init.q b/dataloader/init.q index 68be29a..a32718b 
100644 --- a/dataloader/init.q +++ b/dataloader/init.q @@ -1,8 +1,6 @@ //Load core functionality into root module namespace -\l dataloader.q +\l ::dataloader.q //Create secondary namespace level and load relevant script into it -\d .z.m.util -\l ::util.q +util:use`dataloader.util //Return to root module namespace to simplify exposure of public functions -\d .z.m export:([init:init;loadallfiles:loadallfiles]) From fd625a7d96f6c5d19dab4b858d4eba7ce9e920e7 Mon Sep 17 00:00:00 2001 From: cstirling-dataintellect Date: Fri, 24 Oct 2025 03:36:37 -0400 Subject: [PATCH 13/17] Update util.q --- dataloader/util.q | 16 +++++++++------- 1 file changed, 9 insertions(+), 7 deletions(-) diff --git a/dataloader/util.q b/dataloader/util.q index aea7f24..623c83d 100644 --- a/dataloader/util.q +++ b/dataloader/util.q @@ -8,10 +8,10 @@ applyattr:{[dloc;colname;att] // Function used to sort and apply attributes to tables on disk based on format provided at initialisation of package. sorttab:{[d] - if[1>sum exec sort from .z.m.sortparams;:()]; - sp:$[count tabparams:select from .z.m.sortparams where tabname=d[0]; + if[1>sum exec sort from .m.dataloader.sortparams;:()]; + sp:$[count tabparams:select from .m.dataloader.sortparams where tabname=d[0]; tabparams; - count defaultparams:select from .z.m.sortparams where tabname=`default; + count defaultparams:select from .m.dataloader.sortparams where tabname=`default; defaultparams ]; {[sp;dloc] // Loop through each directory and sort the data @@ -22,9 +22,9 @@ sorttab:{[d] }[sp] each distinct (),last d; }; -// Function checks keys are correct and value have the right types for loader.loadallfiles argument +// Function checks keys are correct and value have the right types for loadallfiles argument paramfilter:{[loadparams] - if[not 99h=type loadparams; '".loader.loadallfiles requires a dictionary parameter"]; // Check the input + if[not 99h=type loadparams; '"loadallfiles requires a dictionary parameter"]; // Check the input 
req:`headers`types`tablename`dbdir`separator; // Required fields if[not all req in key loadparams; '"loaddata requires a dictionary parameter with keys of ",(", " sv string req)," : missing ",", " sv string req except key loadparams]; @@ -45,8 +45,10 @@ paramfilter:{[loadparams] // Function checks dictionary argument for init function has correct headers and types sortfilter:{[sortparams] - if[not 99h=type sortparams; '".loader.init requires a dictionary parameter"]; - if[not (abs type each value sortparams)~11 11 11 1h;'"Error, ensure dictionary values are the correct types 11 11 11 1h or -11 -11 -11 -1h"]; + if[not 99h=type sortparams; '"init requires a dictionary parameter"]; + if[not (abs type each value sortparams)~11 11 11 1h;'"Error ensure dictionary values are the correct types 11 11 11 1h or -11 -11 -11 -1h"]; if[not `tabname`att`column`sort~key sortparams; '"Error ensure argument is a dictionary with keys `tabname`att`column`sort"]; flip (),/: sortparams }; + +export:.z.m; From 8219c26680e783c9538a76d5199aae9cbd7b1349 Mon Sep 17 00:00:00 2001 From: Eliot Robinson Date: Fri, 24 Oct 2025 17:23:56 +0100 Subject: [PATCH 14/17] overhauled the dataloader package tests and tidy up/refactor a lot of code --- dataloader/dataloader.q | 52 ++++++++++---------- dataloader/init.q | 6 +-- dataloader/test.csv | 103 ++++++++++++++++++++++------------------ dataloader/test.q | 57 ---------------------- dataloader/util.q | 76 +++++++++++++++-------------- 5 files changed, 124 insertions(+), 170 deletions(-) delete mode 100644 dataloader/test.q diff --git a/dataloader/dataloader.q b/dataloader/dataloader.q index d121fb3..e870b1a 100644 --- a/dataloader/dataloader.q +++ b/dataloader/dataloader.q @@ -1,22 +1,22 @@ / generic dataloader library loaddata:{[loadparams;rawdata] - / loads data in from delimeted file, applies processing function, enumerates and writes to db - data:$[loadparams[`filename] in filesread; / check if some data has already been read in - flip 
loadparams[`headers]!(loadparams[`types];loadparams[`separator])0:rawdata; - $[all (`$"," vs rawdata[0;]) in loadparams[`headers]; / it hasn't been seen, may be column headers - (loadparams[`types];loadparams[`separator])0:rawdata; - flip loadparams[`headers]!(loadparams[`types];loadparams[`separator])0:rawdata] + / loads data in from delimited file, applies processing function, enumerates and writes to db + / NOTE: it is not trivial to check user has inputted headers correctly, assume they have + data:$[(`$"," vs rawdata 0)~loadparams`headers; / check if first row matches headers provided + (loadparams`types`separator)0:rawdata; / if so, read in the files normally + flip loadparams[`headers]!(loadparams`types`separator)0:rawdata / if not, add the headers manually ]; - if[not loadparams[`filename] in filesread;filesread,::loadparams[`filename]]; - .c.t:(loadparams;filesread); - data:0!loadparams[`dataprocessfunc] . (loadparams;data); - data:.Q.ens[loadparams $[`symdir in key loadparams;`symdir;`dbdir];data;loadparams[`enumname]]; - writedatapartition[loadparams[`dbdir];;loadparams[`partitiontype];loadparams[`partitioncol];loadparams[`tablename];data] each distinct loadparams[`partitiontype]$data[loadparams`partitioncol]; - if[loadparams`gc; .Q.gc[]]; + if[not loadparams[`filename]in filesread;filesread,:loadparams`filename]; / if we havent read this file before, add it to filesread + data:0!loadparams[`dataprocessfunc].(loadparams;data); / apply the user provided processing function to the data + domain:(`sym;loadparams`enumname)`enumname in key loadparams; / if enumname provided, use it, otherwise default to `sym + data:.Q.ens[loadparams(`dbdir`symdir)`symdir in key loadparams;data;domain]; / enumerate sym columns to given domain + wd:writedatapartition[data]. 
loadparams`dbdir`partitiontype`partitioncol`tablename; / create write down function using all the params + wd each distinct loadparams[`partitiontype]$data loadparams`partitioncol; / run the writedatapartition function for each partition + if[loadparams`gc;.Q.gc[]]; }; -writedatapartition:{[dbdir;partition;partitiontype;partitioncol;tablename;data] +writedatapartition:{[data;dbdir;partitiontype;partitioncol;tablename;partition] / write data for provdided database and partition towrite:data where partition=partitiontype$data partitioncol; writepath:` sv .Q.par[dbdir;partition;tablename],`; @@ -26,27 +26,27 @@ writedatapartition:{[dbdir;partition;partitiontype;partitioncol;tablename;data] finish:{[loadparams] / adds compression, sorting and attributes selected - if[count loadparams `compression;.z.zd:loadparams`compression]; / temporarily set compression defaults - {.m.dataloader.util.sorttab (x;where .z.m.partitions[;0]=x)} each distinct value .z.m.partitions[;0]; + if[count loadparams`compression;.z.zd:loadparams`compression]; / temporarily set compression defaults + {.z.m.util.sorttab(x;where .z.m.partitions[;0]=x)}each distinct value .z.m.partitions[;0]; system"x .z.zd"; - if[loadparams`gc; .Q.gc[]]; + if[loadparams`gc;.Q.gc[]]; }; -loadallfiles:{[loadparams:.m.dataloader.util.paramfilter;dir] +loadallfiles:{[loadparams:.z.m.util.paramfilter;dir] / load all the files from a specified directory .z.m.partitions::()!(); - filesread::(); + .z.m.filesread:(); filelist:$[`filepattern in key loadparams; - (key dir:hsym dir) where max like[key dir;] each loadparams[`filepattern]; - key dir:hsym dir]; / get the contents of the directory based on optional filepattern - filelist:` sv' dir,'filelist; - {[loadparams;file] .Q.fsn[loaddata[loadparams,(enlist`filename)!enlist file];file;loadparams`chunksize]}[loadparams] each filelist; - .z.m.finish[loadparams]; + key[dir:hsym dir]where key[dir]like first loadparams`filepattern; + key dir:hsym dir]; / get the contents of the 
directory based on optional filepattern + filelist:` sv'dir,'filelist; + {[loadparams;file].Q.fsn[loaddata[loadparams,(enlist`filename)!enlist file];file;loadparams`chunksize]}[loadparams]each filelist; + .z.m.finish loadparams; }; -init:{[sortparams:.m.dataloader.util.sortfilter] +init:{[sortparams:.z.m.util.sortfilter] / package init function - .z.m.partitions:()!(); / maintain a dictionary of the db partitions written to by loader - .z.m.filesread:(); / maintain a list of files which have been read + .z.m.partitions:()!(); / maintain a dictionary of the db partitions written to by loader + .z.m.filesread:(); / maintain a list of files which have been read .z.m.sortparams:sortparams; }; diff --git a/dataloader/init.q b/dataloader/init.q index a32718b..8b1f3b5 100644 --- a/dataloader/init.q +++ b/dataloader/init.q @@ -1,6 +1,6 @@ -//Load core functionality into root module namespace +// load core dataloader functions \l ::dataloader.q -//Create secondary namespace level and load relevant script into it +// load util submodule util:use`dataloader.util -//Return to root module namespace to simplify exposure of public functions +// expose public function export:([init:init;loadallfiles:loadallfiles]) diff --git a/dataloader/test.csv b/dataloader/test.csv index 8ecde05..c5f960a 100644 --- a/dataloader/test.csv +++ b/dataloader/test.csv @@ -1,57 +1,70 @@ -true,0,0,q,(".loader.init requires a dictionary parameter")~@[.loader.util.sortfilter;`;{x}],1,,Test error trapping for wrong param type -true,0,0,q,("Error ensure dictionary values are the correct types 11 11 11 1h or -11 -11 -11 -1h")~@[.loader.util.sortfilter;(`tabname`att`column`sort)!(1;2;3;4);{x}],1,,Test error trapping for param value types -true,0,0,q,("Error ensure argument is a dictionary with keys `tabname`att`column`sort")~@[.loader.util.sortfilter;(`test1`test2`test3`test4)!(`test1;`test2;`test3;1b);{x}],1,,Test error trapping for from param key values 
-run,0,0,q,.loader.init[`tabname`att`column`sort!(`default`quote`tradedaily;`p`s`;`sym`time`;110b)],1,,Init package with defaults for all tables +run,0,0,q,dataloader:use`dataloader,1,,Load in dataloader package +run,0,0,q,os:use`os,1,,Load in os package for system commands +run,0,0,q,os.cd hsym first`$system"echo $HOME/kdbx-packages/dataloader",1,,Cd to dataloader directory +run,0,0,q,{if[os.isdir x;os.deldir x]}`:test,1,,Delete test directory if it already existed +run,0,0,q,os.mkdir each`:test/data/hdb`:test/data/files`:test/data/symdir,1,,Create hdb/files/enum directory +run,0,0,q,hdbpath:hsym`$os.abspath`:test/data/hdb,,1,,Hardcode hdb path +run,0,0,q,filespath:hsym`$os.abspath`:test/data/files,,1,,Hardcode files path +run,0,0,q,symdirpath:hsym`$os.abspath`:test/data/symdir,,1,,Hardcode enum files path -run,0,0,q,.test.dataloader.mockdirs[0b;`trade],1,,Create temporary test directory for checking dataloading with trade no headers -run,0,0,q,.loader.loadallfiles[`headers`types`separator`tablename`dbdir!(`time`sym`price`size`side`exchange;"PSFISS";.test.dataloader.delimeter;`trade;`:test/data/hdb);`:test/data/files],1,,Test load of trade file with no headers +run,0,0,q,tradetab:([]time:asc 2024.01.15D09:30+10?00:10:00;sym:10?`AAPL`GOOGL`MSFT`TSLA`NVDA`AMD;price:(5*10?10000)%100;size:5*10?100;side:10?`B`S;exchange:10?`NYSE`NASDAQ),1,,Create mock trade table +run,0,0,q,quotetab:([]time:asc 2024.01.15D09:30+10?00:10:00;sym:10?`AAPL`GOOGL`MSFT`TSLA`NVDA`AMD;bid:(5*10?10000)%100;ask:(5*10?10000)%100;bsize:5*10?100;asize:5*10?100;exchange:10?`NYSE`NASDAQ),1,,Create mock quote table +run,0,0,q,"tradedailytab:([]date:asc 2024.02.01,2024.01.15+9?10;time:{x+10?x}09:00:00.000;sym:10?`AAPL`GOOGL`MSFT`TSLA`NVDA`AMD;price:(5*10?10000)%100;size:5*10?100;exchange:10?`NYSE`NASDAQ)",1,,Create mock tradedaily table + +run,0,0,q,"(` sv filespath,`tradenh.csv)0:1_csv 0:tradetab",1,,Save trade csv without headers +run,0,0,q,"(` sv filespath,`trade.csv)0:csv 0:tradetab",1,,Save trade 
csv with headers +run,0,0,q,"(` sv filespath,`tradedaily.csv)0:csv 0:tradedailytab",1,,Save tradedaily csv +run,0,0,q,"(` sv filespath,`quotenh.csv)0:1_csv 0:quotetab",1,,Save quote csv without headers +run,0,0,q,"(` sv filespath,`quote.csv)0:csv 0:quotetab",1,,Save quote csv with headers + +run,0,0,q,dataloader.init[`tabname`att`column`sort!(`default`quote`tradedaily;`p`s`;`sym`time`;110b)],1,,Init package with defaults for all tables + +run,0,0,q,dataloader.loadallfiles[`headers`types`separator`tablename`dbdir`filepattern!(`time`sym`price`size`side`exchange;"PSFISS";csv;`trade;hdbpath;"tradenh.csv");filespath],1,,Test load of trade file with no headers run,0,0,q,\l test/data/hdb,1,,Load database true,0,0,q,`trade in tables[],1,,Test table loaded to db -true,0,0,q,`p=(meta trade)[`sym][`a],1,,Check sym column is parted +true,0,0,q,`p=meta[trade][`sym;`a],1,,Check sym column is parted true,0,0,q,`2024.01.15 in key`:.,1,,Check table saved to correct partition true,0,0,q,`sym in key `:.,1,,Check sym file created and enumerated against -run,0,0,q,.test.dataloader.mockdirs[1b;`trade],1,,Create temporary test directory for checking dataloading with trade with headers -run,0,0,q,.loader.loadallfiles[`headers`types`separator`tablename`dbdir!(`time`sym`price`size`side`exchange;"PSFISS";enlist .test.dataloader.delimeter;`trade;`:test/data/hdb);`:test/data/files],1,,Test load of trade file with no headers -run,0,0,q,\l test/data/hdb,1,,Load database +run,0,0,q,dataloader.loadallfiles[`headers`types`separator`tablename`dbdir`filepattern!(`time`sym`price`size`side`exchange;"PSFISS";1#csv;`trade;hdbpath;"trade.csv");filespath],1,,Test load of trade file with headers +run,0,0,q,\l .,1,,Reload database true,0,0,q,`trade in tables[],1,,Test table loaded to db -true,0,0,q,`p=(meta trade)[`sym][`a],1,,Check sym column is parted +true,0,0,q,`p=meta[trade][`sym;`a],1,,Check sym column is parted true,0,0,q,`2024.01.15 in key`:.,1,,Check table saved to correct partition true,0,0,q,`sym 
in key `:.,1,,Check sym file created and enumerated against -run,0,0,q,.test.dataloader.mockdirs[1b;`trade],1,,Create temporary test directory for checking dataloading with trade with headers -run,0,0,q,.test.dataloader.mocksymdir[],1,,Create temporary enum directory -run,0,0,q,.loader.loadallfiles[`headers`types`separator`tablename`dbdir`symdir!(`time`sym`price`size`side`exchange;"PSFISS";enlist .test.dataloader.delimeter;`trade;`:test/data/hdb;`:test/data/symdir);`:test/data/files],1,,Test load of trade file with specfied symdir -run,0,0,q,\l test/data/hdb,1,,Load database +run,0,0,q,dataloader.loadallfiles[`headers`types`separator`tablename`dbdir`symdir`filepattern!(`time`sym`price`size`side`exchange;"PSFISS";1#csv;`trade;hdbpath;symdirpath;"trade.csv");filespath],1,,Test load of trade file with specfied symdir +run,0,0,q,\l .,1,,Reload database true,0,0,q,`trade in tables[],1,,Test table loaded to db -true,0,0,q,("symdir" in system"ls ../."),1,,Test sym file enumerated to custom directory +true,0,0,q,os.exists`:../symdir,1,,Test sym file enumerated to custom directory -run,0,0,q,.test.dataloader.mockdirs[1b;`trade],1,,Create temporary test directory for checking dataloading with trade with headers -run,0,0,q,.loader.loadallfiles[`headers`types`separator`tablename`dbdir`enumname!(`time`sym`price`size`side`exchange;"PSFISS";enlist .test.dataloader.delimeter;`trade;`:test/data/hdb;`mocksym);`:test/data/files],1,,Test load of trade file with specfied symfile name -run,0,0,q,\l test/data/hdb,1,,Load database -true,0,0,q,`mocksym in key `:.,1,,Check that db sym file has custom name +run,0,0,q,os.del`:sym,1,,Remove sym +run,0,0,q,os.deldir`:2024.01.15,1,,Remove partition to avoid errors +run,0,0,q,dataloader.loadallfiles[`headers`types`separator`tablename`dbdir`enumname`filepattern!(`time`sym`price`size`side`exchange;"PSFISS";1#csv;`trade;hdbpath;`mocksym;"trade.csv");filespath],1,,Test load of trade file with specfied symfile name +run,0,0,q,\l .,1,,Reload database 
+true,0,0,q,`mocksym in key`:.,1,,Check that db sym file has custom name -run,0,0,q,.test.dataloader.mockdirs[1b;`tradedaily],1,,Create temporary test directory for checking dataloading with tradedaily and custom partitioning -run,0,0,q,.loader.loadallfiles[`headers`types`separator`tablename`dbdir`partitiontype`partitioncol!(`date`time`sym`price`size`exchange;"DTSFIS";enlist .test.dataloader.delimeter;`tradedaily;`:test/data/hdb;`month;`date);`:test/data/files],1,,Test custom partition overwrites -run,0,0,q,\l test/data/hdb,1,,Load database -true,0,0,q,all (`2024.01`2024.02`2024.03`2024.04) in\: key `:.,1,,Check partitions were created as month and all are there +run,0,0,q,os.del`:mocksym,1,,Remove mocksym +run,0,0,q,os.deldir`:2024.01.15,1,,Remove partition to avoid errors +run,0,0,q,dataloader.loadallfiles[`headers`types`separator`tablename`dbdir`partitiontype`partitioncol`filepattern!(`date`time`sym`price`size`exchange;"DTSFIS";1#csv;`tradedaily;hdbpath;`month;`date;"tradedaily.csv");filespath],1,,Test custom partition overwrites +run,0,0,q,\l .,1,,Reload database +true,0,0,q,all`2024.01`2024.02 in key`:.,1,,Check partitions were created as month and all are there true,0,0,q,all null exec a from 0!meta tradedaily,1,,Check no attributes applied to tradedaily table +run,0,0,q,os.deldir each`:2024.01`:2024.02,1,,Remove month partitions -run,0,0,q,.test.dataloader.mockdirs[0b;`quote],1,,Create temporary test directory for checking dataloading with quote and diadic function application -run,0,0,q,.loader.loadallfiles[`headers`types`separator`tablename`dbdir`dataprocessfunc!(`time`sym`bid`ask`bsize`asize`exchange;"PSFFIIS";.test.dataloader.delimeter;`quote;`:test/data/hdb;.test.dataloader.dataprocessfunc);`:test/data/files],1,,Test load of quote file -run,0,0,q,\l test/data/hdb,1,,Load database -true,0,0,q,`quote in tables[],1,,Check that table loaded to db -true,0,0,q,`mid in cols quote,1,,Check that column from update function is included in save down - 
-run,0,0,q,.test.dataloader.mockdirs[0b;`quote],1,,Create temporary test directory for checking dataloading with quote reading in with chunks -run,0,0,q,.loader.loadallfiles[`headers`types`separator`tablename`dbdir`chunksize!(`time`sym`bid`ask`bsize`asize`exchange;"PSFFIIS";.test.dataloader.delimeter;`quote;`:test/data/hdb;200i);`:test/data/files],1,,Test load of quote file -run,0,0,q,\l test/data/hdb,1,,Load database +run,0,0,q,dataloader.loadallfiles[`headers`types`separator`tablename`dbdir`chunksize`filepattern!(`time`sym`bid`ask`bsize`asize`exchange;"PSFFIIS";csv;`quote;hdbpath;200i;"quotenh.csv");filespath],1,,Test load of quote file +run,0,0,q,\l .,1,,reload database true,0,0,q,`quote in tables[],1,,Test quote table loaded to db in chunks true,0,0,q,10=count quote,1,,Test all of quote file loaded in chunks -run,0,0,q,.test.dataloader.mockdirs[1b;`trade],1,,Create temporary test directory for checking compression loading -run,0,0,q,.loader.loadallfiles[`headers`types`separator`tablename`dbdir`compression!(`time`sym`price`size`side`exchange;"PSFISS";enlist .test.dataloader.delimeter;`trade;`:test/data/hdb;(17i;2i;6i));`:test/data/files],1,,Test loading with compression specified -run,0,0,q,\l test/data/hdb,1,,Load database +run,0,0,q,os.deldir`:2024.01.15,1,,Delete directory to handle new column +run,0,0,q,dataloader.loadallfiles[`headers`types`separator`tablename`dbdir`dataprocessfunc`filepattern!(`time`sym`bid`ask`bsize`asize`exchange;"PSFFIIS";1#csv;`quote;hdbpath;{[loaderparams;data]update mid:avg(bid;ask)from data};"quote.csv");filespath],1,,Test load of quote file +run,0,0,q,\l .,1,,Reload database +true,0,0,q,`quote in tables[],1,,Check that table loaded to db +true,0,0,q,`mid in cols quote,1,,Check that column from update function is included in save down + 
+run,0,0,q,dataloader.loadallfiles[`headers`types`separator`tablename`dbdir`compression`filepattern!(`time`sym`price`size`side`exchange;"PSFISS";1#csv;`trade;hdbpath;(17i;2i;6i);"trade.csv");filespath],1,,Test loading with compression specified +run,0,0,q,\l .,1,,Reload database true,0,0,q,`trade in tables[],1,,Test trade table loaded to db true,0,0,q,(2i;17i;6i)~(-21!`:2024.01.15/trade/exchange)[`algorithm`logicalBlockSize`zipLevel],1,,Check compression for exchange column true,0,0,q,(2i;17i;6i)~(-21!`:2024.01.15/trade/price)[`algorithm`logicalBlockSize`zipLevel],1,,Check compression for price column @@ -60,17 +73,17 @@ true,0,0,q,(2i;17i;6i)~(-21!`:2024.01.15/trade/size)[`algorithm`logicalBlockSize true,0,0,q,(2i;17i;6i)~(-21!`:2024.01.15/trade/sym)[`algorithm`logicalBlockSize`zipLevel],1,,Check compression for sym column true,0,0,q,(2i;17i;6i)~(-21!`:2024.01.15/trade/time)[`algorithm`logicalBlockSize`zipLevel],1,,Check compression for time column -run,0,0,q,.test.dataloader.mockdirs[1b;`trade],1,,Create temporary test directory for checking garbage collection loading -run,0,0,q,.loader.loadallfiles[`headers`types`separator`tablename`dbdir`gc!(`time`sym`price`size`side`exchange;"PSFISS";enlist .test.dataloader.delimeter;`trade;`:test/data/hdb;1b);`:test/data/files],1,,Test loading with garbage collection enabled -run,0,0,q,\l test/data/hdb,1,,Load database +run,0,0,q,dataloader.loadallfiles[`headers`types`separator`tablename`dbdir`gc`filepattern!(`time`sym`price`size`side`exchange;"PSFISS";1#csv;`trade;hdbpath;1b;"trade.csv");filespath],1,,Test loading with garbage collection enabled +run,0,0,q,\l .,1,,Reload database true,0,0,q,`trade in tables[],1,,Test trade table loaded to db -run,0,0,q,.test.dataloader.mockdirs[1b;`trade`quote],1,,Create temporary test directory for checking dataloading with trade and quote at the same time specifying with filepattern 
-run,0,0,q,.loader.loadallfiles[`headers`types`separator`tablename`dbdir`filepattern!(`time`sym`price`size`side`exchange;"PSFISS";enlist .test.dataloader.delimeter;`trade;`:test/data/hdb;"trade.csv");`:test/data/files],1,,Test load of trade file -run,0,0,q,.loader.loadallfiles[`headers`types`separator`tablename`dbdir`filepattern!(`time`sym`bid`ask`bsize`asize`exchange;"PSFFIIS";enlist .test.dataloader.delimeter;`quote;`:test/data/hdb;"quote.csv");`:test/data/files],1,,Test load of quote file -run,0,0,q,\l test/data/hdb,1,,Load database -true,0,0,q,`s=(meta quote)[`time][`a],1,,Check quote table sorted on time +run,0,0,q,os.deldir`:2024.01.15,1,,Remove partition to avoid errors +run,0,0,q,dataloader.loadallfiles[`headers`types`separator`tablename`dbdir`filepattern!(`time`sym`price`size`side`exchange;"PSFISS";1#csv;`trade;hdbpath;"trade.csv");filespath],1,,Test load of trade file +run,0,0,q,dataloader.loadallfiles[`headers`types`separator`tablename`dbdir`filepattern!(`time`sym`bid`ask`bsize`asize`exchange;"PSFFIIS";1#csv;`quote;hdbpath;"quote.csv");filespath],1,,Test load of quote file +run,0,0,q,\l .,1,,Reload database +true,0,0,q,`s=meta[quote][`time;`a],1,,Check quote table sorted on time true,0,0,q,`trade in tables[],1,,Test trade table loaded to db true,0,0,q,`quote in tables[],1,,Test quote table loaded to db -run,0,0,q,.test.dataloader.complete[],1,,Remove testing evidence +run,0,0,q,os.cd hsym first`$system"echo $HOME/kdbx-packages/dataloader",1,,Cd to dataloader directory +run,0,0,q,os.deldir`:test/,1,,Delete the test directory diff --git a/dataloader/test.q b/dataloader/test.q deleted file mode 100644 index 64039cf..0000000 --- a/dataloader/test.q +++ /dev/null @@ -1,57 +0,0 @@ -/ helper script for unit tests - -.test.dataloader.mockdirs:{[headers;tabs] - / function creates temporary mock directory for test data i/o. 
If exsits will reset it - if["hdb"~last vs["/";first system"pwd"];system "cd ../../.."]; - $[()~key hsym `:test/data;system"mkdir test/data";system"rm -rf test/data/*"]; - {system "mkdir test/data/",x} each ("hdb";"files"); - tabs:(),tabs; - if[`trade in tabs; - `:test/data/files/trade.csv 0: $[headers;csv 0: .test.dataloader.mocktrade;1_ csv 0: .test.dataloader.mocktrade]; - ]; - if[`quote in tabs; - `:test/data/files/quote.csv 0: $[headers;csv 0: .test.dataloader.mockquote;1_ csv 0: .test.dataloader.mockquote]; - ]; - if[`tradedaily in tabs; - `:test/data/files/tradedaily.csv 0: $[headers;csv 0: .test.dataloader.mocktradedaily;1_ csv 0: .test.dataloader.mockquote]; - ]; - }; - -.test.dataloader.mocktrade:([] - time:2024.01.15D09:30:00.000 2024.01.15D09:30:01.250 2024.01.15D09:30:02.500 2024.01.15D09:30:03.750 2024.01.15D09:30:05.000 2024.01.15D09:30:06.125 2024.01.15D09:30:07.375 2024.01.15D09:30:08.625 2024.01.15D09:30:09.875 2024.01.15D09:30:11.000; - sym:`AAPL`GOOGL`MSFT`AAPL`TSLA`GOOGL`MSFT`AAPL`TSLA`GOOGL; - price:150.25 2750.80 415.60 150.30 245.75 2751.25 415.75 150.35 245.90 2750.95; - size:100 50 200 150 75 25 300 125 100 80; - side:`B`S`B`S`B`B`S`B`S`B; - exchange:`NASDAQ`NASDAQ`NYSE`NASDAQ`NASDAQ`NASDAQ`NYSE`NASDAQ`NASDAQ`NASDAQ - ); - -.test.dataloader.mockquote:([] - time:2024.01.15D09:30:00.000 2024.01.15D09:30:00.500 2024.01.15D09:30:01.000 2024.01.15D09:30:01.500 2024.01.15D09:30:02.000 2024.01.15D09:30:02.500 2024.01.15D09:30:03.000 2024.01.15D09:30:03.500 2024.01.15D09:30:04.000 2024.01.15D09:30:04.500; - sym:`AAPL`AAPL`GOOGL`GOOGL`MSFT`MSFT`TSLA`TSLA`AAPL`GOOGL; - bid:150.20 150.25 2750.50 2750.75 415.55 415.58 245.70 245.85 150.28 2750.85; - ask:150.25 150.30 2750.80 2751.05 415.60 415.65 245.75 245.95 150.33 2751.15; - bsize:500 300 100 150 400 250 200 175 350 125; - asize:400 250 125 100 350 200 150 125 300 100; - exchange:`NASDAQ`NASDAQ`NASDAQ`NASDAQ`NYSE`NYSE`NASDAQ`NASDAQ`NASDAQ`NASDAQ - ); - -.test.dataloader.mocktradedaily:([] - 
date:2024.01.15 2024.01.16 2024.01.17 2024.02.15 2024.02.16 2024.02.17 2024.03.15 2024.03.16 2024.03.17 2024.04.15 2024.04.16 2024.04.17; - time:09:30:00.000 14:25:30.500 09:35:15.250 15:45:22.750 10:15:45.125 13:20:18.375 11:05:33.625 16:00:00.000 09:45:12.875 14:30:45.000 10:30:22.125 15:15:33.250; - sym:`AAPL`MSFT`GOOGL`TSLA`NVDA`AMD`AAPL`MSFT`GOOGL`TSLA`NVDA`AMD; - price:150.25 415.60 2750.80 245.75 870.45 142.30 151.30 416.25 2755.90 246.50 872.10 143.85; - size:1000 1500 500 750 900 1100 1200 800 600 900 1050 1250; - exchange:`NASDAQ`NYSE`NASDAQ`NASDAQ`NASDAQ`NYSE`NASDAQ`NYSE`NASDAQ`NASDAQ`NASDAQ`NYSE - ); - -.test.dataloader.mocksymdir:{system "mkdir test/data/symdir"}; - -.test.dataloader.dataprocessfunc:{[loaderparams;data] - / testing function will calculate mid column from quote data - update mid:avg(bid;ask) from data - }; - -.test.dataloader.delimeter:","; - -.test.dataloader.complete:{system"cd ../../..";system"rm -rf test/data"}; diff --git a/dataloader/util.q b/dataloader/util.q index 623c83d..e5b0091 100644 --- a/dataloader/util.q +++ b/dataloader/util.q @@ -1,54 +1,52 @@ applyattr:{[dloc;colname;att] - .[{@[x;y;z#]};(dloc;colname;att); // Attempt to apply the attribute to the column and log an error if it fails - {[dloc;colname;att;e] - '"unable to apply ",string[att]," attr to the ",string[colname]," column in the this directory : ",string[dloc],". The error was : ",e; - }[dloc;colname;att] - ] - }; + .[{@[x;y;z#]};(dloc;colname;att); / Attempt to apply the attribute to the column + {[dloc;colname;att;e] + '"unable to apply ",string[att]," attr to the ",string[colname]," column in the this directory : ",string[dloc],". The error was : ",e; + }[dloc;colname;att] + ] + }; // Function used to sort and apply attributes to tables on disk based on format provided at initialisation of package. 
sorttab:{[d] - if[1>sum exec sort from .m.dataloader.sortparams;:()]; - sp:$[count tabparams:select from .m.dataloader.sortparams where tabname=d[0]; - tabparams; - count defaultparams:select from .m.dataloader.sortparams where tabname=`default; - defaultparams - ]; - {[sp;dloc] // Loop through each directory and sort the data - if[count sortcols: exec column from sp where sort, not null column; - .[xasc;(sortcols;dloc);{[sortcols;dloc;e] '"failed to sort ",string[dloc]," by these columns : ",(", " sv string sortcols),". The error was: ",e}[sortcols;dloc]]]; - if[count attrcols: select column, att from sp where not null att; - applyattr[dloc;;]'[attrcols`column;attrcols`att]]; // Apply attribute(s) - }[sp] each distinct (),last d; + if[1>sum exec sort from .m.dataloader.sortparams;:()]; + sp:$[count tabparams:select from .m.dataloader.sortparams where tabname=d[0]; + tabparams; + count defaultparams:select from .m.dataloader.sortparams where tabname=`default; + defaultparams + ]; + {[sp;dloc] / Loop through each directory and sort the data + if[count sortcols:exec column from sp where sort,not null column; + .[xasc;(sortcols;dloc);{[sortcols;dloc;e]'"failed to sort ",string[dloc]," by these columns : ",(", " sv string sortcols),". 
The error was: ",e}[sortcols;dloc]]]; + if[count attrcols:select column,att from sp where not null att; + applyattr[dloc;;]'[attrcols`column;attrcols`att]]; / Apply attribute(s) + }[sp]each distinct(),last d; }; // Function checks keys are correct and value have the right types for loadallfiles argument paramfilter:{[loadparams] - if[not 99h=type loadparams; '"loadallfiles requires a dictionary parameter"]; // Check the input - req:`headers`types`tablename`dbdir`separator; // Required fields - if[not all req in key loadparams; - '"loaddata requires a dictionary parameter with keys of ",(", " sv string req)," : missing ",", " sv string req except key loadparams]; - if[not count loadparams `symdir;loadparams[`symdir]:loadparams[`dbdir]]; - loadparams:(`dataprocessfunc`chunksize`partitioncol`partitiontype`compression`gc!({[x;y] y};`int$100*2 xexp 20;`time;`date;();0b)),loadparams; // Join the loadparams with some default values - reqtypes:`headers`types`tablename`dbdir`symdir`chunksize`partitioncol`partitiontype`gc!`short$(11;10;-11;-11;-11;-6;-11;-11;-1); // Required types n - if[count w:where not (type each loadparams key reqtypes)=reqtypes; // Check the types - '"incorrect types supplied for ",(", " sv string w)," parameter(s). 
Required type(s) are ",", " sv string reqtypes w]; - if[not 10h=abs type loadparams`separator;'"separator must be a character or enlisted character"]; - if[not 99h Date: Tue, 28 Oct 2025 15:15:50 +0000 Subject: [PATCH 15/17] update initailising the package and refactor variables --- dataloader/dataloader.md | 47 ++++++++++++++++++++-------------------- dataloader/dataloader.q | 28 +++++++++++++----------- dataloader/init.q | 2 +- dataloader/test.csv | 2 +- dataloader/util.q | 16 +++++--------- 5 files changed, 46 insertions(+), 49 deletions(-) diff --git a/dataloader/dataloader.md b/dataloader/dataloader.md index b2d2029..05ec50b 100644 --- a/dataloader/dataloader.md +++ b/dataloader/dataloader.md @@ -28,37 +28,42 @@ When all the data is written, the on-disk data is re-sorted and the attributes a ## :gear: Initialisation -After loading the package into the session the unary function `init` is called with a single argument to initialise the packages global variables and define any configurable sorting/attributes. +After loading the package into the session, the `loadallfiles` function is ready to run. By default, tables will be written with the `p` attribute applied to the `sym` column and sorted. -### :mag_right: Params in Depth +### :mag_right: Custom sorting parameters + +By default, the sorting paramters for all tables are: +``` +tabname att column sort +----------------------- +default p sym 1 +``` + +That is, for every table the `p` attribute will be applied to the `sym` column and sorted. If the table being loaded requires different attributes applied on different columns, custom sorting parameters can be added using the `addsortparams` function. This takes 4 inputs: tabname, att, column, and sort. These arguments are used to determine how the tables in the resulting database should be sorted and where attributes applied when being persisted. Furthermore, this will add (or update existing) parameters for the specified table. 
+ +You may apply default sorting and attributes to all tables loaded in by the package by passing in the `tabname` with a value of `default` and specifying your default sorting and attribute parameters. By passing in `default` this will overwrite the current default paramters. -The sole argument to `init` should be a dictionary with the keys `tabname, att, column and sort`. The argument is used to determine how the tables in the resulting database should be sorted and where attributes applied when being persisted. -You may apply default sorting and attributes to all tables loaded in by the package by passing in the `tabname` with a value of `default` and specifying your default sorting and attribute parameters, -or you can apply specific sorting and attribute configurations to tables when loading them into the database including determining where they should not be applied by using a specific table name. If no sorting or attributes are required pass in the dictionary with a `tabname` with `default`, `att` and `column` with backticks and `sort` with `0b`, examples shown below: ```q -init[`tabname`att`column`sort!(`default;`;`;0b)] // Apply no sorting or attributes -init[`tabname`att`column`sort!(`default;`p;`sym;1b)] // Sort all tables loaded in by the sym column and apply the parted attribute -init[`tabname`att`column`sort!(`default`trade`quote;`p`s`;`sym`time`;110b)] // Apply default to all tables, however, sort trade by sym and apply `p and if quote is read in by the function then do not sort or apply attributes +dataloader:use`dataloader +dataloader.addsortparams[`tabname`att`column`sort!(`default;`;`;0b)] / Overwrite default to apply no sorting or attributes +dataloader.addsortparams[`tabname`att`column`sort!(`default;`p;`sym;1b)] / Overwrite default to sort all tables loaded in by the sym column and apply the parted attribute +dataloader.addsortparams[`tabname`att`column`sort!(`default`trade`quote;`p`s`;`sym`time`;110b)] / Apply default to all tables, however, 
sort trade by sym and apply `p and if quote is read in by the function then do not sort or apply attributes
```
The dictionary arguments are outlined below.
-| Key | Values | Description |
+| Input | Type | Description |
|-----------|-----------------------|----------------------------------------------------------------------------------|
| `tabname` | symbol/ symbol list | Name of table |
| `att` | symbol/ symbol list | Attributes corresponding to the table names |
| `column` | symbol/ symbol list | Columns to sort and apply attributes to |
| `sort` | boolean /boolean list | Determines if the corresponding table will be sorted (1b: sorted; 0b:not sorted) |
-
---
### :rocket: Functions
-`loadallfiles` is the primary function used to load in all data and create the database.
-The function takes two arguments, a dictionary of loading parameters and a directory containing files to read.
-The function reads in all specified delimited files into memory from a chosen directory then proceeds to apply any required processing,
-persists the table to disk in a kdb+ partitioned format, compresses the files if directed and finally sorting and applying attributes.
+`loadallfiles` is the primary function used to load in all data and create the database. The function takes two arguments, a dictionary of loading parameters and a directory containing files to read. The function reads in all specified delimited files into memory from a chosen directory then proceeds to apply any required processing, persists the table to disk in a kdb+ partitioned format, compresses the files if directed and finally sorts and applies attributes. 
## :mag_right: Params in depth @@ -91,22 +96,20 @@ The second parameter is a directory handle .e.g ### :test_tube: Example ```q -\l dataloader.q - -// Initialise the package -init[`tabname`att`column`sort!(`default;`p;`sym;1b)] +dataloader:use`dataloader -// Check table exists -select from sortparams +// If using custom sorting parameters, check they are as expected +dataloader.sortparams[] tabname att column sort ----------------------- default p sym 1 // Read in data and create db -loadallfiles[`headers`types`separator`tablename`dbdir!(`sym`time`price`volume`mktflag`cond`exclude;"SPFICHB";",";`trade;`:hdb); `:TRADE/toload] +dataloader.loadallfiles[`headers`types`separator`tablename`dbdir!(`sym`time`price`volume`mktflag`cond`exclude;"SPFICHB";",";`trade;`:hdb);`:TRADE/toload] //load in db \l hdb + //check table and sorting select from trade @@ -122,7 +125,6 @@ date sym time price volume mktflag cond exclude // Ensure attributes are applied meta trade - c | t f a -------| ----- date | d @@ -133,5 +135,4 @@ volume | i mktflag| c cond | h exclude| b - ``` diff --git a/dataloader/dataloader.q b/dataloader/dataloader.q index e870b1a..1957d2c 100644 --- a/dataloader/dataloader.q +++ b/dataloader/dataloader.q @@ -1,8 +1,8 @@ / generic dataloader library +/ loads data in from delimited file, applies processing function, enumerates and writes to db +/ NOTE: it is not trivial to check user has inputted headers correctly, assume they have loaddata:{[loadparams;rawdata] - / loads data in from delimited file, applies processing function, enumerates and writes to db - / NOTE: it is not trivial to check user has inputted headers correctly, assume they have data:$[(`$"," vs rawdata 0)~loadparams`headers; / check if first row matches headers provided (loadparams`types`separator)0:rawdata; / if so, read in the files normally flip loadparams[`headers]!(loadparams`types`separator)0:rawdata / if not, add the headers manually @@ -16,37 +16,39 @@ loaddata:{[loadparams;rawdata] 
if[loadparams`gc;.Q.gc[]]; }; +/ write data for provdided database and partition writedatapartition:{[data;dbdir;partitiontype;partitioncol;tablename;partition] - / write data for provdided database and partition towrite:data where partition=partitiontype$data partitioncol; writepath:` sv .Q.par[dbdir;partition;tablename],`; .[upsert;(writepath;towrite);{'"failed to save table: ",x}]; .z.m.partitions[writepath]:(tablename;partition); }; +/ adds compression, sorting and attributes selected finish:{[loadparams] - / adds compression, sorting and attributes selected if[count loadparams`compression;.z.zd:loadparams`compression]; / temporarily set compression defaults - {.z.m.util.sorttab(x;where .z.m.partitions[;0]=x)}each distinct value .z.m.partitions[;0]; + {.z.m.util.sorttab[.z.m.sp](x;where .z.m.partitions[;0]=x)}each distinct value .z.m.partitions[;0]; system"x .z.zd"; if[loadparams`gc;.Q.gc[]]; }; +/ load all the files from a specified directory loadallfiles:{[loadparams:.z.m.util.paramfilter;dir] - / load all the files from a specified directory - .z.m.partitions::()!(); + .z.m.partitions:()!(); .z.m.filesread:(); filelist:$[`filepattern in key loadparams; key[dir:hsym dir]where key[dir]like first loadparams`filepattern; key dir:hsym dir]; / get the contents of the directory based on optional filepattern filelist:` sv'dir,'filelist; - {[loadparams;file].Q.fsn[loaddata[loadparams,(enlist`filename)!enlist file];file;loadparams`chunksize]}[loadparams]each filelist; + {[loadparams;file].Q.fsn[loaddata loadparams,(enlist`filename)!enlist file;file;loadparams`chunksize]}[loadparams]each filelist; .z.m.finish loadparams; }; -init:{[sortparams:.z.m.util.sortfilter] - / package init function - .z.m.partitions:()!(); / maintain a dictionary of the db partitions written to by loader - .z.m.filesread:(); / maintain a list of files which have been read - .z.m.sortparams:sortparams; +sp:flip`tabname`att`column`sort!(1#`default;`p;`sym;1b); +sortparams:{[].z.m.sp}; + +/ add 
custom sorting parameters to the sortparams table +addsortparams:{[tabname;att;column;sort] + x:flip(flip sortparams[]),'(tabname;att;column;sort); + .z.m.sp:select from x where i=(last;i)fby tabname; }; diff --git a/dataloader/init.q b/dataloader/init.q index 8b1f3b5..c5cd5f0 100644 --- a/dataloader/init.q +++ b/dataloader/init.q @@ -3,4 +3,4 @@ // load util submodule util:use`dataloader.util // expose public function -export:([init:init;loadallfiles:loadallfiles]) +export:([loadallfiles:loadallfiles;addsortparams:addsortparams;sortparams:sortparams]) diff --git a/dataloader/test.csv b/dataloader/test.csv index c5f960a..9c967dc 100644 --- a/dataloader/test.csv +++ b/dataloader/test.csv @@ -17,7 +17,7 @@ run,0,0,q,"(` sv filespath,`tradedaily.csv)0:csv 0:tradedailytab",1,,Save traded run,0,0,q,"(` sv filespath,`quotenh.csv)0:1_csv 0:quotetab",1,,Save quote csv without headers run,0,0,q,"(` sv filespath,`quote.csv)0:csv 0:quotetab",1,,Save quote csv with headers -run,0,0,q,dataloader.init[`tabname`att`column`sort!(`default`quote`tradedaily;`p`s`;`sym`time`;110b)],1,,Init package with defaults for all tables +run,0,0,q,dataloader.addsortparams[`quote`tradedaily;`s`;`time`;10b],1,,Add custom sort parameters run,0,0,q,dataloader.loadallfiles[`headers`types`separator`tablename`dbdir`filepattern!(`time`sym`price`size`side`exchange;"PSFISS";csv;`trade;hdbpath;"tradenh.csv");filespath],1,,Test load of trade file with no headers run,0,0,q,\l test/data/hdb,1,,Load database diff --git a/dataloader/util.q b/dataloader/util.q index e5b0091..6b8eb00 100644 --- a/dataloader/util.q +++ b/dataloader/util.q @@ -7,18 +7,18 @@ applyattr:{[dloc;colname;att] }; // Function used to sort and apply attributes to tables on disk based on format provided at initialisation of package. 
-sorttab:{[d] - if[1>sum exec sort from .m.dataloader.sortparams;:()]; - sp:$[count tabparams:select from .m.dataloader.sortparams where tabname=d[0]; +sorttab:{[sortparams;d] + if[1>sum exec sort from sortparams;:()]; + sp:$[count tabparams:select from sortparams where tabname=d[0]; tabparams; - count defaultparams:select from .m.dataloader.sortparams where tabname=`default; + count defaultparams:select from sortparams where tabname=`default; defaultparams ]; {[sp;dloc] / Loop through each directory and sort the data if[count sortcols:exec column from sp where sort,not null column; .[xasc;(sortcols;dloc);{[sortcols;dloc;e]'"failed to sort ",string[dloc]," by these columns : ",(", " sv string sortcols),". The error was: ",e}[sortcols;dloc]]]; if[count attrcols:select column,att from sp where not null att; - applyattr[dloc;;]'[attrcols`column;attrcols`att]]; / Apply attribute(s) + applyattr[dloc]'[attrcols`column;attrcols`att]]; / Apply attribute(s) }[sp]each distinct(),last d; }; @@ -43,10 +43,4 @@ paramfilter:{[loadparams] loadparams }; -// Function checks dictionary argument for init function has correct headers and types -sortfilter:{[sortparams] - if[not 99h=type sortparams;'"init requires a dictionary parameter"]; - flip(),/:sortparams - }; - export:.z.m; From b822cc7105f0ffe73a63e5401eb92c1d3e107060 Mon Sep 17 00:00:00 2001 From: Eliot Robinson Date: Wed, 29 Oct 2025 11:26:18 +0000 Subject: [PATCH 16/17] fix comments --- dataloader/dataloader.q | 16 +++++----------- dataloader/init.q | 9 +++------ dataloader/util.q | 6 ++---- 3 files changed, 10 insertions(+), 21 deletions(-) diff --git a/dataloader/dataloader.q b/dataloader/dataloader.q index 1957d2c..c093cc0 100644 --- a/dataloader/dataloader.q +++ b/dataloader/dataloader.q @@ -1,8 +1,6 @@ / generic dataloader library -/ loads data in from delimited file, applies processing function, enumerates and writes to db -/ NOTE: it is not trivial to check user has inputted headers correctly, assume they have 
-loaddata:{[loadparams;rawdata] +loaddata:{[loadparams;rawdata] / loads data in from delimited file, applies processing function, enumerates and writes to db. NOTE: it is not trivial to check user has inputted headers correctly, assume they have data:$[(`$"," vs rawdata 0)~loadparams`headers; / check if first row matches headers provided (loadparams`types`separator)0:rawdata; / if so, read in the files normally flip loadparams[`headers]!(loadparams`types`separator)0:rawdata / if not, add the headers manually @@ -16,24 +14,21 @@ loaddata:{[loadparams;rawdata] if[loadparams`gc;.Q.gc[]]; }; -/ write data for provdided database and partition -writedatapartition:{[data;dbdir;partitiontype;partitioncol;tablename;partition] +writedatapartition:{[data;dbdir;partitiontype;partitioncol;tablename;partition] / write data for provdided database and partition towrite:data where partition=partitiontype$data partitioncol; writepath:` sv .Q.par[dbdir;partition;tablename],`; .[upsert;(writepath;towrite);{'"failed to save table: ",x}]; .z.m.partitions[writepath]:(tablename;partition); }; -/ adds compression, sorting and attributes selected -finish:{[loadparams] +finish:{[loadparams] / adds compression, sorting and attributes selected if[count loadparams`compression;.z.zd:loadparams`compression]; / temporarily set compression defaults {.z.m.util.sorttab[.z.m.sp](x;where .z.m.partitions[;0]=x)}each distinct value .z.m.partitions[;0]; system"x .z.zd"; if[loadparams`gc;.Q.gc[]]; }; -/ load all the files from a specified directory -loadallfiles:{[loadparams:.z.m.util.paramfilter;dir] +loadallfiles:{[loadparams:.z.m.util.paramfilter;dir] / load all the files from a specified directory .z.m.partitions:()!(); .z.m.filesread:(); filelist:$[`filepattern in key loadparams; @@ -47,8 +42,7 @@ loadallfiles:{[loadparams:.z.m.util.paramfilter;dir] sp:flip`tabname`att`column`sort!(1#`default;`p;`sym;1b); sortparams:{[].z.m.sp}; -/ add custom sorting parameters to the sortparams table 
-addsortparams:{[tabname;att;column;sort] +addsortparams:{[tabname;att;column;sort] / add custom sorting parameters to the sortparams table x:flip(flip sortparams[]),'(tabname;att;column;sort); .z.m.sp:select from x where i=(last;i)fby tabname; }; diff --git a/dataloader/init.q b/dataloader/init.q index c5cd5f0..7c10564 100644 --- a/dataloader/init.q +++ b/dataloader/init.q @@ -1,6 +1,3 @@ -// load core dataloader functions -\l ::dataloader.q -// load util submodule -util:use`dataloader.util -// expose public function -export:([loadallfiles:loadallfiles;addsortparams:addsortparams;sortparams:sortparams]) +\l ::dataloader.q / load core dataloader functions +util:use`dataloader.util / load util submodule +export:([loadallfiles:loadallfiles;addsortparams:addsortparams;sortparams:sortparams]) / expose public function diff --git a/dataloader/util.q b/dataloader/util.q index 6b8eb00..51c64b3 100644 --- a/dataloader/util.q +++ b/dataloader/util.q @@ -6,8 +6,7 @@ applyattr:{[dloc;colname;att] ] }; -// Function used to sort and apply attributes to tables on disk based on format provided at initialisation of package. -sorttab:{[sortparams;d] +sorttab:{[sortparams;d] / Function used to sort and apply attributes to tables on disk based on format provided at initialisation of package. 
if[1>sum exec sort from sortparams;:()]; sp:$[count tabparams:select from sortparams where tabname=d[0]; tabparams; @@ -22,8 +21,7 @@ sorttab:{[sortparams;d] }[sp]each distinct(),last d; }; -// Function checks keys are correct and value have the right types for loadallfiles argument -paramfilter:{[loadparams] +paramfilter:{[loadparams] / Function checks keys are correct and value have the right types for loadallfiles argument if[not 99h=type loadparams;'"loadallfiles requires a dictionary parameter"]; / Check the input req:`headers`types`tablename`dbdir`separator; / Required fields if[not all req in key loadparams; From 034a01f8895bbe3b6eed2877c67535d4e382324e Mon Sep 17 00:00:00 2001 From: Eliot Robinson Date: Wed, 29 Oct 2025 11:41:13 +0000 Subject: [PATCH 17/17] fix comments again... --- dataloader/dataloader.q | 43 ++++++++++++++++++++++++++--------------- dataloader/init.q | 9 ++++++--- dataloader/util.q | 26 ++++++++++++++----------- 3 files changed, 48 insertions(+), 30 deletions(-) diff --git a/dataloader/dataloader.q b/dataloader/dataloader.q index c093cc0..903c970 100644 --- a/dataloader/dataloader.q +++ b/dataloader/dataloader.q @@ -1,48 +1,59 @@ / generic dataloader library -loaddata:{[loadparams;rawdata] / loads data in from delimited file, applies processing function, enumerates and writes to db. NOTE: it is not trivial to check user has inputted headers correctly, assume they have - data:$[(`$"," vs rawdata 0)~loadparams`headers; / check if first row matches headers provided - (loadparams`types`separator)0:rawdata; / if so, read in the files normally - flip loadparams[`headers]!(loadparams`types`separator)0:rawdata / if not, add the headers manually +/ loads data in from delimited file, applies processing function, enumerates and writes to db. 
NOTE: it is not trivial to check user has inputted headers correctly, assume they have +loaddata:{[loadparams;rawdata] + / check if first row matches headers provided + data:$[(`$"," vs rawdata 0)~loadparams`headers; + (loadparams`types`separator)0:rawdata; + flip loadparams[`headers]!(loadparams`types`separator)0:rawdata ]; - if[not loadparams[`filename]in filesread;filesread,:loadparams`filename]; / if we havent read this file before, add it to filesread - data:0!loadparams[`dataprocessfunc].(loadparams;data); / apply the user provided processing function to the data - domain:(`sym;loadparams`enumname)`enumname in key loadparams; / if enumname provided, use it, otherwise default to `sym - data:.Q.ens[loadparams(`dbdir`symdir)`symdir in key loadparams;data;domain]; / enumerate sym columns to given domain - wd:writedatapartition[data]. loadparams`dbdir`partitiontype`partitioncol`tablename; / create write down function using all the params - wd each distinct loadparams[`partitiontype]$data loadparams`partitioncol; / run the writedatapartition function for each partition + if[not loadparams[`filename]in filesread;filesread,:loadparams`filename]; + data:0!loadparams[`dataprocessfunc].(loadparams;data); + / if enumname provided, use it, otherwise default to `sym + domain:(`sym;loadparams`enumname)`enumname in key loadparams; + data:.Q.ens[loadparams(`dbdir`symdir)`symdir in key loadparams;data;domain]; + wd:writedatapartition[data]. 
loadparams`dbdir`partitiontype`partitioncol`tablename;
+ / run the writedatapartition function for each partition
+ wd each distinct loadparams[`partitiontype]$data loadparams`partitioncol;
 if[loadparams`gc;.Q.gc[]];
 };
 
-writedatapartition:{[data;dbdir;partitiontype;partitioncol;tablename;partition] / write data for provdided database and partition
+/ write data for provided database and partition
+writedatapartition:{[data;dbdir;partitiontype;partitioncol;tablename;partition]
 towrite:data where partition=partitiontype$data partitioncol;
 writepath:` sv .Q.par[dbdir;partition;tablename],`;
 .[upsert;(writepath;towrite);{'"failed to save table: ",x}];
 .z.m.partitions[writepath]:(tablename;partition);
 };
 
-finish:{[loadparams] / adds compression, sorting and attributes selected
- if[count loadparams`compression;.z.zd:loadparams`compression]; / temporarily set compression defaults
+/ adds compression, sorting and attributes selected
+finish:{[loadparams]
+ / temporarily set compression defaults
+ if[count loadparams`compression;.z.zd:loadparams`compression];
 {.z.m.util.sorttab[.z.m.sp](x;where .z.m.partitions[;0]=x)}each distinct value .z.m.partitions[;0];
 system"x .z.zd";
 if[loadparams`gc;.Q.gc[]];
 };
 
-loadallfiles:{[loadparams:.z.m.util.paramfilter;dir] / load all the files from a specified directory
+/ load all the files from a specified directory
+loadallfiles:{[loadparams:.z.m.util.paramfilter;dir]
 .z.m.partitions:()!();
 .z.m.filesread:();
+ / get the contents of the directory based on optional filepattern
 filelist:$[`filepattern in key loadparams;
 key[dir:hsym dir]where key[dir]like first loadparams`filepattern;
- key dir:hsym dir];
+ key dir:hsym dir];
 filelist:` sv'dir,'filelist;
 {[loadparams;file].Q.fsn[loaddata loadparams,(enlist`filename)!enlist file;file;loadparams`chunksize]}[loadparams]each filelist;
 .z.m.finish loadparams;
 };
 
+/ set default sorting parameters 
sp:flip`tabname`att`column`sort!(1#`default;`p;`sym;1b); sortparams:{[].z.m.sp}; -addsortparams:{[tabname;att;column;sort] / add custom sorting parameters to the sortparams table +/ add custom sorting parameters to the sortparams table +addsortparams:{[tabname;att;column;sort] x:flip(flip sortparams[]),'(tabname;att;column;sort); .z.m.sp:select from x where i=(last;i)fby tabname; }; diff --git a/dataloader/init.q b/dataloader/init.q index 7c10564..7cd5af9 100644 --- a/dataloader/init.q +++ b/dataloader/init.q @@ -1,3 +1,6 @@ -\l ::dataloader.q / load core dataloader functions -util:use`dataloader.util / load util submodule -export:([loadallfiles:loadallfiles;addsortparams:addsortparams;sortparams:sortparams]) / expose public function +/ load core dataloader functions +\l ::dataloader.q +/ load util submodule +util:use`dataloader.util +/ expose public function +export:([loadallfiles:loadallfiles;addsortparams:addsortparams;sortparams:sortparams]) diff --git a/dataloader/util.q b/dataloader/util.q index 51c64b3..c382e21 100644 --- a/dataloader/util.q +++ b/dataloader/util.q @@ -1,42 +1,46 @@ +/ function to apply attribute to a column applyattr:{[dloc;colname;att] - .[{@[x;y;z#]};(dloc;colname;att); / Attempt to apply the attribute to the column + .[{@[x;y;z#]};(dloc;colname;att); {[dloc;colname;att;e] '"unable to apply ",string[att]," attr to the ",string[colname]," column in the this directory : ",string[dloc],". The error was : ",e; }[dloc;colname;att] ] }; -sorttab:{[sortparams;d] / Function used to sort and apply attributes to tables on disk based on format provided at initialisation of package. 
+/ function used to sort and apply attributes to tables on disk based on format provided at initialisation of package +sorttab:{[sortparams;d] if[1>sum exec sort from sortparams;:()]; sp:$[count tabparams:select from sortparams where tabname=d[0]; tabparams; count defaultparams:select from sortparams where tabname=`default; defaultparams ]; - {[sp;dloc] / Loop through each directory and sort the data + / loop through each directory and sort the data + {[sp;dloc] if[count sortcols:exec column from sp where sort,not null column; .[xasc;(sortcols;dloc);{[sortcols;dloc;e]'"failed to sort ",string[dloc]," by these columns : ",(", " sv string sortcols),". The error was: ",e}[sortcols;dloc]]]; if[count attrcols:select column,att from sp where not null att; - applyattr[dloc]'[attrcols`column;attrcols`att]]; / Apply attribute(s) + applyattr[dloc]'[attrcols`column;attrcols`att]]; }[sp]each distinct(),last d; }; -paramfilter:{[loadparams] / Function checks keys are correct and value have the right types for loadallfiles argument - if[not 99h=type loadparams;'"loadallfiles requires a dictionary parameter"]; / Check the input - req:`headers`types`tablename`dbdir`separator; / Required fields +/ function checks keys are correct and value have the right types for loadallfiles argument +paramfilter:{[loadparams] + if[not 99h=type loadparams;'"loadallfiles requires a dictionary parameter"]; + req:`headers`types`tablename`dbdir`separator; if[not all req in key loadparams; '"loaddata requires a dictionary parameter with keys of ",(", " sv string req)," : missing ",", " sv string req except key loadparams]; if[not count loadparams`symdir;loadparams[`symdir]:loadparams`dbdir]; - loadparams:(`dataprocessfunc`chunksize`partitioncol`partitiontype`compression`gc!({[x;y] y};`int$100*2 xexp 20;`time;`date;();0b)),loadparams; / Join the loadparams with some default values - reqtypes:`headers`types`tablename`dbdir`symdir`chunksize`partitioncol`partitiontype`gc!11 10 -11 -11 -11 -6 -11 -11 -1h; 
/ Required types n - if[count w:where not(type each loadparams key reqtypes)=reqtypes; / Check the types + loadparams:(`dataprocessfunc`chunksize`partitioncol`partitiontype`compression`gc!({[x;y] y};`int$100*2 xexp 20;`time;`date;();0b)),loadparams; + reqtypes:`headers`types`tablename`dbdir`symdir`chunksize`partitioncol`partitiontype`gc!11 10 -11 -11 -11 -6 -11 -11 -1h; + if[count w:where not(type each loadparams key reqtypes)=reqtypes; '"incorrect types supplied for ",(", " sv string w)," parameter(s). Required type(s) are ",", " sv string reqtypes w]; if[not 10h=abs type loadparams`separator;'"separator must be a character or enlisted character"]; if[not 99h