Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -3,3 +3,4 @@ coverage
dist
build
.direnv
tmp
3 changes: 3 additions & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -46,8 +46,11 @@
"uuid": ">=8 <10"
},
"devDependencies": {
"@types/debug": "^4.1.12",
"@types/lodash": "^4.17.23",
"@types/node": "^22.10.7",
"@types/prop-types": "^15.7.15",
"@types/uuid": "^9.0.8",
"@typescript-eslint/eslint-plugin": "^7.16.0",
"@typescript-eslint/parser": "^7.16.0",
"@vitest/coverage-v8": "3.0.5",
Expand Down
19 changes: 19 additions & 0 deletions pnpm-lock.yaml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

98 changes: 63 additions & 35 deletions src/EventQueue.js
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,7 @@ const dbg = debug('strato-db/queue')

let warnedLatest

/** @typedef {defaultColumns} Columns */
const defaultColumns = {
const defaultColumns = /** @type {const} */ ({
v: {
type: 'INTEGER',
autoIncrement: true,
Expand All @@ -22,46 +21,71 @@ const defaultColumns = {
data: {type: 'JSON'},
result: {type: 'JSON'},
size: {type: 'INTEGER', default: 0, get: false},
}
})

/**
* An event queue, including history.
*
* @template {T}
* @template {U}
* @implements {EventQueue<T>}
* @template {ESEvent} [RealItem=ESEvent] Default is `ESEvent`
* @template {Partial<EQOptions<RealItem>>} [Config=object] Default is `object`
* @template {string} [IDCol=EventQueueIDColumn<Config>] Default is
* `EventQueueIDColumn<Config>`
* @template {{[x: string]: any}} [Item=EventQueueItem<RealItem, IDCol>]
* Default is `EventQueueItem<RealItem, IDCol>`
* @template {JMColumns<Item, IDCol>} [Columns=EventQueueColumns<Config, Item>]
* Default is `EventQueueColumns<Config, Item>`
* @extends {JsonModel<
* RealItem,
* JsonModelConfig & Config,
* 'v',
* Item,
* Config,
* Columns & typeof defaultColumns
* >}
* @implements {EventQueue<RealItem, Config, 'v', Item, Columns>}
*/
class EventQueueImpl extends JsonModel {
/** @param {EQOptions<T, U>} */
/** @param {EQOptions<RealItem>} args */
constructor({name = 'history', forever, withViews, ...rest}) {
const columns = {...defaultColumns}
if (rest.columns)
for (const [key, value] of Object.entries(rest.columns)) {
if (!value) continue
if (columns[key]) throw new TypeError(`Cannot override column ${key}`)
columns[key] = value
}
const tmpCols = rest.columns
? /** @type {Columns} */ (
Object.fromEntries(
Object.entries(rest.columns).filter(([key, value]) => {
if (!value) return false
if (defaultColumns[key])
throw new TypeError(`Cannot override column ${key}`)
return true
})
)
)
: undefined

const columns = /** @type {Columns & typeof defaultColumns} */ ({
...tmpCols,
...defaultColumns,
})

super({
...rest,
name,
idCol: 'v',
idCol: /** @type {const} */ ('v'),
columns,
migrations: {
...rest.migrations,
addTypeSizeIndex: ({db}) =>
db.exec(
`CREATE INDEX IF NOT EXISTS "history type,size" on history(type, size)`
),
'20190521_addViews': withViews
? async ({db}) => {
const historySchema = await db.all('PRAGMA table_info("history")')
// This adds a field with data size, kept up-to-date with triggers
if (!historySchema.some(f => f.name === 'size'))
await db.exec(
`ALTER TABLE history ADD COLUMN size INTEGER DEFAULT 0`
)
// The size WHERE clause is to prevent recursive triggers
await db.exec(`
...(withViews && {
'20190521_addViews': async ({db}) => {
const historySchema = await db.all('PRAGMA table_info("history")')
// This adds a field with data size, kept up-to-date with triggers
if (!historySchema.some(f => f.name === 'size'))
await db.exec(
`ALTER TABLE history ADD COLUMN size INTEGER DEFAULT 0`
)
// The size WHERE clause is to prevent recursive triggers
await db.exec(`
DROP TRIGGER IF EXISTS "history size insert";
DROP TRIGGER IF EXISTS "history size update";
CREATE TRIGGER "history size insert" AFTER INSERT ON history BEGIN
Expand All @@ -87,12 +111,13 @@ class EventQueueImpl extends JsonModel {
SUM(size)/1024/1024 AS MB
FROM history GROUP BY type ORDER BY count DESC;
`)
// Recalculate size
await db.exec(`UPDATE history SET size=0`)
}
: null,
// Recalculate size
await db.exec(`UPDATE history SET size=0`)
},
}),
},
})

this.currentV = -1
this.knownV = 0
this.forever = !!forever
Expand All @@ -101,8 +126,8 @@ class EventQueueImpl extends JsonModel {
/**
* Replace existing event data.
*
* @param {Event} event - The new event.
* @returns {Promise<void>} - Promise for set completion.
* @param {Partial<Item>} event - The new event.
* @returns {Promise} - Promise for set completion.
*/
set(event) {
if (!event.v) {
Expand Down Expand Up @@ -148,16 +173,18 @@ class EventQueueImpl extends JsonModel {
return this.currentV
}

/** @type {Promise<EventQueueAddResult<keyof EventTypes>> | null} */
_addP = null

/**
* Atomically add an event to the queue.
*
* @param {string} type - Event type.
* @param {any} [data] - Event data.
* @template {keyof EventTypes} T
* @param {T} type - Event type.
* @param {EventTypes[T]} [data] - Event data.
* @param {number} [ts=Date.now()] - Event timestamp, ms since epoch. Default
* is `Date.now()`
* @returns {Promise<Event>} - Promise for the added event.
* @returns {Promise<EventQueueAddResult<T>>} - Promise for the added event.
*/
add(type, data, ts) {
if (!type || typeof type !== 'string')
Expand Down Expand Up @@ -192,6 +219,7 @@ class EventQueueImpl extends JsonModel {
return this._addP
}

/** @type {Promise | null} */
_nextAddedP = null

_nextAddedResolve = event => {
Expand Down Expand Up @@ -222,7 +250,7 @@ class EventQueueImpl extends JsonModel {
*
* @param {number} [v=0] The version. Default is `0`
* @param {boolean} [noWait] Do not wait for the next event.
* @returns {Promise<Event>} The event if found.
* @returns {Promise<Item | undefined>} The event if found.
*/
async getNext(v = 0, noWait = false) {
let event
Expand Down
Loading