From 2d093ef8c261a4d6319bd4d8109f64c5dfddc609 Mon Sep 17 00:00:00 2001 From: Peter Murphy Date: Tue, 24 Dec 2024 11:17:17 +0000 Subject: [PATCH 1/2] Add subscriber info to CTP --- code/common/subscriptions.q | 4 ++-- code/processes/chainedtp.q | 15 ++++++++++++++- 2 files changed, 16 insertions(+), 3 deletions(-) diff --git a/code/common/subscriptions.q b/code/common/subscriptions.q index b7ac91acb..91d3834b2 100644 --- a/code/common/subscriptions.q +++ b/code/common/subscriptions.q @@ -99,7 +99,7 @@ subscribe:{[tabs;instrs;setschema;replaylog;proc] $[tptype=`standard; [tablesfunc:{key `.u.w}; subfunc:{`schemalist`logfilelist`rowcounts`date!(.u.sub\:[x;y];enlist(.u`i`L);(.u `icounts);(.u `d))}]; - tptype=`segmented; + tptype in `chained`segmented; [tablesfunc:`tablelist; subfunc:`subdetails]; [.lg.e[`subscribe;e:"unrecognised tickerplant type: ",string tptype]; 'e]]; @@ -125,7 +125,7 @@ subscribe:{[tabs;instrs;setschema;replaylog;proc] // the date from the name of the tickerplant log file (assuming the tp log has a name like `: sym2014.01.01 // plus .u.i and .u.icounts if existing on TP - details[1;0] is .u.i, details[2] is .u.icounts (or null) logdate:0Nd; - if[tptype=`standard; + if[tptype in `standard`chained; d:(`subtables`tplogdate!(details[`schemalist][;0];(first "D" $ -10 sublist string last first details[`logfilelist])^logdate)); :d,{(where 101 = type each x)_x}(`i`icounts`d)!(details[`logfilelist][0;0];details[`rowcounts];details[`date])]; if[tptype~`segmented; diff --git a/code/processes/chainedtp.q b/code/processes/chainedtp.q index 1080b2e79..5c68e2f2b 100644 --- a/code/processes/chainedtp.q +++ b/code/processes/chainedtp.q @@ -1,5 +1,18 @@ / Chained tickerplant +/- subscribers use this to determine what type of process they are talking to +tptype:`chained; + +/- functions used by subscribers +tablelist:{.stpps.t} + +/- subscribers who want to replay need this info +subdetails:{[tabs;instruments] + r:.ctp.sub[tabs;instruments]; + /- reshape dict, make sure logfilelist is an empty list if no log file + `schemalist`logfilelist`rowcounts`date!r@/:(`schema;$[r[`logfile]~();();enlist`i`logfile];`icounts;`d) + } + \d .ctp /- user defined variables @@ -152,7 +165,7 @@ upd:$[createlogfile; sub:{[subtabs;subsyms] r:(`schema`icounts`i`logfile`d)!(); /- get schema & subscribe - r[`schema]:.u.sub[subtabs;subsyms]; + r[`schema]:$[-11h=type subtabs;first;::] .u.sub\:[subtabs,();subsyms]; /- add icounts if subscribing to all syms if[subscribesyms~`;r[`icounts]:.u.icounts]; /- if logfile, add logfile & i From cf3671bc059a2e83fe0ad8fc479772d3e155e082 Mon Sep 17 00:00:00 2001 From: Peter Murphy Date: Fri, 3 Jan 2025 17:03:58 +0000 Subject: [PATCH 2/2] Add CTP subscription tests --- tests/chainedtp/database.q | 2 ++ tests/chainedtp/process.csv | 6 +++++ tests/chainedtp/run.sh | 23 +++++++++++++++++ tests/chainedtp/settings.q | 15 ++++++++++++ tests/chainedtp/test.csv | 49 +++++++++++++++++++++++++++++++++++++ 5 files changed, 95 insertions(+) create mode 100644 tests/chainedtp/database.q create mode 100644 tests/chainedtp/process.csv create mode 100755 tests/chainedtp/run.sh create mode 100644 tests/chainedtp/settings.q create mode 100644 tests/chainedtp/test.csv diff --git a/tests/chainedtp/database.q b/tests/chainedtp/database.q new file mode 100644 index 000000000..4dacc8e97 --- /dev/null +++ b/tests/chainedtp/database.q @@ -0,0 +1,2 @@ +trade:flip `time`sym`price`size`stop`cond`ex`side!"PSFIBCCS" $\: (); +quote:flip `time`sym`bid`ask`bsize`asize`mode`ex`src!"PSFFJJCCS" $\: (); diff --git a/tests/chainedtp/process.csv b/tests/chainedtp/process.csv new file mode 100644 index 000000000..0c0619e53 --- /dev/null +++ b/tests/chainedtp/process.csv @@ -0,0 +1,6 @@ +host,port,proctype,procname,U,localtime,g,T,w,load,startwithall,extras,qcmd +localhost,{KDBBASEPORT}+100,discovery,discovery1,${TORQHOME}/appconfig/passwords/accesslist.txt,1,0,,,${KDBCODE}/processes/discovery.q,1,,q +localhost,{KDBBASEPORT}+101,tickerplant,tp1,${TORQHOME}/appconfig/passwords/accesslist.txt,1,0,,,${KDBCODE}/processes/tickerplant.q,1,-schemafile ${KDBTESTS}/chainedtp/database -tplogdir ${KDBTESTS}/chainedtp/tplogs,q +localhost,{KDBBASEPORT}+102,chainedtp,ctp1,${TORQHOME}/appconfig/passwords/accesslist.txt,1,0,,,${KDBCODE}/processes/chainedtp.q,1,-.ctp.tickerplantname tp1 -.ctp.createlogfile 1 -.ctp.logdir :${KDBTESTS}/chainedtp/tplogs,q +localhost,{KDBBASEPORT}+103,chainedtp,ctp2,${TORQHOME}/appconfig/passwords/accesslist.txt,1,0,,,${KDBCODE}/processes/chainedtp.q,1,-.ctp.tickerplantname tp1 -.ctp.createlogfile 0,q +localhost,{KDBBASEPORT}+104,rdb,rdb1,${TORQHOME}/appconfig/passwords/accesslist.txt,1,1,180,,${KDBCODE}/processes/rdb.q,1,-.rdb.tickerplanttypes chainedtp -.rdb.replaylog 1 -.rdb.gatewaytypes none,q diff --git a/tests/chainedtp/run.sh b/tests/chainedtp/run.sh new file mode 100755 index 000000000..145dfb900 --- /dev/null +++ b/tests/chainedtp/run.sh @@ -0,0 +1,23 @@ +#!/bin/bash + +# Handle command-line arguments +source $KDBTESTS/flagparse.sh + +# Path to test directory +testpath=${KDBTESTS}/chainedtp + +# Start procs +${TORQHOME}/torq.sh start discovery1 tp1 ctp1 rdb1 -csv ${testpath}/process.csv + +# Start test proc +/usr/bin/rlwrap q ${TORQHOME}/torq.q \ + -proctype test -procname test1 \ + -test ${testpath} \ + -load ${KDBTESTS}/helperfunctions.q ${testpath}/settings.q \ + -testresults ${testpath}/results/ \ + -runtime $run \ + -procfile ${testpath}/process.csv \ + $debug $stop $write $quiet + +# Stop procs +${TORQHOME}/torq.sh stop rdb1 ctp2 tp1 discovery1 -csv ${testpath}/process.csv diff --git a/tests/chainedtp/settings.q b/tests/chainedtp/settings.q new file mode 100644 index 000000000..cb0c8f35a --- /dev/null +++ b/tests/chainedtp/settings.q @@ -0,0 +1,15 @@ +// IPC connection parameters +.servers.CONNECTIONS:`tickerplant`chainedtp`rdb; +.servers.USERPASS:`admin:admin; + +// Test updates +testtrade:((5#`GOOG),5?`4;10?100.0;10?100i;10#0b;10?.Q.A;10?.Q.A;10#`buy); +testquote:(10?`4;(5?50.0),50+5?50.0;10?100.0;10?100i;10?100i;10?.Q.A;10?.Q.A;10#`3); + +// Paths to process CSV and test TP log directory +processcsv:getenv[`KDBTESTS],"/chainedtp/process.csv"; +tptestlogs:getenv[`KDBTESTS],"/chainedtp/tplogs"; + +// Function projections (using functions from helperfunctions.q) +startproc:startorstopproc["start";;processcsv]; +stopproc:startorstopproc["stop";;processcsv]; diff --git a/tests/chainedtp/test.csv b/tests/chainedtp/test.csv new file mode 100644 index 000000000..2a2fc77ec --- /dev/null +++ b/tests/chainedtp/test.csv @@ -0,0 +1,49 @@ +action,ms,bytes,lang,code,repeat,minver,comment +beforeany,0,0,q,system "sleep 5",1,,"Wait for processes to start" +before,0,0,q,.servers.startup[],1,,"Start connection management" +before,0,0,q,system "sleep 2",1,,"Wait for connections" +before,0,0,q,tpHandle:gethandle[`tp1],1,,"Open handle to TP" +before,0,0,q,ctpHandle:gethandle[`ctp1],1,,"Open handle to CTP" +before,0,0,q,rdbHandle:gethandle[`rdb1],1,,"Open handle to RDB" + +comment,,,,,,,"Test subscriptions are as expected" +true,0,0,q,"`tp1`tp1~ctpHandle""exec procname from .sub.SUBSCRIPTIONS where active, table in `trade`quote""",1,,"Check CTP is subscribed to tables from TP" +true,0,0,q,"`ctp1`ctp1~rdbHandle""exec procname from .sub.SUBSCRIPTIONS where active, table in `trade`quote""",1,,"Check RDB is subscribed to tables from CTP" + +comment,,,,,,,"Test updates flow from TP to CTP to RDB" +run,0,0,q,ct:count first testtrade,1,,"Get count of trade updates" +run,0,0,q,cq:count first testquote,1,,"Get count of quote updates" +true,0,0,q,0~rdbHandle"count trade",1,,"Check RDB trade table is empty" +true,0,0,q,0~rdbHandle"count quote",1,,"Check RDB quote table is empty" +run,0,0,q,"tpHandle @/: `.u.upd ,/: ((`trade;testtrade);(`quote;testquote))",1,,"Publish table updates to TP" +run,0,0,q,system "sleep 1",1,,"Wait for updates to publish" +true,0,0,q,ct~rdbHandle"count trade",1,,"Check trade updates were correctly published" +true,0,0,q,cq~rdbHandle"count quote",1,,"Check quote updates were correctly published" + +comment,,,,,,,"Test RDB recovers from CTP log file" +run,0,0,q,kill9proc "rdb1",1,,"Stop RDB" +run,0,0,q,system "sleep 2",1,,"Wait for RDB to stop" +true,0,0,q,not isalive "rdb1",1,,"Check RDB is stopped" +run,0,0,q,startproc "rdb1",1,,"Start RDB" +run,0,0,q,system "sleep 2",1,,"Wait for RDB to start" +run,0,0,q,rdbHandle:gethandle[`rdb1],1,,"Open handle to RDB" +true,0,0,q,ct~rdbHandle"count trade",1,,"Check trade recovery was successful" +true,0,0,q,cq~rdbHandle"count quote",1,,"Check quote recovery was successful" + +comment,,,,,,,"Test RDB subscription to CTP without a log file" +run,0,0,q,kill9proc each ("rdb1";"ctp1"),1,,"Stop RDB and CTP" +run,0,0,q,system "sleep 2",1,,"Wait for RDB and CTP to stop" +true,0,0,q,all not isalive each ("rdb1";"ctp1"),1,,"Check RDB and CTP are stopped" +run,0,0,q,startproc "ctp2",1,,"Start CTP without log file" +run,0,0,q,system "sleep 2",1,,"Wait for CTP to start" +run,0,0,q,startproc "rdb1",1,,"Start RDB" +run,0,0,q,system "sleep 2",1,,"Wait for RDB to start" +run,0,0,q,rdbHandle:gethandle[`rdb1],1,,"Open handle to RDB" +true,0,0,q,0~rdbHandle"count trade",1,,"Check RDB trade table is empty" +true,0,0,q,0~rdbHandle"count quote",1,,"Check RDB quote table is empty" +run,0,0,q,"tpHandle @/: `.u.upd ,/: ((`trade;testtrade);(`quote;testquote))",1,,"Publish table updates to TP" +run,0,0,q,system "sleep 1",1,,"Wait for updates to publish" +true,0,0,q,ct~rdbHandle"count trade",1,,"Check trade updates were correctly published" +true,0,0,q,cq~rdbHandle"count quote",1,,"Check quote updates were correctly published" + +after,0,0,q,"system ""rm -rf "",tptestlogs",1,,"Delete logs folder" \ No newline at end of file