Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions code/processes/idb.q
Original file line number Diff line number Diff line change
Expand Up @@ -113,3 +113,8 @@ maptoint:{[val]
/- if using a symbol column, enumerate against the hdb sym file
sym?`TORQNULLSYMBOL^val]
};

/- helper function to support queries against the sym column in partbyfirstchar
mapfctoint:{[val]
.Q.an?$[0<type val;first each;first] string val
};
63 changes: 46 additions & 17 deletions code/processes/wdb.q
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,9 @@ writedownmode:@[value;`writedownmode;`default]; /-the
/- at EOD the data will be merged from each partition before being moved to hdb
/- 3. partbyenum - the data is partitioned by [ partitiontype ] and a symbol or integer column with parted attribution assigned in sort.csv
/- at EOD the data will be merged from each partition before being moved to hdb
/- 3. partbyfirstchar - the data is partitioned by [ partitiontype ] and a symbol column based on the first character with parted attribution assigned in sort.csv
/- at EOD the data will be sorted and merged from each partition before being moved to hdb


mergemode:@[value;`mergemode;`part]; /-the partbyattr writedown mode can merge data from temporary storage to the hdb in three ways:
/- 1. part - the entire partition is merged to the hdb
Expand All @@ -52,7 +55,6 @@ tpcheckcycles:@[value;`tpcheckcycles;0W]; /-num

sorttypes:@[value;`sorttypes;`sort]; /-list of sort types to look for upon a sort
sortworkertypes:@[value;`sortworkertypes;`sortworker]; /-list of sort types to look for upon a sort being called with worker process

wdbtypes:@[value;`wdbtypes;`wdb]; /-list of wdb types for sort processes to look for on initmissingtables

subtabs:@[value;`subtabs;`]; /-list of tables to subscribe for
Expand Down Expand Up @@ -92,7 +94,7 @@ saveenabled: any `save`saveandsort in mode;
sortenabled: any `sort`saveandsort in mode;

/- parted writedown modes have special behaviour during merging or WDB initialisation
partwritemodes:`partbyattr`partbyenum;
partwritemodes:`partbyattr`partbyenum`partbyfirstchar;

/ - log which modes are enabled
switch: string `off`on;
Expand Down Expand Up @@ -125,19 +127,24 @@ maptoint:{[val]
`long$ (` sv hdbsettings[`hdbdir],`sym)?`TORQNULLSYMBOL^val]
};

mapfctoint:{[val]
.Q.an?$[0<type val;first each;first] string val
};

