Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
40 changes: 26 additions & 14 deletions code/processes/wdb.q
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,8 @@ tpcheckcycles:@[value;`tpcheckcycles;0W]; /-num
sorttypes:@[value;`sorttypes;`sort]; /-list of sort types to look for upon a sort
sortworkertypes:@[value;`sortworkertypes;`sortworker]; /-list of sort types to look for upon a sort being called with worker process

wdbtypes:@[value;`wdbtypes;`wdb]; /-list of wdb types for sort processes to look for on initmissingtables

subtabs:@[value;`subtabs;`]; /-list of tables to subscribe for
subsyms:@[value;`subsyms;`]; /-list of syms to subscription to
upd:@[value;`upd;{insert}]; /-value of the upd function
Expand Down Expand Up @@ -206,12 +208,6 @@ endofday:{[pt;processdata]
if[.finspace.enabled;.finspace.notifyhdb[;changeset] each .finspace.hdbclusters];
currentpartition::pt+1;
/- in case of default/partbyenum writedown mode we want to initialise the new partition with all the table schemas
/- then notify idb processes of the new db
if[writedownmode in `partbyenum`default;
.lg.o[`eod;"initialising wdbhdb for partition: ",string[currentpartition]];
initmissingtables[];
.lg.o[`eod;"notifying idbs for newly created partition"];
notifyidbs[`.idb.rollover;currentpartition]];
.lg.o[`eod;"end of day is now complete"];
if[.finspace.enabled;.os.hdeldir[getenv[`KDBSCRATCH];0b]];
};
Expand Down Expand Up @@ -424,6 +420,8 @@ endofdaysort:{[dir;pt;tablist;writedownmode;mergelimits;hdbsettings;mergemethod]
endofdaymerge[dir;pt;tablist;mergelimits;hdbsettings;mergemethod;writedownmode];
endofdaysortdate[dir;pt;key tablist;hdbsettings]
];
/- run steps to rollover idb
idbreload[currentpartition+1];
/- reset compression level (.z.zd)
resetcompression[16 0 0]
};
Expand Down Expand Up @@ -527,20 +525,21 @@ fixpartition:{[subto]

/- for writedown modes partbyenum/default we make sure that partition 0/currentpartition has all the tables.
/- In that case we can use .Q.chk later to fill the db making it useable for intraday processes
initmissingtables:{[]
.lg.o[`fixpartition;"Adding missing tables(empty) to partition ",string currentpartition];
inittable each tablelist[];
filldb[];
/- pt - date; partition for which the function should initialise
initmissingtables:{[pt]
.lg.o[`fixpartition;"Adding missing tables(empty) to partition ",string pt];
inittable[;pt] each tablelist[];
filldb[pt];
}

filldb:{[]
filldb:{[pt]
/- for all enumerated partitions we want to make sure that all tables are present
.Q.chk[.Q.par[hsym savedir; currentpartition; `]];
.Q.chk[.Q.par[hsym savedir; pt; `]];
}

/- initialises table t in db with its schema in part
inittable:{[t]
tabledir:` sv $[writedownmode~`partbyenum; .Q.par[.Q.dd[hsym savedir;currentpartition];0;t]; .Q.par[hsym savedir;currentpartition;t]],`;
inittable:{[t;pt]
tabledir:` sv $[writedownmode~`partbyenum; .Q.par[.Q.dd[hsym savedir;pt];0;t]; .Q.par[hsym savedir;pt;t]],`;
if[() ~ key tabledir;tabledir set .Q.en[hsym hdbdir;0#value t]];
}

Expand Down Expand Up @@ -594,6 +593,19 @@ getsortparams:{[]
];
};

/- Function to trigger idb reload steps, The initmissingtables function needs to be ran on wdb process, however this function can be ran on the sort processes
/- If the function is ran on sort process send initmissingtables command to wdbs
idbreload:{[pt]
.lg.o[`idb;"starting idb reload"];
if[writedownmode in `partbyenum`default;
.lg.o[`eod;"initialising wdbhdb for partition: ",string[pt]];
$[.proc.proctype~`sort;{[pt]ws:exec w from .servers.getservers[`proctype;wdbtypes;()!();1b;0b];{[ws;pt]ws(`.wdb.initmissingtables;[pt])}[;pt] each ws}[pt];initmissingtables[pt]];
.lg.o[`eod;"notifying idbs for newly created partition"];
notifyidbs[`.idb.rollover;pt]
];
.lg.o[`idb;"idb reload complete"];
};;

\d .

/- get the sort attributes for each table
Expand Down
4 changes: 2 additions & 2 deletions code/wdb/origstartup.q
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ startup:{[]
];
subscribe[];
/- add missing tables to partitions in case an IDB process wants to connect. Only applicable for partbyenum writedown mode
if[.wdb.writedownmode in `default`partbyenum;initmissingtables[]];
if[.wdb.writedownmode in `default`partbyenum;initmissingtables[currentpartition]];
];
@[`.; `upd; :; .wdb.upd];
}
}
3 changes: 2 additions & 1 deletion config/settings/sort.q
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ ignorelist:`heartbeat`logmsg // list of tables to ignore
hdbtypes:`hdb // list of hdb types to look for and call in hdb reload
rdbtypes:`rdb // list of rdb types to look for and call in rdb reload
tickerplanttypes:`tickerplant // list of tickerplant types to try and make a connection to
wdbtypes:`wdb // list of wdb types to look for and call in wdb init tables
subtabs:` // list of tables to subscribe for (` for all)
subsyms:` // list of syms to subscribe for (` for all)
savedir:hsym`$getenv[`TORQHOME],"/wdbhdb" // location to save wdb data
Expand Down Expand Up @@ -46,5 +47,5 @@ eodwaittime:0D00:00:10.000 // time to wait for async calls to compl

// Server connection details
\d .servers
CONNECTIONS:`hdb`tickerplant`rdb`gateway // list of connections to make at start up
CONNECTIONS:`wdb`hdb`tickerplant`rdb`gateway // list of connections to make at start up
STARTUP:1b // create connections
1 change: 1 addition & 0 deletions config/settings/wdb.q
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ ignorelist:`heartbeat`logmsg
hdbtypes:`hdb // list of hdb types to look for and call in hdb reload
rdbtypes:`rdb // list of rdb types to look for and call in rdb reload
idbtypes:`idb // list of idb types to look for and call in rdb reload
wdbtypes:() // wdb does not need to connect to itself
gatewaytypes:`gateway // list of gateway types to inform at reload
tickerplanttypes:`segmentedtickerplant // list of tickerplant types to try and make a connection to
subtabs:` // list of tables to subscribe for (` for all)
Expand Down