From 8a8259edcf4b817fd951eb66a14ed040ab3904cb Mon Sep 17 00:00:00 2001 From: "tgillespie@homer.aquaq.co.uk" Date: Thu, 21 Nov 2024 17:33:25 +0000 Subject: [PATCH 1/8] #672 make sort process call initmissingtables and notifyidbs instead of wdb --- code/processes/wdb.q | 23 +++++++++++++++-------- config/settings/sort.q | 3 ++- 2 files changed, 17 insertions(+), 9 deletions(-) diff --git a/code/processes/wdb.q b/code/processes/wdb.q index 60d5baf85..0f6edb3ba 100644 --- a/code/processes/wdb.q +++ b/code/processes/wdb.q @@ -206,12 +206,6 @@ endofday:{[pt;processdata] if[.finspace.enabled;.finspace.notifyhdb[;changeset] each .finspace.hdbclusters]; currentpartition::pt+1; /- in case of default/partbyenum writedown mode we want to initialise the new partition with all the table schemas - /- then notify idb processes of the new db - if[writedownmode in `partbyenum`default; - .lg.o[`eod;"initialising wdbhdb for partition: ",string[currentpartition]]; - initmissingtables[]; - .lg.o[`eod;"notifying idbs for newly created partition"]; - notifyidbs[`.idb.rollover;currentpartition]]; .lg.o[`eod;"end of day is now complete"]; if[.finspace.enabled;.os.hdeldir[getenv[`KDBSCRATCH];0b]]; }; @@ -313,7 +307,10 @@ endofdaysortdate:{[dir;pt;tablist;hdbsettings] .lg.o[`mvtohdb;"Moving partition from the temp wdb ",(dw:.os.pth -1 _ string .Q.par[dir;pt;`])," directory to the hdb directory ",hw:.os.pth -1 _ string .Q.par[hdbsettings[`hdbdir];pt;`]]; .lg.o[`mvtohdb;"Attempting to move ",(", "sv string key hsym`$dw)," from ",dw," to ",hw]; .[movetohdb;(dw;hw;pt);{.lg.e[`mvtohdb;"Function movetohdb failed with error: ",x]}]; - + + + idbReload[pt]; + /-call the posteod function .save.postreplay[hdbsettings[`hdbdir];pt]; if[permitreload; @@ -482,7 +479,7 @@ informsortandreload:{[dir;pt;tablist;writedownmode;mergelimits;hdbsettings;merge ]; [.lg.e[`informsortandreload;"can't connect to the sortandreload - no sortandreload process detected"]; // try to run the sort locally - endofdaysort[dir;pt;tablist;writedownmode;mergelimits;hdbsettings;mergemethod]]]; + endofdaysort[dir;pt;tablist;writedownmode;mergelimits;hdbsettings;mergemethod;]]]; }; /-function to set the timer for the save to disk function @@ -594,6 +591,16 @@ getsortparams:{[] ]; }; +idbReload:{[pt] + .lg.o[`idb;"starting idb reload"]; + if[writedownmode in `partbyenum`default; + .lg.o[`eod;"initialising wdbhdb for partition: ",string[currentpartition]]; + $[.proc.proctype~`sort;{ws:exec w from .servers.getservers[`proctype;`wdb;()!();1b;0b];[first ws](`.wdb.initmissingtables;[])}[];initmissingtables[]]; + .lg.o[`eod;"notifying idbs for newly created partition"]; + notifyidbs[`.idb.rollover;currentpartition]]; + .lg.o[`idb;"idb reload complete"]; + };; + \d . /- get the sort attributes for each table diff --git a/config/settings/sort.q b/config/settings/sort.q index 8f3233700..228b3bb8a 100644 --- a/config/settings/sort.q +++ b/config/settings/sort.q @@ -5,6 +5,7 @@ ignorelist:`heartbeat`logmsg // list of tables to ignore hdbtypes:`hdb // list of hdb types to look for and call in hdb reload rdbtypes:`rdb // list of rdb types to look for and call in rdb reload tickerplanttypes:`tickerplant // list of tickerplant types to try and make a connection to +wdbtypes:`wdb // list of wdb types to look for and call in wdb init tables subtabs:` // list of tables to subscribe for (` for all) subsyms:` // list of syms to subscribe for (` for all) savedir:hsym`$getenv[`TORQHOME],"/wdbhdb" // location to save wdb data @@ -46,5 +47,5 @@ eodwaittime:0D00:00:10.000 // time to wait for async calls to compl // Server connection details \d .servers -CONNECTIONS:`hdb`tickerplant`rdb`gateway // list of connections to make at start up +CONNECTIONS:`wdb`hdb`tickerplant`rdb`gateway // list of connections to make at start up STARTUP:1b // create connections From a6dbd6d71bc3495542adebe4a8a3380758f04099 Mon Sep 17 00:00:00 2001 From: "tgillespie@homer.aquaq.co.uk" Date: Fri, 22 Nov 2024 11:06:53 +0000 Subject: [PATCH 2/8] change initmissingtables to work with new set up --- code/processes/wdb.q | 19 ++++++++++--------- code/wdb/origstartup.q | 4 ++-- 2 files changed, 12 insertions(+), 11 deletions(-) diff --git a/code/processes/wdb.q b/code/processes/wdb.q index 0f6edb3ba..9cfcd3d82 100644 --- a/code/processes/wdb.q +++ b/code/processes/wdb.q @@ -524,20 +524,21 @@ fixpartition:{[subto] /- for writedown modes partbyenum/default we make sure that partition 0/currentpartition has all the tables. /- In that case we can use .Q.chk later to fill the db making it useable for intraday processes -initmissingtables:{[] - .lg.o[`fixpartition;"Adding missing tables(empty) to partition ",string currentpartition]; - inittable each tablelist[]; - filldb[]; +initmissingtables:{[currentPt] + $[currentPt;pt:currentpartition;pt:currentpartition+1]; + .lg.o[`fixpartition;"Adding missing tables(empty) to partition ",string pt]; + inittable[;pt] each tablelist[]; + filldb[pt]; } -filldb:{[] +filldb:{[pt] /- for all enumerated partitions we want to make sure that all tables are present - .Q.chk[.Q.par[hsym savedir; currentpartition; `]]; + .Q.chk[.Q.par[hsym savedir; pt; `]]; } /- initialises table t in db with its schema in part -inittable:{[t] - tabledir:` sv $[writedownmode~`partbyenum; .Q.par[.Q.dd[hsym savedir;currentpartition];0;t]; .Q.par[hsym savedir;currentpartition;t]],`; +inittable:{[t;pt] + tabledir:` sv $[writedownmode~`partbyenum; .Q.par[.Q.dd[hsym savedir;pt];0;t]; .Q.par[hsym savedir;pt;t]],`; if[() ~ key tabledir;tabledir set .Q.en[hsym hdbdir;0#value t]]; } @@ -595,7 +596,7 @@ idbReload:{[pt] .lg.o[`idb;"starting idb reload"]; if[writedownmode in `partbyenum`default; .lg.o[`eod;"initialising wdbhdb for partition: ",string[currentpartition]]; - $[.proc.proctype~`sort;{ws:exec w from .servers.getservers[`proctype;`wdb;()!();1b;0b];[first ws](`.wdb.initmissingtables;[])}[];initmissingtables[]]; + $[.proc.proctype~`sort;{ws:exec w from .servers.getservers[`proctype;`wdb;()!();1b;0b];[first ws](`.wdb.initmissingtables;[1b])}[];initmissingtables[0b]]; .lg.o[`eod;"notifying idbs for newly created partition"]; notifyidbs[`.idb.rollover;currentpartition]]; .lg.o[`idb;"idb reload complete"]; diff --git a/code/wdb/origstartup.q b/code/wdb/origstartup.q index db884fa29..797a85008 100644 --- a/code/wdb/origstartup.q +++ b/code/wdb/origstartup.q @@ -15,7 +15,7 @@ startup:{[] ]; subscribe[]; /- add missing tables to partitions in case an IDB process wants to connect. Only applicable for partbyenum writedown mode - if[.wdb.writedownmode in `default`partbyenum;initmissingtables[]]; + if[.wdb.writedownmode in `default`partbyenum;initmissingtables[1b]]; ]; @[`.; `upd; :; .wdb.upd]; - } \ No newline at end of file + } From 95a1905ddcd2351e54245e55d39aacc3e7c10827 Mon Sep 17 00:00:00 2001 From: "tgillespie@homer.aquaq.co.uk" Date: Mon, 25 Nov 2024 13:38:47 +0000 Subject: [PATCH 3/8] #672 clean up cpde and add some comments --- code/processes/wdb.q | 15 ++++++++------- 1 file changed, 8 insertions(+), 7 deletions(-) diff --git a/code/processes/wdb.q b/code/processes/wdb.q index 9cfcd3d82..0d13fe802 100644 --- a/code/processes/wdb.q +++ b/code/processes/wdb.q @@ -308,9 +308,8 @@ endofdaysortdate:{[dir;pt;tablist;hdbsettings] .lg.o[`mvtohdb;"Attempting to move ",(", "sv string key hsym`$dw)," from ",dw," to ",hw]; .[movetohdb;(dw;hw;pt);{.lg.e[`mvtohdb;"Function movetohdb failed with error: ",x]}]; - - idbReload[pt]; - + idbreload[pt]; + /-call the posteod function .save.postreplay[hdbsettings[`hdbdir];pt]; if[permitreload; @@ -479,7 +478,7 @@ informsortandreload:{[dir;pt;tablist;writedownmode;mergelimits;hdbsettings;merge ]; [.lg.e[`informsortandreload;"can't connect to the sortandreload - no sortandreload process detected"]; // try to run the sort locally - endofdaysort[dir;pt;tablist;writedownmode;mergelimits;hdbsettings;mergemethod;]]]; + endofdaysort[dir;pt;tablist;writedownmode;mergelimits;hdbsettings;mergemethod]]]; }; /-function to set the timer for the save to disk function @@ -524,8 +523,9 @@ fixpartition:{[subto] /- for writedown modes partbyenum/default we make sure that partition 0/currentpartition has all the tables. /- In that case we can use .Q.chk later to fill the db making it useable for intraday processes -initmissingtables:{[currentPt] - $[currentPt;pt:currentpartition;pt:currentpartition+1]; +/- currentpt - boolean; if true initialises partition for value of currentpartition; if false initialises partition for next value of currentpartition +initmissingtables:{[currentpt] + $[currentpt;pt:currentpartition;pt:currentpartition+1]; .lg.o[`fixpartition;"Adding missing tables(empty) to partition ",string pt]; inittable[;pt] each tablelist[]; filldb[pt]; @@ -592,7 +592,8 @@ getsortparams:{[] ]; }; -idbReload:{[pt] +/- funtion to initialise partition at EOD and notifyidbs to rollover, if .wdb.mode is not a sort mode, initmissingtables done on sort proc +idbreload:{[pt] .lg.o[`idb;"starting idb reload"]; if[writedownmode in `partbyenum`default; .lg.o[`eod;"initialising wdbhdb for partition: ",string[currentpartition]]; From 5174a85155e59125a3af3d4b0e81e10b7d8af030 Mon Sep 17 00:00:00 2001 From: "tgillespie@homer.aquaq.co.uk" Date: Tue, 26 Nov 2024 14:18:32 +0000 Subject: [PATCH 4/8] #672 move idbreload to endofdaysort --- code/processes/wdb.q | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/code/processes/wdb.q b/code/processes/wdb.q index 0d13fe802..52bc0e60a 100644 --- a/code/processes/wdb.q +++ b/code/processes/wdb.q @@ -308,8 +308,6 @@ endofdaysortdate:{[dir;pt;tablist;hdbsettings] .lg.o[`mvtohdb;"Attempting to move ",(", "sv string key hsym`$dw)," from ",dw," to ",hw]; .[movetohdb;(dw;hw;pt);{.lg.e[`mvtohdb;"Function movetohdb failed with error: ",x]}]; - idbreload[pt]; - /-call the posteod function .save.postreplay[hdbsettings[`hdbdir];pt]; if[permitreload; @@ -420,6 +418,8 @@ endofdaysort:{[dir;pt;tablist;writedownmode;mergelimits;hdbsettings;mergemethod] endofdaymerge[dir;pt;tablist;mergelimits;hdbsettings;mergemethod;writedownmode]; endofdaysortdate[dir;pt;key tablist;hdbsettings] ]; + /- run steps to rollover idb + idbreload[pt+1]; /- reset compression level (.z.zd) resetcompression[16 0 0] }; @@ -596,10 +596,10 @@ getsortparams:{[] idbreload:{[pt] .lg.o[`idb;"starting idb reload"]; if[writedownmode in `partbyenum`default; - .lg.o[`eod;"initialising wdbhdb for partition: ",string[currentpartition]]; + .lg.o[`eod;"initialising wdbhdb for partition: ",string[pt]]; $[.proc.proctype~`sort;{ws:exec w from .servers.getservers[`proctype;`wdb;()!();1b;0b];[first ws](`.wdb.initmissingtables;[1b])}[];initmissingtables[0b]]; .lg.o[`eod;"notifying idbs for newly created partition"]; - notifyidbs[`.idb.rollover;currentpartition]]; + notifyidbs[`.idb.rollover;pt]]; .lg.o[`idb;"idb reload complete"]; };; From c93dd09e175091c46b0d13df3150c3551ba19318 Mon Sep 17 00:00:00 2001 From: "tgillespie@homer.aquaq.co.uk" Date: Thu, 28 Nov 2024 09:38:55 +0000 Subject: [PATCH 5/8] #672 change the way initmissingtables works --- code/processes/wdb.q | 11 +++++------ code/wdb/origstartup.q | 2 +- 2 files changed, 6 insertions(+), 7 deletions(-) diff --git a/code/processes/wdb.q b/code/processes/wdb.q index 52bc0e60a..a938b63ce 100644 --- a/code/processes/wdb.q +++ b/code/processes/wdb.q @@ -307,7 +307,7 @@ endofdaysortdate:{[dir;pt;tablist;hdbsettings] .lg.o[`mvtohdb;"Moving partition from the temp wdb ",(dw:.os.pth -1 _ string .Q.par[dir;pt;`])," directory to the hdb directory ",hw:.os.pth -1 _ string .Q.par[hdbsettings[`hdbdir];pt;`]]; .lg.o[`mvtohdb;"Attempting to move ",(", "sv string key hsym`$dw)," from ",dw," to ",hw]; .[movetohdb;(dw;hw;pt);{.lg.e[`mvtohdb;"Function movetohdb failed with error: ",x]}]; - + /-call the posteod function .save.postreplay[hdbsettings[`hdbdir];pt]; if[permitreload; @@ -419,7 +419,7 @@ endofdaysort:{[dir;pt;tablist;writedownmode;mergelimits;hdbsettings;mergemethod] endofdaysortdate[dir;pt;key tablist;hdbsettings] ]; /- run steps to rollover idb - idbreload[pt+1]; + idbreload[currentpartition+1]; /- reset compression level (.z.zd) resetcompression[16 0 0] }; @@ -523,9 +523,8 @@ fixpartition:{[subto] /- for writedown modes partbyenum/default we make sure that partition 0/currentpartition has all the tables. /- In that case we can use .Q.chk later to fill the db making it useable for intraday processes -/- currentpt - boolean; if true initialises partition for value of currentpartition; if false initialises partition for next value of currentpartition -initmissingtables:{[currentpt] - $[currentpt;pt:currentpartition;pt:currentpartition+1]; +/- pt - date; partition for which the function should initialise +initmissingtables:{[pt] .lg.o[`fixpartition;"Adding missing tables(empty) to partition ",string pt]; inittable[;pt] each tablelist[]; filldb[pt]; @@ -597,7 +596,7 @@ idbreload:{[pt] .lg.o[`idb;"starting idb reload"]; if[writedownmode in `partbyenum`default; .lg.o[`eod;"initialising wdbhdb for partition: ",string[pt]]; - $[.proc.proctype~`sort;{ws:exec w from .servers.getservers[`proctype;`wdb;()!();1b;0b];[first ws](`.wdb.initmissingtables;[1b])}[];initmissingtables[0b]]; + $[.proc.proctype~`sort;{[pt]ws:exec w from .servers.getservers[`proctype;`wdb;()!();1b;0b];[first ws](`.wdb.initmissingtables;[pt])}[pt];initmissingtables[pt]]; .lg.o[`eod;"notifying idbs for newly created partition"]; notifyidbs[`.idb.rollover;pt]]; .lg.o[`idb;"idb reload complete"]; diff --git a/code/wdb/origstartup.q b/code/wdb/origstartup.q index 797a85008..dd91c0e61 100644 --- a/code/wdb/origstartup.q +++ b/code/wdb/origstartup.q @@ -15,7 +15,7 @@ startup:{[] ]; subscribe[]; /- add missing tables to partitions in case an IDB process wants to connect. Only applicable for partbyenum writedown mode - if[.wdb.writedownmode in `default`partbyenum;initmissingtables[1b]]; + if[.wdb.writedownmode in `default`partbyenum;initmissingtables[currentpartition]]; ]; @[`.; `upd; :; .wdb.upd]; } From 49a156d74e3bc0e4133985e6c96b7eec39d80ba7 Mon Sep 17 00:00:00 2001 From: "tgillespie@homer.aquaq.co.uk" Date: Thu, 28 Nov 2024 12:34:51 +0000 Subject: [PATCH 6/8] #672 make use of wdbtypes for sort procs to connect to --- code/processes/wdb.q | 9 ++++++--- config/settings/wdb.q | 1 + 2 files changed, 7 insertions(+), 3 deletions(-) diff --git a/code/processes/wdb.q b/code/processes/wdb.q index a938b63ce..234bf75a6 100644 --- a/code/processes/wdb.q +++ b/code/processes/wdb.q @@ -54,6 +54,8 @@ tpcheckcycles:@[value;`tpcheckcycles;0W]; /-num sorttypes:@[value;`sorttypes;`sort]; /-list of sort types to look for upon a sort sortworkertypes:@[value;`sortworkertypes;`sortworker]; /-list of sort types to look for upon a sort being called with worker process +wdbtypes:@[value;`wdbtypes;`wdb]; /-list of wdb types for sort processes to look for on initmissingtables + subtabs:@[value;`subtabs;`]; /-list of tables to subscribe for subsyms:@[value;`subsyms;`]; /-list of syms to subscription to upd:@[value;`upd;{insert}]; /-value of the upd function @@ -75,7 +77,7 @@ eodwaittime:@[value;`eodwaittime;0D00:00:10.000]; /-len .save.savedownmanipulation:@[value;`.save.savedownmanipulation;()!()]; /-a dict of table!function used to manipulate tables at EOD save .save.postreplay:@[value;`.save.postreplay;{{[d;p] }}]; /-post EOD function, invoked after all the tables have been written down -/ - end of default parameters + - end of default parameters / - define .z.pd in order to connect to any worker processes .dotz.set[`.z.pd;{$[.z.K<3.3; @@ -307,7 +309,7 @@ endofdaysortdate:{[dir;pt;tablist;hdbsettings] .lg.o[`mvtohdb;"Moving partition from the temp wdb ",(dw:.os.pth -1 _ string .Q.par[dir;pt;`])," directory to the hdb directory ",hw:.os.pth -1 _ string .Q.par[hdbsettings[`hdbdir];pt;`]]; .lg.o[`mvtohdb;"Attempting to move ",(", "sv string key hsym`$dw)," from ",dw," to ",hw]; .[movetohdb;(dw;hw;pt);{.lg.e[`mvtohdb;"Function movetohdb failed with error: ",x]}]; - + /-call the posteod function .save.postreplay[hdbsettings[`hdbdir];pt]; if[permitreload; @@ -598,7 +600,8 @@ idbreload:{[pt] .lg.o[`eod;"initialising wdbhdb for partition: ",string[pt]]; $[.proc.proctype~`sort;{[pt]ws:exec w from .servers.getservers[`proctype;`wdb;()!();1b;0b];[first ws](`.wdb.initmissingtables;[pt])}[pt];initmissingtables[pt]]; .lg.o[`eod;"notifying idbs for newly created partition"]; - notifyidbs[`.idb.rollover;pt]]; + notifyidbs[`.idb.rollover;pt] + ]; .lg.o[`idb;"idb reload complete"]; };; diff --git a/config/settings/wdb.q b/config/settings/wdb.q index 16250689d..95c8047e3 100644 --- a/config/settings/wdb.q +++ b/config/settings/wdb.q @@ -6,6 +6,7 @@ ignorelist:`heartbeat`logmsg hdbtypes:`hdb // list of hdb types to look for and call in hdb reload rdbtypes:`rdb // list of rdb types to look for and call in rdb reload idbtypes:`idb // list of idb types to look for and call in rdb reload +wdbtypes:() // wdb does not need to connect to itself gatewaytypes:`gateway // list of gateway types to inform at reload tickerplanttypes:`segmentedtickerplant // list of tickerplant types to try and make a connection to subtabs:` // list of tables to subscribe for (` for all) From 0fe0541c53245bd554e14e03a8132946717c7d77 Mon Sep 17 00:00:00 2001 From: "tgillespie@homer.aquaq.co.uk" Date: Thu, 28 Nov 2024 12:36:27 +0000 Subject: [PATCH 7/8] #672 remove unnecessary whitespace --- code/processes/wdb.q | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/code/processes/wdb.q b/code/processes/wdb.q index 234bf75a6..67caeb1c0 100644 --- a/code/processes/wdb.q +++ b/code/processes/wdb.q @@ -309,7 +309,7 @@ endofdaysortdate:{[dir;pt;tablist;hdbsettings] .lg.o[`mvtohdb;"Moving partition from the temp wdb ",(dw:.os.pth -1 _ string .Q.par[dir;pt;`])," directory to the hdb directory ",hw:.os.pth -1 _ string .Q.par[hdbsettings[`hdbdir];pt;`]]; .lg.o[`mvtohdb;"Attempting to move ",(", "sv string key hsym`$dw)," from ",dw," to ",hw]; .[movetohdb;(dw;hw;pt);{.lg.e[`mvtohdb;"Function movetohdb failed with error: ",x]}]; - + /-call the posteod function .save.postreplay[hdbsettings[`hdbdir];pt]; if[permitreload; From 5a157464ef044bbe9d76dd8465e54e4f68d6d2c0 Mon Sep 17 00:00:00 2001 From: "tgillespie@homer.aquaq.co.uk" Date: Thu, 28 Nov 2024 15:18:02 +0000 Subject: [PATCH 8/8] #672 make the initmissingtables command get sent to all wdbs --- code/processes/wdb.q | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/code/processes/wdb.q b/code/processes/wdb.q index 67caeb1c0..5cfb69c3c 100644 --- a/code/processes/wdb.q +++ b/code/processes/wdb.q @@ -77,7 +77,7 @@ eodwaittime:@[value;`eodwaittime;0D00:00:10.000]; /-len .save.savedownmanipulation:@[value;`.save.savedownmanipulation;()!()]; /-a dict of table!function used to manipulate tables at EOD save .save.postreplay:@[value;`.save.postreplay;{{[d;p] }}]; /-post EOD function, invoked after all the tables have been written down - - end of default parameters +/ - end of default parameters / - define .z.pd in order to connect to any worker processes .dotz.set[`.z.pd;{$[.z.K<3.3; @@ -593,12 +593,13 @@ getsortparams:{[] ]; }; -/- funtion to initialise partition at EOD and notifyidbs to rollover, if .wdb.mode is not a sort mode, initmissingtables done on sort proc +/- Function to trigger idb reload steps, The initmissingtables function needs to be ran on wdb process, however this function can be ran on the sort processes +/- If the function is ran on sort process send initmissingtables command to wdbs idbreload:{[pt] .lg.o[`idb;"starting idb reload"]; if[writedownmode in `partbyenum`default; .lg.o[`eod;"initialising wdbhdb for partition: ",string[pt]]; - $[.proc.proctype~`sort;{[pt]ws:exec w from .servers.getservers[`proctype;`wdb;()!();1b;0b];[first ws](`.wdb.initmissingtables;[pt])}[pt];initmissingtables[pt]]; + $[.proc.proctype~`sort;{[pt]ws:exec w from .servers.getservers[`proctype;wdbtypes;()!();1b;0b];{[ws;pt]ws(`.wdb.initmissingtables;[pt])}[;pt] each ws}[pt];initmissingtables[pt]]; .lg.o[`eod;"notifying idbs for newly created partition"]; notifyidbs[`.idb.rollover;pt] ];