/- function to upsert to specified directory
upserttopartition:{[dir;tablename;tabdata;pt;expttype;expt;writedownmode]
/- enumerate first extra partition value
if[writedownmode~`partbyenum;i:maptoint first expt];
if[writedownmode~`partbyfirstchar;i:mapfctoint first expt];
/- create directory location for selected partition
/- replace non-alphanumeric characters in symbols with _
/- convert to symbols and replace any null values with `TORQNULLSYMBOL
directory:$[writedownmode~`partbyenum;
directory:$[writedownmode in `partbyenum`partbyfirstchar;
` sv .Q.par[dir;pt;`$string i],tablename,`;
` sv .Q.par[dir;pt;tablename],(`$"_"^.Q.an .Q.an?"_" sv string `TORQNULLSYMBOL^ ensuresymlist[expt]),`];
` sv .Q.par[dir;pt;tablename],(`$"_"^.Q.an .Q.an?"_" sv string `TORQNULLSYMBOL^ ensuresymlist[expt]),`];
.lg.o[`save;"saving ",(string tablename)," data to partition ",string directory];
/- selecting rows of table with matching partition
r:?[tabdata;$[writedownmode~`partbyenum;enlist(in;first expttype;expt);{(x;y;(),z)}[in;;]'[expttype;expt]];0b;()];
r:?[tabdata;$[writedownmode in `partbyenum`partbyfirstchar;enlist(in;first expttype;expt);{(x;y;(),z)}[in;;]'[expttype;expt]];0b;()];
/- upsert selected data matched on partition to specific directory
.[upsert;(directory;r);{[e] .lg.e[`savetablesbypart;"Failed to save table to disk : ",e];'e}];
.lg.o[`track;"appending details to partsizes"];
Expand All @@ -152,8 +159,8 @@ savetablesbypart:{[dir;pt;forcesave;tablename;writedownmode]
.lg.o[`rowcheck;"the ",(string tablename)," table consists of ", (string arows), " rows"];
/- get additional partition(s) defined by parted attribute in sort.csv
extrapartitiontype:.merge.getextrapartitiontype[tablename];
if[(writedownmode~`partbyenum) and 1<c:count extrapartitiontype;
.lg.e[`partbyenum;"only 1 parted attribute should be defined on table when partbyenum writedown mode is used but we have ",string c]
if[(writedownmode in `partbyenum`partbyfirstchar) and 1<c:count extrapartitiontype;
.lg.e[writedownmode;"only 1 parted attribute should be defined on table when using partbyenum and partbyfirstchar writedown modes, but we have ",string c]
];
/- check each partition type actually is a column in the selected table
.merge.checkpartitiontype[tablename;extrapartitiontype];
Expand Down Expand Up @@ -182,7 +189,7 @@ savetables:$[writedownmode in partwritemodes;savetablesbypart[;;;;writedownmode]
savetodisk:{[]
changes:savetables[savedir;getpartition[];immediate;] each tablelist[];
/- we have to let the idbs know of the changes in the wdbhdb. using filldb[] to make sure it is a db with all the tables
if[any[changes] and writedownmode in `partbyenum`default;filldb getpartition[];notifyidbs[`.idb.intradayreload;enlist()]]};
if[any[changes] and writedownmode in `partbyenum`partbyfirstchar`default;filldb getpartition[];notifyidbs[`.idb.intradayreload;enlist()]]};

/- send an intraday reload message to idbs:
notifyidbs:{[func;params]
Expand Down Expand Up @@ -332,8 +339,8 @@ merge:{[dir;pt;tableinfo;mergelimits;hdbsettings;mergemethod;writedownmode]
setcompression[hdbsettings[`compression]];
/- get tablename
tabname:tableinfo[0];
/- get list of partition directories for specified table - partbyenum uses different folder structure vs partbyattr/default
partdirs:$[writedownmode in `partbyenum;
/- get list of partition directories for specified table - partbyenum & partbyfirstchar use different folder structures vs partbyattr/default
partdirs:$[writedownmode in `partbyenum`partbyfirstchar;
p where 0<count each key each p:` sv' ((-1_` vs p),/:key p:.Q.par[hsym dir;pt;`]),\: tabname;
` sv' tabledir,/:key tabledir:.Q.par[hsym dir;pt;tabname]];
/- we only really have to merge those partitions where we have received some updates, otherwise table is empty
Expand Down Expand Up @@ -363,7 +370,7 @@ merge:{[dir;pt;tableinfo;mergelimits;hdbsettings;mergemethod;writedownmode]
.merge.mergehybrid[tableinfo;dest;partdirs;mergelimits[tabname]]
];
.lg.o[`merge;"removing segments ", (", " sv string[partdirs])];
$[writedownmode in `partbyenum;
$[writedownmode in `partbyenum`partbyfirstchar;
removetablefromenumdir each partdirs;
.os.deldir .os.pth[[string[tabledir]]]
];
Expand All @@ -384,7 +391,7 @@ removetablefromenumdir:{[partdir]

endofdaymerge:{[dir;pt;tablist;mergelimits;hdbsettings;mergemethod;writedownmode]
/- merge data from partitions
/- .z.pd funciton in finspace will cause an error. Add in this check to skip over the use of .z.pd. This should be temporary and will be removed when issue resolved by AWS.
/- .z.pd function in finspace will cause an error. Add in this check to skip over the use of .z.pd. This should be temporary and will be removed when issue resolved by AWS.
tempfix2:$[.finspace.enabled;0b;(0 < count .z.pd[])];
$[tempfix2 and ((system "s")<0);
[.lg.o[`merge;"merging on worker"];
Expand Down Expand Up @@ -423,12 +430,34 @@ endofdaymerge:{[dir;pt;tablist;mergelimits;hdbsettings;mergemethod;writedownmode
];
};

/- end of day sort and merge only used by writedown mode sortbyfirstchar, requiring sort pre-merge
endofdaysortandmerge:{[dir;pt;tablist;mergelimits;hdbsettings;mergemethod;writedownmode]
/-sort permitted tables in database
/- sort the table and garbage collect (if enabled)
.lg.o[`sort;"starting to sort data"];
/- .z.pd function in finspace will cause an error. Add in this check to skip over the use of .z.pd. This should be temporary and will be removed when issue resolved by AWS.
tempfix1:$[.finspace.enabled;0b;count[.z.pd[]]];
tnds:raze{y,/:.Q.dd[x;]each key[x],\:y}[.Q.dd[dir;pt]]each key tablist;
tnds:tnds where tnds[;1] in exec ptdir from .merge.partsizes;
$[tempfix1&0>system"s";
[.lg.o[`sortandmerge;"sorting on worker sort", string .z.p];
{(neg x)(`.wdb.reloadsymfile;y);(neg x)(::)}[;.Q.dd[hdbsettings `hdbdir;`sym]] each .z.pd[];
{[x;compression] setcompression compression;.sort.sorttab x;if[gc;.gc.run[]]}[;hdbsettings`compression] peach tnds];
[.lg.o[`sort;"sorting on main sort"];
reloadsymfile[.Q.dd[hdbsettings `hdbdir;`sym]];
{[x] .sort.sorttab[x];if[gc;.gc.run[]]} each tnds]];
.lg.o[`sort;"finished sorting data"];
endofdaymerge[dir;pt;tablist;mergelimits;hdbsettings;mergemethod;writedownmode];
};

/- end of day sort [depends on writedown mode]
endofdaysort:{[dir;pt;tablist;writedownmode;mergelimits;hdbsettings;mergemethod]
/- set compression level (.z.zd)
setcompression[hdbsettings[`compression]];
$[writedownmode in partwritemodes;
endofdaymerge[dir;pt;tablist;mergelimits;hdbsettings;mergemethod;writedownmode];
$[writedownmode~`partbyfirstchar; /-partbyfirstchar will not be sorted by sym within each parition, this needs done first
endofdaysortandmerge[dir;pt;tablist;mergelimits;hdbsettings;mergemethod;writedownmode];
endofdaymerge[dir;pt;tablist;mergelimits;hdbsettings;mergemethod;writedownmode]];
endofdaysortdate[dir;pt;key tablist;hdbsettings]
];
/- run steps to rollover idb
Expand Down Expand Up @@ -534,7 +563,7 @@ fixpartition:{[subto]
];
}

/- for writedown modes partbyenum/default we make sure that partition 0/currentpartition has all the tables.
/- for writedown modes partbyenum/partbyfirstchar/default we make sure that partition 0/currentpartition has all the tables.
/- In that case we can use .Q.chk later to fill the db making it useable for intraday processes
/- pt - date; partition for which the function should initialise
initmissingtables:{[pt]
Expand All @@ -550,7 +579,7 @@ filldb:{[pt]

/- initialises table t in db with its schema in part
inittable:{[t;pt]
tabledir:` sv $[writedownmode~`partbyenum; .Q.par[.Q.dd[hsym savedir;pt];0;t]; .Q.par[hsym savedir;pt;t]],`;
tabledir:` sv $[writedownmode in `partbyenum`partbyfirstchar; .Q.par[.Q.dd[hsym savedir;pt];0;t]; .Q.par[hsym savedir;pt;t]],`;
if[() ~ key tabledir;tabledir set .Q.en[hsym hdbdir;0#value t]];
}

Expand Down Expand Up @@ -590,7 +619,7 @@ getsortparams:{[]
/- get the attributes csv file
/-even if running with a sort process should read this file to cope with backups
.sort.getsortcsv[sortcsv];
/- check the sort.csv for parted attributes `p if the writedownmode `partbyattr or `partbyenum is selected
/- check the sort.csv for parted attributes `p if the writedownmode `partbyattr, `partbyenum or `partbyfirstchar is selected
/- if each table does not have at least one `p attribute the process will exit
if[writedownmode in partwritemodes;

Expand All @@ -612,7 +641,7 @@ getsortparams:{[]
/- If the function is ran on sort process send initmissingtables command to wdbs
idbreload:{[pt]
.lg.o[`idb;"starting idb reload"];
if[writedownmode in `partbyenum`default;
if[writedownmode in `partbyenum`default`partbyfirstchar;
.lg.o[`eod;"initialising wdbhdb for partition: ",string[pt]];
$[.proc.proctype~`sort;{[pt]ws:exec w from .servers.getservers[`proctype;wdbtypes;()!();1b;0b];{[ws;pt]ws(`.wdb.initmissingtables;[pt])}[;pt] each ws}[pt];initmissingtables[pt]];
.lg.o[`eod;"notifying idbs for newly created partition"];
Expand Down
37 changes: 34 additions & 3 deletions docs/Processes.md
Original file line number Diff line number Diff line change
Expand Up @@ -1035,6 +1035,24 @@ sorting at the end of the day.
In the above example, the data is parted by sym, and number 456 is
the order of MSFT_N symbol entry in the HDB sym file.

- partbyfirstchar - Data is persisted to a partition scheme where the partition
is derived from the first character in sym colum present in the sort.csv
file. Like partbyenum, this can be only be done by one column which has the
parted attribute applied to it. It must be a symbol column due the nature
of the character extraction. The numerical value for characters will map
to the index of the character in the .Q.an. For those that arent contained i
within .Q.an, they will map to the count of .Q.an. Partitioning in this way
means that the data within each partition is not sorted for the parted
attribute to be applied, which means in the EOD process the data needs sorted
before being merged. This sort happens partition by partition rather than
as a whole. The wdb partition scheme is of the form
\[wdbdir\]/\[partitiontype\]/\[first char index .Q.an\]/\[table(s)\]/
A typical partition directory would be similar to (for ex sym: MSFT_N)
wdb/database/2025.11.04/38/trade
In the above example, the data is parted by sym, and number 38 is the
index position of M in .Q.an.


The advantage of partbyenum over partbyattr could be that the
directory structure it uses represents a HDB that is ready to be loaded
intraday. At the end of the day the data gets upserted to the HDB the
Expand All @@ -1046,7 +1064,10 @@ data sets with a low cardinality (ie. small number of distinct elements)
the optional method may provide a significant time saving, upwards of
50%. The optional method should also reduce the memory usage at the end
of day event, as joining data is generally less memory intensive than
sorting.
sorting. The optional partbyfirstchar method allows a method for subdividing
data with a high cardinality to reduce the number of partitions being
written to, while providing a means for reduced memory footprint on final sort
versus default.

<a name="idb"></a>

Expand All @@ -1056,13 +1077,19 @@ Intraday Database (IDB)
The Intraday Database or IDB is a simple process that allows access to
data written down intraday. This assumes that there is an existing WDB
(and HDB) process creating a DB on disk that can be loaded with a simple
load command. As of now default and partbyenum WDB writedown modes are supported.
The responsibility of an IDB is therefore:
load command. As of now default, partbyenum and partbyfirstchar WDB writedown
modes are supported. The responsibility of an IDB is therefore:

1. Serving queries. Since partbyenum writedown mode is done by enumerated
symbol columns a helper function maptoint is implemented to support
symbol lookup in sym file:
select from trade where int=maptoint[`MSFT_N]
Also with partbyfirstchar being an alternate approach to create a
numerical partition, there is a helper function to locate the correct
value:
select from trade where int=mapfctoint[`MSFT],sym=`MSFT
select from trade where int in mapfctoint[`MSFT`AAPL],sym in `MSFT`AAPL


2. Can be triggered for a reload. This is usually done by the WDB process
periodically.
Expand Down Expand Up @@ -1097,6 +1124,10 @@ The IDB can be queried just like any other HDB. If writedown mode partbyenum is
```
neg[gwHandle](`.gw.asyncexec;"select from trade where int=maptoint[`GOOG]";`idb);gwHandle[]
```
Likewise if partbyfirstchar writedown mode is used there is a "mapfctoint" which can be used
```
neg[gwHandle](`.gw.asyncexec;"select from trade where int in maptoint[`GOOG`MSFT],sym in `GOOG`MSFT";`idb);gwHandle[]
```

### Scalability

Expand Down