diff --git a/code/common/merge.q b/code/common/merge.q index 13f71f9c3..f309d7bc8 100644 --- a/code/common/merge.q +++ b/code/common/merge.q @@ -27,15 +27,15 @@ checkpartitiontype:{[tablename;extrapartitiontype] .lg.o[`checkpart;"all parted columns defined in sort.csv are present in ",(string tablename)," table"]]; }; -/- function to check if the extra partition column has a symbol type -checksymboltype:{[tablename;extrapartitiontype] - $[all extrapartitiontype in exec c from meta[tablename] where t="s"; - .lg.o[`checksymbol;"all columns do have a symbol type in ",(string tablename)," table"]; - .lg.e[`checksymbol;"not all columns ",string[extrapartitiontype]," do have a symbol type in ",(string tablename)," table"]]; +/- function to check if the extra partition column has an enumerable type +checkenumerabletype:{[tablename;extrapartitiontype] + $[all extrapartitiontype in exec c from meta[tablename] where t in "hijs"; + .lg.o[`checkenumerable;"all columns do have an enumerable type in ",(string tablename)," table"]; + .lg.e[`checkenumerable;"not all columns ",string[extrapartitiontype]," do have an enumerable type in ",(string tablename)," table"]]; }; -/- function to get list of distinct combiniations for partition directories +/- function to get list of distinct combinations for partition directories /- functional select equivalent to: select distinct [ extrapartitiontype ] from [ tablenme ] getextrapartitions:{[tablename;extrapartitiontype] value each ?[tablename;();1b;extrapartitiontype!extrapartitiontype] diff --git a/code/processes/idb.q b/code/processes/idb.q index d07b6f275..b566da0bc 100644 --- a/code/processes/idb.q +++ b/code/processes/idb.q @@ -106,6 +106,10 @@ reload:.idb.intradayreload; .idb.init[]; /- helper function to support queries against the sym column -maptoint:{[symbol] - sym?`TORQNULLSYMBOL^symbol +maptoint:{[val] + $[(abs type val) in 5 6 7h; + /- if using an integer column, clamp value between 0 and max int (null maps to 0) + 0| 2147483647& `long$ val; + /- if using a symbol column, enumerate against the hdb sym file + sym?`TORQNULLSYMBOL^val] }; diff --git a/code/processes/tickerlogreplay.q b/code/processes/tickerlogreplay.q index b56a0f80f..298ad6730 100644 --- a/code/processes/tickerlogreplay.q +++ b/code/processes/tickerlogreplay.q @@ -31,7 +31,7 @@ tempdir:@[value;`tempdir;`:tempmergedir]; // locat mergenumrows:@[value;`mergenumrows;10000000]; // default number of rows for merge process mergenumtab:@[value;`mergenumtab;`quote`trade!10000 50000]; // specify number of rows per table for merge process mergenumbytes:@[value;`mergenumbytes;500000000]; // default number of bytes for merge process -mergemethod:@[value;`mergemethod;`part]; // the partbyattr writdown mode can merge data from tenmporary storage to the hdb in three ways: +mergemethod:@[value;`mergemethod;`part]; // the partbyattr writedown mode can merge data from temporary storage to the hdb in three ways: // 1. part - the entire partition is merged to the hdb // 2. col - each column in the temporary partitions are merged individually // 3. hybrid - partitions merged by column or entire partittion based on byte limit diff --git a/code/processes/wdb.q b/code/processes/wdb.q index 4c5e98f1a..65e18c7a8 100644 --- a/code/processes/wdb.q +++ b/code/processes/wdb.q @@ -27,12 +27,11 @@ writedownmode:@[value;`writedownmode;`default]; /-the /- 1. default - the data is partitioned by [ partitiontype ] /- at EOD the data will be sorted and given attributes according to sort.csv before being moved to hdb /- 2. partbyattr - the data is partitioned by [ partitiontype ] and the column(s) assigned the parted attributed in sort.csv - /- at EOD the data will be merged from each partiton before being moved to hdb - /- 3. partbyenum - the data is partitioned by [ partitiontype ] and a symbol column with parted attribution assigned in sort.csv - /- at EOD the data will be merged from each partiton before being moved to hdb -enumcol:@[value;`enumcol;`sym]; /-symbol column to enumerate by. Only used with writedownmode: partbyenum. + /- at EOD the data will be merged from each partition before being moved to hdb + /- 3. partbyenum - the data is partitioned by [ partitiontype ] and a symbol or integer column with parted attribution assigned in sort.csv + /- at EOD the data will be merged from each partition before being moved to hdb -mergemode:@[value;`mergemode;`part]; /-the partbyattr writdown mode can merge data from tenmporary storage to the hdb in three ways: +mergemode:@[value;`mergemode;`part]; /-the partbyattr writedown mode can merge data from temporary storage to the hdb in three ways: /- 1. part - the entire partition is merged to the hdb /- 2. col - each column in the temporary partitions are merged individually /- 3. hybrid - partitions merged by column or entire partittion based on byte limit @@ -117,12 +116,20 @@ tablelist:{[] sortedlist:exec tablename from `bytes xdesc tabsizes; /- function that ensures a list of syms is returned no matter what is passed to it ensuresymlist:{[s] -1 _ `${@[x; where not ((type each x) in (10 -10h));string]} s,(::)} +/- vectorized function to map partition value(s) to int partition(s) in partbyenum mode +maptoint:{[val] + $[(abs type val) in 5 6 7h; + /- if using an integer column, clamp value between 0 and max int (null maps to 0) + 0| 2147483647& `long$ val; + /- if using a symbol column, enumerate against the hdb sym file + `long$ (` sv hdbsettings[`hdbdir],`sym)?`TORQNULLSYMBOL^val] + }; + /- function to upsert to specified directory upserttopartition:{[dir;tablename;tabdata;pt;expttype;expt;writedownmode] - /- enumerate current extra partition against the hdb sym file - /- if extra partition is null, send to a partition enumerated against `TORQNULLSYMBOL symbol - if[writedownmode~`partbyenum;i:`long$(` sv hdbsettings[`hdbdir],`sym)? first[`TORQNULLSYMBOL^ ensuresymlist[expt]]]; - /- create directory location for selected partiton + /- enumerate first extra partition value + if[writedownmode~`partbyenum;i:maptoint first expt]; + /- create directory location for selected partition /- replace non-alphanumeric characters in symbols with _ /- convert to symbols and replace any null values with `TORQNULLSYMBOL directory:$[writedownmode~`partbyenum; @@ -150,9 +157,9 @@ savetablesbypart:{[dir;pt;forcesave;tablename;writedownmode] ]; /- check each partition type actually is a column in the selected table .merge.checkpartitiontype[tablename;extrapartitiontype]; - /- check if provided symbol column extrapartitiontype indeed has a symbol type in table - if[writedownmode~`partbyenum;.merge.checksymboltype[tablename;extrapartitiontype]]; - /- get list of distinct combiniations for partition directories + /- check if provided column extrapartitiontype indeed has an enumerable type in table + if[writedownmode~`partbyenum;.merge.checkenumerabletype[tablename;extrapartitiontype]]; + /- get list of distinct combinations for partition directories extrapartitions:.merge.getextrapartitions[tablename;extrapartitiontype]; /- enumerate data to be upserted enumdata:.Q.en[hdbsettings[`hdbdir];0!.save.manipulate[tablename;`. tablename]]; @@ -191,7 +198,7 @@ endofday:{[pt;processdata] /- set what type of merge method to be used mergemethod:mergemode; /- create a dictionary of tables and merge limits, byte or row count limit depending on settings - .lg.o[`merge;"merging partitons by ",$[.merge.mergebybytelimit;"byte estimate";"row count"]," limit"]; + .lg.o[`merge;"merging partitions by ",$[.merge.mergebybytelimit;"byte estimate";"row count"]," limit"]; mergelimits:(tablelist[],())!($[.merge.mergebybytelimit;{(count x)#mergenumbytes};{[x] mergenumrows^mergemaxrows[x]}]tablelist[]),(); tablist:tablelist[]!{0#value x} each tablelist[]; / Need to download sym file to scratch directory if this is Finspace application @@ -273,7 +280,7 @@ resetcompression:{setcompression 16 0 0 } //check if the hdb directory contains current partition //if yes check if patition is empty and if it is not see if any of the tables exist in both the -//temporary parition and the hdb partition. If there is a clash abort operation otherwise copy +//temporary partition and the hdb partition. If there is a clash abort operation otherwise copy //each table to the hdb partition movetohdb:{[dw;hw;pt] $[not(`$string pt)in key hsym`$-10 _ hw; @@ -376,7 +383,7 @@ removetablefromenumdir:{[partdir] }; endofdaymerge:{[dir;pt;tablist;mergelimits;hdbsettings;mergemethod;writedownmode] - /- merge data from partitons + /- merge data from partitions /- .z.pd funciton in finspace will cause an error. Add in this check to skip over the use of .z.pd. This should be temporary and will be removed when issue resolved by AWS. tempfix2:$[.finspace.enabled;0b;(0 < count .z.pd[])]; $[tempfix2 and ((system "s")<0); @@ -513,14 +520,14 @@ subscribe:{[] fixpartition:{[subto] /- check if the tp logdate matches current date if[not (tplogdate:subto[`tplogdate])~orig:currentpartition; - .lg.o[`fixpartition;"Current partiton date does not match the ticker plant log date"]; - /- set the current partiton date to the log date + .lg.o[`fixpartition;"Current partition date does not match the ticker plant log date"]; + /- set the current partition date to the log date currentpartition::tplogdate; /- move the data that has been written to correct partition pth1:.os.pth[-1 _ string .Q.par[savedir;orig;`]]; pth2:.os.pth[-1 _ string .Q.par[savedir;tplogdate;`]]; if[not ()~key hsym `$.os.pthq pth1; - /- delete any data in the current partiton directory + /- delete any data in the current partition directory clearwdbdata[]; .lg.o[`fixpartition;"Moving data from partition ",(.os.pthq pth1) ," to partition ",.os.pthq pth2]; .[.os.ren;(pth1;pth2);{.lg.e[`fixpartition;"Failed to move data from wdb partition ",x," to wdb partition ",y," : ",z]}[pth1;pth2]]]; diff --git a/code/wdb/origstartup.q b/code/wdb/origstartup.q index ee6b3a188..abcd269ee 100644 --- a/code/wdb/origstartup.q +++ b/code/wdb/origstartup.q @@ -6,7 +6,7 @@ startup:{[] $[writedownmode~`partbyattr; .lg.o[`init; "partition has been set to [savedir]/[",(string partitiontype),"]/[tablename]/[parted column(s)]/"]; writedownmode~`partbyenum; - .lg.o[`init; "partition has been set to [savedir]/[",(string partitiontype),"]/[parted symbol column enumerated]/[tablename]/"]; + .lg.o[`init; "partition has been set to [savedir]/[",(string partitiontype),"]/[parted column enumerated]/[tablename]/"]; .lg.o[`init; "partition has been set to [savedir]/[",(string partitiontype),"]/[tablename]/"]]; if[saveenabled; //check if tickerplant is available and if not exit with error diff --git a/code/wdb/writedown.q b/code/wdb/writedown.q index e357b6606..d7ab15589 100644 --- a/code/wdb/writedown.q +++ b/code/wdb/writedown.q @@ -20,7 +20,7 @@ getpartition:@[value;`getpartition; /-fun {{@[value;`.wdb.currentpartition; (`date^partitiontype)$.proc.cd[]]}}]; -currentpartition:.wdb.getpartition[]; /- Initialise current partiton +currentpartition:.wdb.getpartition[]; /- Initialise current partition tabsizes:([tablename:`symbol$()] rowcount:`long$(); bytes:`long$()); /- keyed table to track the size of tables on disk diff --git a/config/settings/wdb.q b/config/settings/wdb.q index 8a9b2a122..f4461aadf 100644 --- a/config/settings/wdb.q +++ b/config/settings/wdb.q @@ -30,15 +30,14 @@ mode:`save // data on disk, apply attributes and the trigger a reload on the // rdb and hdb processes writedownmode:`default // the wdb process can periodically write data to disc and sort at EOD in two ways: - // 1. default - the data is partitioned by [ partitontype ] + // 1. default - the data is partitioned by [ partitiontype ] // at EOD the data will be sorted and given attributes according to sort.csv before being moved to hdb // 2. partbyattr - the data is partitioned by [ partitiontype ] and the column(s)assigned the parted attributed in sort.csv - // at EOD the data will be merged from each partiton before being moved to hdb - // 3. partbyenum - the data is partitioned by [ partitiontype ] and a symbol column with parted attrobution assigned in sort.csv - // at EOD the data will be merged from each partiton before being moved to hdb -enumcol:`sym; // default column for partitioning. Only used with writedownmode: partbyenum. + // at EOD the data will be merged from each partition before being moved to hdb + // 3. partbyenum - the data is partitioned by [ partitiontype ] and a symbol or integer column with parted attribution assigned in sort.csv + // at EOD the data will be merged from each partition before being moved to hdb -mergemode:`part // the partbyattr writdown mode can merge data from tenmporary storage to the hdb in three ways: +mergemode:`part // the partbyattr writedown mode can merge data from temporary storage to the hdb in three ways: // 1. part - the entire partition is merged to the hdb // 2. col - each column in the temporary partitions are merged individually // 3. hybrid - partitions merged by column or entire partittion based on byte limit diff --git a/docs/Processes.md b/docs/Processes.md index 8ce2e6bfd..b25ea2750 100755 --- a/docs/Processes.md +++ b/docs/Processes.md @@ -1022,12 +1022,15 @@ sorting at the end of the day. - partbyenum - Data is persisted to a partition scheme where the partition is derived from parameters in the sort.csv file. In this mode partition only can be done by one column which has parted attribute applied on it - and it also has to be of a symbol type. The partitioning on disk will - be the enumerated symbol entries of the parted symbol column. The - enumeration is done against the HDB sym file. - The general partition scheme is of the form - \[wdbdir\]/\[partitiontype\]/\[parted enumerated symbol column\]/\[table(s)\]/. - A typical partition directory would be similar to(for ex sym: MSFT_N) + and it also has to be of a symbol or integer (short, int, long) type. + If the column is a symbol type, the partitioning on disk will + be the symbol entries enumerated against the HDB sym file. + If the column is an integer type, the partitioning on disk will + be the raw integer values clamped between 0 and 2,147,483,647 + (the maximum int value), with negative and null values + mapped to 0. The general partition scheme is of the form + \[wdbdir\]/\[partitiontype\]/\[parted enumerated column\]/\[table(s)\]/. + A typical partition directory would be similar to (for ex sym: MSFT_N) wdb/database/2015.11.26/456/trade/ In the above example, the data is parted by sym, and number 456 is the order of MSFT_N symbol entry in the HDB sym file. diff --git a/tests/wdb/intpartbyenum/database.q b/tests/wdb/intpartbyenum/database.q new file mode 100644 index 000000000..2ea5aeb44 --- /dev/null +++ b/tests/wdb/intpartbyenum/database.q @@ -0,0 +1,4 @@ +tshort:([]enumcol:`short$(); expint:`long$()) +tint: ([]enumcol:`int$(); expint:`long$()) +tlong: ([]enumcol:`long$(); expint:`long$()) +tsym: ([]enumcol:`symbol$(); expint:`long$()) diff --git a/tests/wdb/intpartbyenum/intpartbyenum.csv b/tests/wdb/intpartbyenum/intpartbyenum.csv new file mode 100644 index 000000000..891afcb7b --- /dev/null +++ b/tests/wdb/intpartbyenum/intpartbyenum.csv @@ -0,0 +1,18 @@ +action,ms,bytes,lang,code,repeat,minver,comment +comment,,,,,,,"Tests to check partbyenum can partition on all symbol and integer column types" +before,0,0,q,.servers.startup[],1,,"Start connection management" +before,0,0,q,system "sleep 5",1,,"Wait for connections" +before,0,0,q,hdbhandle:gethandle[`hdb1],1,,"Open handle to HDB" +before,0,0,q,wdbhandle:gethandle[`wdb1],1,,"Open handle to WDB" +before,0,0,q,idbhandle:gethandle[`idb1],1,,"Open handle to IDB" +before,0,0,q,wdbhandle ".wdb.immediate:1b",1,,"Set WDB to write down immediately" +run,0,0,q,wdbhandle(upsert';`tshort`tint`tlong`tsym;(testtshort;testtint;testtlong;testtsym)),1,,"Upsert test data to WDB" +run,0,0,q,wdbhandle(`.wdb.savetodisk;`),1,,"Write test data" +run,0,0,q,system "sleep 5",1,,"Wait for write to finish" +true,0,0,q,expints ~ asc "J"$ string key .Q.dd[wdbdir;.z.D],1,,"Check WDB directory structure" +true,0,0,q,"(asc `TORQNULLSYMBOL^testtsym[`enumcol]) ~ asc get symfile",1,,"Check all symbols are present in HDB sym file" +true,0,0,q,idbhandle "exec int~expint from raze ?[;();0b;`int`expint!`int`expint] each `tshort`tint`tlong`tsym",1,,"Check int columns are as expected in IDB" +true,0,0,q,idbhandle "exec int~enumcol from raze ?[;();0b;`int`enumcol!(`int;(maptoint;`enumcol))] each `tshort`tint`tlong`tsym",1,,"Check maptoint working as expected in IDB" +run,0,0,q,wdbhandle(`.u.end;`.wdb.currentpartition),1,,"Trigger EOD on WDB" +run,0,0,q,system "sleep 5",1,,"Wait for merge to HDB to happen" +true,0,0,q,(count each (testtshort;testtint;testtlong;testtsym)) ~ hdbhandle "{count get x} each `tshort`tint`tlong`tsym",1,,"Check table counts in HDB" diff --git a/tests/wdb/intpartbyenum/process.csv b/tests/wdb/intpartbyenum/process.csv new file mode 100644 index 000000000..015417e41 --- /dev/null +++ b/tests/wdb/intpartbyenum/process.csv @@ -0,0 +1,7 @@ +host,port,proctype,procname,U,localtime,g,T,w,load,startwithall,extras,qcmd +localhost,{KDBBASEPORT}+100,discovery,discovery1,${TORQHOME}/appconfig/passwords/accesslist.txt,1,0,,,${KDBCODE}/processes/discovery.q,1,,q +localhost,{KDBBASEPORT}+101,gateway,gateway1,${TORQHOME}/appconfig/passwords/accesslist.txt,1,1,,,${KDBCODE}/processes/gateway.q,1,,q +localhost,{KDBBASEPORT}+102,segmentedtickerplant,stp1,${TORQHOME}/appconfig/passwords/accesslist.txt,1,0,,,${KDBCODE}/processes/segmentedtickerplant.q,1,-schemafile ${KDBTESTS}/wdb/intpartbyenum/database.q,q +localhost,{KDBBASEPORT}+103,hdb,hdb1,${TORQHOME}/appconfig/passwords/accesslist.txt,1,1,,,${KDBTESTS}/wdb/intpartbyenum/temphdb,1,,q +localhost,{KDBBASEPORT}+104,wdb,wdb1,${TORQHOME}/appconfig/passwords/accesslist.txt,1,1,,,${KDBCODE}/processes/wdb.q,1,-.wdb.writedownmode partbyenum -.wdb.mode saveandsort -.wdb.sortcsv ${KDBTESTS}/wdb/intpartbyenum/sort.csv -.wdb.savedir ${KDBTESTS}/wdb/intpartbyenum/tempwdb -.wdb.hdbdir ${KDBTESTS}/wdb/intpartbyenum/temphdb,q +localhost,{KDBBASEPORT}+105,idb,idb1,${TORQHOME}/appconfig/passwords/accesslist.txt,1,1,,,${KDBCODE}/processes/idb.q,1,,q diff --git a/tests/wdb/intpartbyenum/run.sh b/tests/wdb/intpartbyenum/run.sh new file mode 100755 index 000000000..892b87e02 --- /dev/null +++ b/tests/wdb/intpartbyenum/run.sh @@ -0,0 +1,30 @@ +#!/bin/bash + +# Handle command-line arguments +source $KDBTESTS/flagparse.sh + +# Path to test directory +testpath=${KDBTESTS}/wdb/intpartbyenum + +# Make temporary HDB directory +mkdir -p ${testpath}/temphdb + +# Start procs +${TORQHOME}/torq.sh start all -csv ${testpath}/process.csv + +# Start test proc +${RLWRAP} q ${TORQHOME}/torq.q \ + -proctype test -procname test1 \ + -test ${testpath} \ + -load ${KDBTESTS}/helperfunctions.q ${testpath}/settings.q \ + -testresults ${testpath}/results/ \ + -runtime $run \ + -procfile ${testpath}/process.csv \ + $debug $stop $write $quiet + +# Shut down procs +${TORQHOME}/torq.sh stop all -force -csv ${testpath}/process.csv + +# Remove temporary WDB and HDB directories +rm -r ${testpath}/tempwdb +rm -r ${testpath}/temphdb diff --git a/tests/wdb/intpartbyenum/settings.q b/tests/wdb/intpartbyenum/settings.q new file mode 100644 index 000000000..90ad03d83 --- /dev/null +++ b/tests/wdb/intpartbyenum/settings.q @@ -0,0 +1,17 @@ +// IPC connection parameters +.servers.CONNECTIONS:`wdb`hdb`idb; +.servers.USERPASS:`admin:admin; + +// Filepaths +wdbdir:hsym `$getenv[`KDBTESTS],"/wdb/intpartbyenum/tempwdb"; +hdbdir:hsym `$getenv[`KDBTESTS],"/wdb/intpartbyenum/temphdb"; +symfile:` sv hdbdir,`sym; + +// Test tables with expected int partitions +testtshort:([]enumcol:-0W -1 0 0N 1 0Wh; expint:0 0 0 0 1 32767); +testtint: ([]enumcol:-0W -1 0 0N 1 0Wi; expint:0 0 0 0 1 2147483647); +testtlong: ([]enumcol:-0W -1 0 0N 1 0W; expint:0 0 0 0 1 2147483647); +testtsym: update expint:i from ([]enumcol:`a`b`c`d`e`); + +// All expected int partitions +expints:asc distinct raze (testtshort;testtint;testtlong;testtsym)@\:`expint; diff --git a/tests/wdb/intpartbyenum/sort.csv b/tests/wdb/intpartbyenum/sort.csv new file mode 100644 index 000000000..d9d6dbcb7 --- /dev/null +++ b/tests/wdb/intpartbyenum/sort.csv @@ -0,0 +1,2 @@ +tabname,att,column,sort +default,p,enumcol,1