Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 6 additions & 6 deletions code/common/merge.q
Original file line number Diff line number Diff line change
Expand Up @@ -27,15 +27,15 @@ checkpartitiontype:{[tablename;extrapartitiontype]
.lg.o[`checkpart;"all parted columns defined in sort.csv are present in ",(string tablename)," table"]];
};

/- function to check if the extra partition column has a symbol type
checksymboltype:{[tablename;extrapartitiontype]
$[all extrapartitiontype in exec c from meta[tablename] where t="s";
.lg.o[`checksymbol;"all columns do have a symbol type in ",(string tablename)," table"];
.lg.e[`checksymbol;"not all columns ",string[extrapartitiontype]," do have a symbol type in ",(string tablename)," table"]];
/- function to check if the extra partition column has an enumerable type
/- tablename          - name of the table to inspect
/- extrapartitiontype - column name(s) that must have an enumerable type
/- logs success via .lg.o, otherwise raises an error via .lg.e
checkenumerabletype:{[tablename;extrapartitiontype]
/- meta type chars: h=short, i=int, j=long, s=symbol
$[all extrapartitiontype in exec c from meta[tablename] where t in "hijs";
.lg.o[`checkenumerable;"all columns do have an enumerable type in ",(string tablename)," table"];
.lg.e[`checkenumerable;"not all columns ",string[extrapartitiontype]," do have an enumerable type in ",(string tablename)," table"]];
};


/- function to get list of distinct combiniations for partition directories
/- function to get list of distinct combinations for partition directories
/- functional select equivalent to: select distinct [ extrapartitiontype ] from [ tablename ]
getextrapartitions:{[tablename;extrapartitiontype]
value each ?[tablename;();1b;extrapartitiontype!extrapartitiontype]
Expand Down
8 changes: 6 additions & 2 deletions code/processes/idb.q
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,10 @@ reload:.idb.intradayreload;
.idb.init[];

/- helper function to support queries against the sym or integer partition column
maptoint:{[symbol]
sym?`TORQNULLSYMBOL^symbol
/- map partition value(s) to int partition(s) for partbyenum queries
/- val - symbol or short/int/long value(s) (abs type 5 6 7h) to map
/- returns the corresponding int partition value(s)
maptoint:{[val]
$[(abs type val) in 5 6 7h;
/- if using an integer column, clamp value between 0 and max int (null maps to 0)
0| 2147483647& `long$ val;
/- if using a symbol column, enumerate against the hdb sym file
sym?`TORQNULLSYMBOL^val]
};
2 changes: 1 addition & 1 deletion code/processes/tickerlogreplay.q
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ tempdir:@[value;`tempdir;`:tempmergedir]; // locat
mergenumrows:@[value;`mergenumrows;10000000]; // default number of rows for merge process
mergenumtab:@[value;`mergenumtab;`quote`trade!10000 50000]; // specify number of rows per table for merge process
mergenumbytes:@[value;`mergenumbytes;500000000]; // default number of bytes for merge process
mergemethod:@[value;`mergemethod;`part]; // the partbyattr writdown mode can merge data from tenmporary storage to the hdb in three ways:
mergemethod:@[value;`mergemethod;`part]; // the partbyattr writedown mode can merge data from temporary storage to the hdb in three ways:
// 1. part - the entire partition is merged to the hdb
// 2. col - each column in the temporary partitions are merged individually
// 3. hybrid - partitions merged by column or entire partition based on byte limit
Expand Down
43 changes: 25 additions & 18 deletions code/processes/wdb.q
Original file line number Diff line number Diff line change
Expand Up @@ -27,12 +27,11 @@ writedownmode:@[value;`writedownmode;`default]; /-the
/- 1. default - the data is partitioned by [ partitiontype ]
/- at EOD the data will be sorted and given attributes according to sort.csv before being moved to hdb
/- 2. partbyattr - the data is partitioned by [ partitiontype ] and the column(s) assigned the parted attribute in sort.csv
/- at EOD the data will be merged from each partiton before being moved to hdb
/- 3. partbyenum - the data is partitioned by [ partitiontype ] and a symbol column with parted attribution assigned in sort.csv
/- at EOD the data will be merged from each partiton before being moved to hdb
enumcol:@[value;`enumcol;`sym]; /-symbol column to enumerate by. Only used with writedownmode: partbyenum.
/- at EOD the data will be merged from each partition before being moved to hdb
/- 3. partbyenum - the data is partitioned by [ partitiontype ] and a symbol or integer column with parted attribution assigned in sort.csv
/- at EOD the data will be merged from each partition before being moved to hdb

mergemode:@[value;`mergemode;`part]; /-the partbyattr writdown mode can merge data from tenmporary storage to the hdb in three ways:
mergemode:@[value;`mergemode;`part]; /-the partbyattr writedown mode can merge data from temporary storage to the hdb in three ways:
/- 1. part - the entire partition is merged to the hdb
/- 2. col - each column in the temporary partitions are merged individually
/- 3. hybrid - partitions merged by column or entire partition based on byte limit
Expand Down Expand Up @@ -117,12 +116,20 @@ tablelist:{[] sortedlist:exec tablename from `bytes xdesc tabsizes;
/- function that ensures a list of syms is returned no matter what is passed to it
ensuresymlist:{[s] -1 _ `${@[x; where not ((type each x) in (10 -10h));string]} s,(::)}

/- vectorized function to map partition value(s) to int partition(s) in partbyenum mode
/- val - symbol or short/int/long value(s) (abs type 5 6 7h) to map
/- returns long value(s); null symbols fall back to the `TORQNULLSYMBOL entry in the sym file
maptoint:{[val]
$[(abs type val) in 5 6 7h;
/- if using an integer column, clamp value between 0 and max int (null maps to 0)
0| 2147483647& `long$ val;
/- if using a symbol column, enumerate against the hdb sym file
`long$ (` sv hdbsettings[`hdbdir],`sym)?`TORQNULLSYMBOL^val]
};

/- function to upsert to specified directory
upserttopartition:{[dir;tablename;tabdata;pt;expttype;expt;writedownmode]
/- enumerate current extra partition against the hdb sym file
/- if extra partition is null, send to a partition enumerated against `TORQNULLSYMBOL symbol
if[writedownmode~`partbyenum;i:`long$(` sv hdbsettings[`hdbdir],`sym)? first[`TORQNULLSYMBOL^ ensuresymlist[expt]]];
/- create directory location for selected partiton
/- enumerate first extra partition value
if[writedownmode~`partbyenum;i:maptoint first expt];
/- create directory location for selected partition
/- replace non-alphanumeric characters in symbols with _
/- convert to symbols and replace any null values with `TORQNULLSYMBOL
directory:$[writedownmode~`partbyenum;
Expand Down Expand Up @@ -150,9 +157,9 @@ savetablesbypart:{[dir;pt;forcesave;tablename;writedownmode]
];
/- check each partition type actually is a column in the selected table
.merge.checkpartitiontype[tablename;extrapartitiontype];
/- check if provided symbol column extrapartitiontype indeed has a symbol type in table
if[writedownmode~`partbyenum;.merge.checksymboltype[tablename;extrapartitiontype]];
/- get list of distinct combiniations for partition directories
/- check if provided column extrapartitiontype indeed has an enumerable type in table
if[writedownmode~`partbyenum;.merge.checkenumerabletype[tablename;extrapartitiontype]];
/- get list of distinct combinations for partition directories
extrapartitions:.merge.getextrapartitions[tablename;extrapartitiontype];
/- enumerate data to be upserted
enumdata:.Q.en[hdbsettings[`hdbdir];0!.save.manipulate[tablename;`. tablename]];
Expand Down Expand Up @@ -191,7 +198,7 @@ endofday:{[pt;processdata]
/- set what type of merge method to be used
mergemethod:mergemode;
/- create a dictionary of tables and merge limits, byte or row count limit depending on settings
.lg.o[`merge;"merging partitons by ",$[.merge.mergebybytelimit;"byte estimate";"row count"]," limit"];
.lg.o[`merge;"merging partitions by ",$[.merge.mergebybytelimit;"byte estimate";"row count"]," limit"];
mergelimits:(tablelist[],())!($[.merge.mergebybytelimit;{(count x)#mergenumbytes};{[x] mergenumrows^mergemaxrows[x]}]tablelist[]),();
tablist:tablelist[]!{0#value x} each tablelist[];
/ Need to download sym file to scratch directory if this is Finspace application
Expand Down Expand Up @@ -273,7 +280,7 @@ resetcompression:{setcompression 16 0 0 }

//check if the hdb directory contains current partition
//if yes check if patition is empty and if it is not see if any of the tables exist in both the
//temporary parition and the hdb partition. If there is a clash abort operation otherwise copy
//temporary partition and the hdb partition. If there is a clash abort operation otherwise copy
//each table to the hdb partition
movetohdb:{[dw;hw;pt]
$[not(`$string pt)in key hsym`$-10 _ hw;
Expand Down Expand Up @@ -376,7 +383,7 @@ removetablefromenumdir:{[partdir]
};

endofdaymerge:{[dir;pt;tablist;mergelimits;hdbsettings;mergemethod;writedownmode]
/- merge data from partitons
/- merge data from partitions
/- .z.pd function in finspace will cause an error. Add in this check to skip over the use of .z.pd. This should be temporary and will be removed when issue resolved by AWS.
tempfix2:$[.finspace.enabled;0b;(0 < count .z.pd[])];
$[tempfix2 and ((system "s")<0);
Expand Down Expand Up @@ -513,14 +520,14 @@ subscribe:{[]
fixpartition:{[subto]
/- check if the tp logdate matches current date
if[not (tplogdate:subto[`tplogdate])~orig:currentpartition;
.lg.o[`fixpartition;"Current partiton date does not match the ticker plant log date"];
/- set the current partiton date to the log date
.lg.o[`fixpartition;"Current partition date does not match the ticker plant log date"];
/- set the current partition date to the log date
currentpartition::tplogdate;
/- move the data that has been written to correct partition
pth1:.os.pth[-1 _ string .Q.par[savedir;orig;`]];
pth2:.os.pth[-1 _ string .Q.par[savedir;tplogdate;`]];
if[not ()~key hsym `$.os.pthq pth1;
/- delete any data in the current partiton directory
/- delete any data in the current partition directory
clearwdbdata[];
.lg.o[`fixpartition;"Moving data from partition ",(.os.pthq pth1) ," to partition ",.os.pthq pth2];
.[.os.ren;(pth1;pth2);{.lg.e[`fixpartition;"Failed to move data from wdb partition ",x," to wdb partition ",y," : ",z]}[pth1;pth2]]];
Expand Down
2 changes: 1 addition & 1 deletion code/wdb/origstartup.q
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ startup:{[]
$[writedownmode~`partbyattr;
.lg.o[`init; "partition has been set to [savedir]/[",(string partitiontype),"]/[tablename]/[parted column(s)]/"];
writedownmode~`partbyenum;
.lg.o[`init; "partition has been set to [savedir]/[",(string partitiontype),"]/[parted symbol column enumerated]/[tablename]/"];
.lg.o[`init; "partition has been set to [savedir]/[",(string partitiontype),"]/[parted column enumerated]/[tablename]/"];
.lg.o[`init; "partition has been set to [savedir]/[",(string partitiontype),"]/[tablename]/"]];
if[saveenabled;
//check if tickerplant is available and if not exit with error
Expand Down
2 changes: 1 addition & 1 deletion code/wdb/writedown.q
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ getpartition:@[value;`getpartition; /-fun
{{@[value;`.wdb.currentpartition;
(`date^partitiontype)$.proc.cd[]]}}];

currentpartition:.wdb.getpartition[]; /- Initialise current partiton
currentpartition:.wdb.getpartition[]; /- Initialise current partition

tabsizes:([tablename:`symbol$()] rowcount:`long$(); bytes:`long$()); /- keyed table to track the size of tables on disk

Expand Down
11 changes: 5 additions & 6 deletions config/settings/wdb.q
Original file line number Diff line number Diff line change
Expand Up @@ -30,15 +30,14 @@ mode:`save
// data on disk, apply attributes and the trigger a reload on the
// rdb and hdb processes
writedownmode:`default // the wdb process can periodically write data to disc and sort at EOD in two ways:
// 1. default - the data is partitioned by [ partitontype ]
// 1. default - the data is partitioned by [ partitiontype ]
// at EOD the data will be sorted and given attributes according to sort.csv before being moved to hdb
// 2. partbyattr - the data is partitioned by [ partitiontype ] and the column(s) assigned the parted attribute in sort.csv
// at EOD the data will be merged from each partiton before being moved to hdb
// 3. partbyenum - the data is partitioned by [ partitiontype ] and a symbol column with parted attrobution assigned in sort.csv
// at EOD the data will be merged from each partiton before being moved to hdb
enumcol:`sym; // default column for partitioning. Only used with writedownmode: partbyenum.
// at EOD the data will be merged from each partition before being moved to hdb
// 3. partbyenum - the data is partitioned by [ partitiontype ] and a symbol or integer column with parted attribution assigned in sort.csv
// at EOD the data will be merged from each partition before being moved to hdb

mergemode:`part // the partbyattr writdown mode can merge data from tenmporary storage to the hdb in three ways:
mergemode:`part // the partbyattr writedown mode can merge data from temporary storage to the hdb in three ways:
// 1. part - the entire partition is merged to the hdb
// 2. col - each column in the temporary partitions are merged individually
// 3. hybrid - partitions merged by column or entire partition based on byte limit
Expand Down
15 changes: 9 additions & 6 deletions docs/Processes.md
Original file line number Diff line number Diff line change
Expand Up @@ -1022,12 +1022,15 @@ sorting at the end of the day.
- partbyenum - Data is persisted to a partition scheme where the partition
is derived from parameters in the sort.csv file. In this mode partition
only can be done by one column which has parted attribute applied on it
and it also has to be of a symbol type. The partitioning on disk will
be the enumerated symbol entries of the parted symbol column. The
enumeration is done against the HDB sym file.
The general partition scheme is of the form
\[wdbdir\]/\[partitiontype\]/\[parted enumerated symbol column\]/\[table(s)\]/.
A typical partition directory would be similar to(for ex sym: MSFT_N)
and it also has to be of a symbol or integer (short, int, long) type.
If the column is a symbol type, the partitioning on disk will
be the symbol entries enumerated against the HDB sym file.
If the column is an integer type, the partitioning on disk will
be the raw integer values clamped between 0 and 2,147,483,647
(the maximum int value), with negative and null values
mapped to 0. The general partition scheme is of the form
\[wdbdir\]/\[partitiontype\]/\[parted enumerated column\]/\[table(s)\]/.
A typical partition directory would be similar to (for ex sym: MSFT_N)
wdb/database/2015.11.26/456/trade/
In the above example, the data is parted by sym, and number 456 is
the order of MSFT_N symbol entry in the HDB sym file.
Expand Down
4 changes: 4 additions & 0 deletions tests/wdb/intpartbyenum/database.q
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
/ empty test schemas for the intpartbyenum tests: one table per supported enumcol type
/ enumcol - the column to partition by (short/int/long/symbol); expint - expected int partition
tshort:([]enumcol:`short$(); expint:`long$())
tint: ([]enumcol:`int$(); expint:`long$())
tlong: ([]enumcol:`long$(); expint:`long$())
tsym: ([]enumcol:`symbol$(); expint:`long$())
18 changes: 18 additions & 0 deletions tests/wdb/intpartbyenum/intpartbyenum.csv
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
action,ms,bytes,lang,code,repeat,minver,comment
comment,,,,,,,"Tests to check partbyenum can partition on all symbol and integer column types"
before,0,0,q,.servers.startup[],1,,"Start connection management"
before,0,0,q,system "sleep 5",1,,"Wait for connections"
before,0,0,q,hdbhandle:gethandle[`hdb1],1,,"Open handle to HDB"
before,0,0,q,wdbhandle:gethandle[`wdb1],1,,"Open handle to WDB"
before,0,0,q,idbhandle:gethandle[`idb1],1,,"Open handle to IDB"
before,0,0,q,wdbhandle ".wdb.immediate:1b",1,,"Set WDB to write down immediately"
run,0,0,q,wdbhandle(upsert';`tshort`tint`tlong`tsym;(testtshort;testtint;testtlong;testtsym)),1,,"Upsert test data to WDB"
run,0,0,q,wdbhandle(`.wdb.savetodisk;`),1,,"Write test data"
run,0,0,q,system "sleep 5",1,,"Wait for write to finish"
true,0,0,q,expints ~ asc "J"$ string key .Q.dd[wdbdir;.z.D],1,,"Check WDB directory structure"
true,0,0,q,"(asc `TORQNULLSYMBOL^testtsym[`enumcol]) ~ asc get symfile",1,,"Check all symbols are present in HDB sym file"
true,0,0,q,idbhandle "exec int~expint from raze ?[;();0b;`int`expint!`int`expint] each `tshort`tint`tlong`tsym",1,,"Check int columns are as expected in IDB"
true,0,0,q,idbhandle "exec int~enumcol from raze ?[;();0b;`int`enumcol!(`int;(maptoint;`enumcol))] each `tshort`tint`tlong`tsym",1,,"Check maptoint working as expected in IDB"
run,0,0,q,wdbhandle(`.u.end;`.wdb.currentpartition),1,,"Trigger EOD on WDB"
run,0,0,q,system "sleep 5",1,,"Wait for merge to HDB to happen"
true,0,0,q,(count each (testtshort;testtint;testtlong;testtsym)) ~ hdbhandle "{count get x} each `tshort`tint`tlong`tsym",1,,"Check table counts in HDB"
Loading