From 3aec6dd29b591cb0d156bf495f20ca2db9ec6fef Mon Sep 17 00:00:00 2001 From: zanbuchanan <141417075+zanbuchanan@users.noreply.github.com> Date: Thu, 2 Jan 2025 14:59:12 +0000 Subject: [PATCH] adding replaymaxrows for optional row count on recovery --- code/processes/wdb.q | 10 +++++++--- code/wdb/origstartup.q | 5 +++++ code/wdb/writedown.q | 1 + config/settings/wdb.q | 2 ++ 4 files changed, 15 insertions(+), 3 deletions(-) diff --git a/code/processes/wdb.q b/code/processes/wdb.q index fde7119bf..c79cbac21 100644 --- a/code/processes/wdb.q +++ b/code/processes/wdb.q @@ -548,11 +548,15 @@ replayupd:{[f;t;d] /- execute the supplied function f . (t;d); /- if the data count is greater than the threshold, then flush data to disk - if[(rpc:count[value t]) > lmt:maxrows[t]; - .lg.o[`replayupd;"row limit (",string[lmt],") exceeded for ",string[t],". Table count is : ",string[rpc],". Flushing table to disk..."]; - savetables[savedir;getpartition[];0b;t]] + replaymaxrowcheck[t;replaymaxrows[t]]; }[upd]; +replaymaxrowcheck:{[t;lmt] + if[(rpc:count[value t]) > lmt; + .lg.o[`replayupd;"row limit (",string[lmt],") exceeded for ",string[t],". Table count is : ",string[rpc],". Flushing table to disk..."]; + savetables[savedir;getpartition[];0b;t]]; + }; + / - if there is data in the wdb directory for the partition remove it before replay / - is only for wdb processes that are saving data to disk clearwdbdata:{[] diff --git a/code/wdb/origstartup.q b/code/wdb/origstartup.q index dd91c0e61..ee6b3a188 100644 --- a/code/wdb/origstartup.q +++ b/code/wdb/origstartup.q @@ -16,6 +16,11 @@ startup:{[] subscribe[]; /- add missing tables to partitions in case an IDB process wants to connect. Only applicable for partbyenum writedown mode if[.wdb.writedownmode in `default`partbyenum;initmissingtables[currentpartition]]; + // if for replay table maxrows were customised, we want to check row count for each table, save and gc where needed + if[(not .wdb.numtab~.wdb.replaynumtab)or .wdb.numrows<>.wdb.replaynumrows; + tabs:exec table from .sub.SUBSCRIPTIONS; + tabmaxrowpairs:{(x;.wdb.maxrows[x])}each tabs; + {replaymaxrowcheck[first x;last x]}each tabmaxrowpairs]; ]; @[`.; `upd; :; .wdb.upd]; } diff --git a/code/wdb/writedown.q b/code/wdb/writedown.q index d0ee2566d..e357b6606 100644 --- a/code/wdb/writedown.q +++ b/code/wdb/writedown.q @@ -11,6 +11,7 @@ numtab:@[value;`numtab;`quote`trade!10000 50000]; /-spe maxrows:{[tabname] numrows^numtab[tabname]}; /- extract user defined row counts +replaymaxrows:{[tabname] replaynumrows^replaynumtab[tabname]}; partitiontype:@[value;`partitiontype;`date]; /-set type of partition (defaults to `date) diff --git a/config/settings/wdb.q b/config/settings/wdb.q index 95c8047e3..8a9b2a122 100644 --- a/config/settings/wdb.q +++ b/config/settings/wdb.q @@ -14,6 +14,8 @@ subsyms:` savedir:hsym`$getenv[`TORQHOME],"/wdbhdb" // location to save wdb data numrows:100000 // default number of rows numtab:`quote`trade!10000 50000 // specify number of rows per table +replaynumrows:numrows // 0W for replaying all messages at once then flushing +replaynumtab:numtab // enlist[`]!enlist 0W for replaying all messages at once then flushing mode:`save // the wdb process can operate in three modes // 1. saveandsort: the process will subscribe for data, // periodically write data to disk and at EOD it will flush