Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions code/common/subscriptions.q
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ subscribe:{[tabs;instrs;setschema;replaylog;proc]
$[tptype=`standard;
[tablesfunc:{key `.u.w};
subfunc:{`schemalist`logfilelist`rowcounts`date!(.u.sub\:[x;y];enlist(.u`i`L);(.u `icounts);(.u `d))}];
tptype=`segmented;
tptype in `chained`segmented;
[tablesfunc:`tablelist;
subfunc:`subdetails];
[.lg.e[`subscribe;e:"unrecognised tickerplant type: ",string tptype]; 'e]];
Expand All @@ -125,7 +125,7 @@ subscribe:{[tabs;instrs;setschema;replaylog;proc]
// the date from the name of the tickerplant log file (assuming the tp log has a name like `: sym2014.01.01
// plus .u.i and .u.icounts if existing on TP - details[1;0] is .u.i, details[2] is .u.icounts (or null)
logdate:0Nd;
if[tptype=`standard;
if[tptype in `standard`chained;
d:(`subtables`tplogdate!(details[`schemalist][;0];(first "D" $ -10 sublist string last first details[`logfilelist])^logdate));
:d,{(where 101 = type each x)_x}(`i`icounts`d)!(details[`logfilelist][0;0];details[`rowcounts];details[`date])];
if[tptype~`segmented;
Expand Down
15 changes: 14 additions & 1 deletion code/processes/chainedtp.q
Original file line number Diff line number Diff line change
@@ -1,5 +1,18 @@
/ Chained tickerplant

/- subscribers use this to determine what type of process they are talking to
tptype:`chained;

/- functions used by subscribers
tablelist:{.stpps.t}

/- subscribers who want to replay need this info
subdetails:{[tabs;instruments]
r:.ctp.sub[tabs;instruments];
/- reshape dict, make sure logfilelist is an empty list if no log file
`schemalist`logfilelist`rowcounts`date!r@/:(`schema;$[r[`logfile]~();();enlist`i`logfile];`icounts;`d)
}

\d .ctp

/- user defined variables
Expand Down Expand Up @@ -152,7 +165,7 @@ upd:$[createlogfile;
sub:{[subtabs;subsyms]
r:(`schema`icounts`i`logfile`d)!();
/- get schema & subscribe
r[`schema]:.u.sub[subtabs;subsyms];
r[`schema]:$[-11h=type subtabs;first;::] .u.sub\:[subtabs,();subsyms];
/- add icounts if subscribing to all syms
if[subscribesyms~`;r[`icounts]:.u.icounts];
/- if logfile, add logfile & i
Expand Down
2 changes: 2 additions & 0 deletions tests/chainedtp/database.q
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
trade:flip `time`sym`price`size`stop`cond`ex`side!"PSFIBCCS" $\: ();
quote:flip `time`sym`bid`ask`bsize`asize`mode`ex`src!"PSFFJJCCS" $\: ();
6 changes: 6 additions & 0 deletions tests/chainedtp/process.csv
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
host,port,proctype,procname,U,localtime,g,T,w,load,startwithall,extras,qcmd
localhost,{KDBBASEPORT}+100,discovery,discovery1,${TORQHOME}/appconfig/passwords/accesslist.txt,1,0,,,${KDBCODE}/processes/discovery.q,1,,q
localhost,{KDBBASEPORT}+101,tickerplant,tp1,${TORQHOME}/appconfig/passwords/accesslist.txt,1,0,,,${KDBCODE}/processes/tickerplant.q,1,-schemafile ${KDBTESTS}/chainedtp/database -tplogdir ${KDBTESTS}/chainedtp/tplogs,q
localhost,{KDBBASEPORT}+102,chainedtp,ctp1,${TORQHOME}/appconfig/passwords/accesslist.txt,1,0,,,${KDBCODE}/processes/chainedtp.q,1,-.ctp.tickerplantname tp1 -.ctp.createlogfile 1 -.ctp.logdir :${KDBTESTS}/chainedtp/tplogs,q
localhost,{KDBBASEPORT}+103,chainedtp,ctp2,${TORQHOME}/appconfig/passwords/accesslist.txt,1,0,,,${KDBCODE}/processes/chainedtp.q,1,-.ctp.tickerplantname tp1 -.ctp.createlogfile 0,q
localhost,{KDBBASEPORT}+104,rdb,rdb1,${TORQHOME}/appconfig/passwords/accesslist.txt,1,1,180,,${KDBCODE}/processes/rdb.q,1,-.rdb.tickerplanttypes chainedtp -.rdb.replaylog 1 -.rdb.gatewaytypes none,q
23 changes: 23 additions & 0 deletions tests/chainedtp/run.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
#!/bin/bash

# Handle command-line arguments
source $KDBTESTS/flagparse.sh

# Path to test directory
testpath=${KDBTESTS}/chainedtp

# Start procs
${TORQHOME}/torq.sh start discovery1 tp1 ctp1 rdb1 -csv ${testpath}/process.csv

# Start test proc
/usr/bin/rlwrap q ${TORQHOME}/torq.q \
-proctype test -procname test1 \
-test ${testpath} \
-load ${KDBTESTS}/helperfunctions.q ${testpath}/settings.q \
-testresults ${testpath}/results/ \
-runtime $run \
-procfile ${testpath}/process.csv \
$debug $stop $write $quiet

# Stop procs
${TORQHOME}/torq.sh stop rdb1 ctp2 tp1 discovery1 -csv ${testpath}/process.csv
15 changes: 15 additions & 0 deletions tests/chainedtp/settings.q
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
// IPC connection parameters
.servers.CONNECTIONS:`tickerplant`chainedtp`rdb;
.servers.USERPASS:`admin:admin;

// Test updates
testtrade:((5#`GOOG),5?`4;10?100.0;10?100i;10#0b;10?.Q.A;10?.Q.A;10#`buy);
testquote:(10?`4;(5?50.0),50+5?50.0;10?100.0;10?100i;10?100i;10?.Q.A;10?.Q.A;10#`3);

// Paths to process CSV and test TP log directory
processcsv:getenv[`KDBTESTS],"/chainedtp/process.csv";
tptestlogs:getenv[`KDBTESTS],"/chainedtp/tplogs";

// Function projections (using functions from helperfunctions.q)
startproc:startorstopproc["start";;processcsv];
stopproc:startorstopproc["stop";;processcsv];
49 changes: 49 additions & 0 deletions tests/chainedtp/test.csv
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
action,ms,bytes,lang,code,repeat,minver,comment
beforeany,0,0,q,system "sleep 5",1,,"Wait for processes to start"
before,0,0,q,.servers.startup[],1,,"Start connection management"
before,0,0,q,system "sleep 2",1,,"Wait for connections"
before,0,0,q,tpHandle:gethandle[`tp1],1,,"Open handle to TP"
before,0,0,q,ctpHandle:gethandle[`ctp1],1,,"Open handle to CTP"
before,0,0,q,rdbHandle:gethandle[`rdb1],1,,"Open handle to RDB"

comment,,,,,,,"Test subscriptions are as expected"
true,0,0,q,"`tp1`tp1~ctpHandle""exec procname from .sub.SUBSCRIPTIONS where active, table in `trade`quote""",1,,"Check CTP is subscribed to tables from TP"
true,0,0,q,"`ctp1`ctp1~rdbHandle""exec procname from .sub.SUBSCRIPTIONS where active, table in `trade`quote""",1,,"Check RDB is subscribed to tables from CTP"

comment,,,,,,,"Test updates flow from TP to CTP to RDB"
run,0,0,q,ct:count first testtrade,1,,"Get count of trade updates"
run,0,0,q,cq:count first testquote,1,,"Get count of quote updates"
true,0,0,q,0~rdbHandle"count trade",1,,"Check RDB trade table is empty"
true,0,0,q,0~rdbHandle"count quote",1,,"Check RDB quote table is empty"
run,0,0,q,"tpHandle @/: `.u.upd ,/: ((`trade;testtrade);(`quote;testquote))",1,,"Publish table updates to TP"
run,0,0,q,system "sleep 1",1,,"Wait for updates to publish"
true,0,0,q,ct~rdbHandle"count trade",1,,"Check trade updates were correctly published"
true,0,0,q,cq~rdbHandle"count quote",1,,"Check quote updates were correctly published"

comment,,,,,,,"Test RDB recovers from CTP log file"
run,0,0,q,kill9proc "rdb1",1,,"Stop RDB"
run,0,0,q,system "sleep 2",1,,"Wait for RDB to stop"
true,0,0,q,not isalive "rdb1",1,,"Check RDB is stopped"
run,0,0,q,startproc "rdb1",1,,"Start RDB"
run,0,0,q,system "sleep 2",1,,"Wait for RDB to start"
run,0,0,q,rdbHandle:gethandle[`rdb1],1,,"Open handle to RDB"
true,0,0,q,ct~rdbHandle"count trade",1,,"Check trade recovery was successful"
true,0,0,q,cq~rdbHandle"count quote",1,,"Check quote recovery was successful"

comment,,,,,,,"Test RDB subscription to CTP without a log file"
run,0,0,q,kill9proc each ("rdb1";"ctp1"),1,,"Stop RDB and CTP"
run,0,0,q,system "sleep 2",1,,"Wait for RDB and CTP to stop"
true,0,0,q,all not isalive each ("rdb1";"ctp1"),1,,"Check RDB and CTP are stopped"
run,0,0,q,startproc "ctp2",1,,"Start CTP without log file"
run,0,0,q,system "sleep 2",1,,"Wait for CTP to start"
run,0,0,q,startproc "rdb1",1,,"Start RDB"
run,0,0,q,system "sleep 2",1,,"Wait for RDB to start"
run,0,0,q,rdbHandle:gethandle[`rdb1],1,,"Open handle to RDB"
true,0,0,q,0~rdbHandle"count trade",1,,"Check RDB trade table is empty"
true,0,0,q,0~rdbHandle"count quote",1,,"Check RDB quote table is empty"
run,0,0,q,"tpHandle @/: `.u.upd ,/: ((`trade;testtrade);(`quote;testquote))",1,,"Publish table updates to TP"
run,0,0,q,system "sleep 1",1,,"Wait for updates to publish"
true,0,0,q,ct~rdbHandle"count trade",1,,"Check trade updates were correctly published"
true,0,0,q,cq~rdbHandle"count quote",1,,"Check quote updates were correctly published"

after,0,0,q,"system ""rm -rf "",tptestlogs",1,,"Delete logs folder"