Skip to content
Merged
7 changes: 7 additions & 0 deletions .changeset/dnf-active-conditions.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
---
'@tanstack/electric-db-collection': minor
---

feat: add DNF/active_conditions support for arbitrary boolean WHERE clauses

Support the new Electric server wire protocol (electric-sql/electric#3791). Tags now use `/` delimiter with empty segments for non-participating positions. Shapes with subquery dependencies send `active_conditions` headers and use DNF evaluation for row visibility. Simple shapes without subqueries retain existing empty-tag-set deletion behavior.
2 changes: 1 addition & 1 deletion packages/electric-db-collection/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@
"src"
],
"dependencies": {
"@electric-sql/client": "^1.5.13",
"@electric-sql/client": "^1.5.14",
"@standard-schema/spec": "^1.1.0",
"@tanstack/db": "workspace:*",
"@tanstack/store": "^0.9.2",
Expand Down
118 changes: 107 additions & 11 deletions packages/electric-db-collection/src/electric.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,15 +16,21 @@ import {
import { compileSQL } from './sql-compiler'
import {
addTagToIndex,
deriveDisjunctPositions,
findRowsMatchingPattern,
getTagLength,
isMoveInMessage,
isMoveOutMessage,
parseTag as parseTagString,
removeTagFromIndex,
rowVisible,
tagMatchesPattern,
} from './tag-index'
import type { ColumnEncoder } from './sql-compiler'
import type {
MoveOutPattern,
ActiveConditions,
DisjunctPositions,
MovePattern,
MoveTag,
ParsedMoveTag,
RowId,
Expand Down Expand Up @@ -981,16 +987,16 @@ function createElectricSync<T extends Row<unknown>>(

const tagCache = new Map<MoveTag, ParsedMoveTag>()

// Parses a tag string into a MoveTag.
// Parses a tag string into a ParsedMoveTag.
// It memoizes the result parsed tag such that future calls
// for the same tag string return the same MoveTag array.
// for the same tag string return the same ParsedMoveTag array.
const parseTag = (tag: MoveTag): ParsedMoveTag => {
const cachedTag = tagCache.get(tag)
if (cachedTag) {
return cachedTag
}

const parsedTag = tag.split(`|`)
const parsedTag = parseTagString(tag)
tagCache.set(tag, parsedTag)
return parsedTag
}
Expand All @@ -1000,6 +1006,11 @@ function createElectricSync<T extends Row<unknown>>(
const tagIndex: TagIndex = []
let tagLength: number | undefined = undefined

// DNF state: active_conditions are per-row, disjunct_positions are global
// (fixed by the shape's WHERE clause, derived once from the first tagged message).
const rowActiveConditions = new Map<RowId, ActiveConditions>()
let disjunctPositions: DisjunctPositions | undefined = undefined

/**
* Initialize the tag index with the correct length
*/
Expand Down Expand Up @@ -1074,6 +1085,7 @@ function createElectricSync<T extends Row<unknown>>(
tags: Array<MoveTag> | undefined,
removedTags: Array<MoveTag> | undefined,
rowId: RowId,
activeConditions?: ActiveConditions,
): Set<MoveTag> => {
// Initialize tag set for this row if it doesn't exist (needed for checking deletion)
if (!rowTagSets.has(rowId)) {
Expand All @@ -1084,13 +1096,24 @@ function createElectricSync<T extends Row<unknown>>(
// Add new tags
if (tags) {
addTagsToRow(tags, rowId, rowTagSet)

// Derive disjunct positions once — they are fixed by the shape's WHERE clause.
if (disjunctPositions === undefined) {
const parsedTags = tags.map(parseTag)
disjunctPositions = deriveDisjunctPositions(parsedTags)
}
}

// Remove tags
if (removedTags) {
removeTagsFromRow(removedTags, rowId, rowTagSet)
}

// Store active conditions if provided (overwrite on re-send)
if (activeConditions && activeConditions.length > 0) {
rowActiveConditions.set(rowId, [...activeConditions])
}

return rowTagSet
}

Expand All @@ -1101,6 +1124,8 @@ function createElectricSync<T extends Row<unknown>>(
rowTagSets.clear()
tagIndex.length = 0
tagLength = undefined
rowActiveConditions.clear()
disjunctPositions = undefined
}

/**
Expand Down Expand Up @@ -1129,22 +1154,45 @@ function createElectricSync<T extends Row<unknown>>(

// Remove the row from the tag sets map
rowTagSets.delete(rowId)
rowActiveConditions.delete(rowId)
}

/**
* Remove matching tags from a row based on a pattern
* Returns true if the row's tag set is now empty
* Returns true if the row should be deleted (no longer visible)
*/
const removeMatchingTagsFromRow = (
rowId: RowId,
pattern: MoveOutPattern,
pattern: MovePattern,
): boolean => {
const rowTagSet = rowTagSets.get(rowId)
if (!rowTagSet) {
return false
}

// Find tags that match this pattern and remove them
// DNF mode: check visibility using active conditions.
// Tag index entries are preserved so that move-in can re-activate positions.
const activeConditions = rowActiveConditions.get(rowId)
if (activeConditions && disjunctPositions) {
// Set the condition at this pattern's position to false
activeConditions[pattern.pos] = false

if (!rowVisible(activeConditions, disjunctPositions)) {
// Row is no longer visible — clean up all state including tag index
for (const tag of rowTagSet) {
const parsedTag = parseTag(tag)
removeTagFromIndex(parsedTag, rowId, tagIndex, tagLength!)
tagCache.delete(tag)
}
rowTagSets.delete(rowId)
rowActiveConditions.delete(rowId)
return true
}
return false
}

// Simple shape (no subquery dependencies — server sends no active_conditions):
// Remove matching tags and delete if tag set is empty
for (const tag of rowTagSet) {
const parsedTag = parseTag(tag)
if (tagMatchesPattern(parsedTag, pattern)) {
Expand All @@ -1153,7 +1201,6 @@ function createElectricSync<T extends Row<unknown>>(
}
}

// Check if row's tag set is now empty
if (rowTagSet.size === 0) {
rowTagSets.delete(rowId)
return true
Expand All @@ -1166,7 +1213,7 @@ function createElectricSync<T extends Row<unknown>>(
* Process move-out event: remove matching tags from rows and delete rows with empty tag sets
*/
const processMoveOutEvent = (
patterns: Array<MoveOutPattern>,
patterns: Array<MovePattern>,
begin: () => void,
write: (message: ChangeMessageOrDeleteKeyMessage<T>) => void,
transactionStarted: boolean,
Expand Down Expand Up @@ -1204,6 +1251,30 @@ function createElectricSync<T extends Row<unknown>>(
return txStarted
}

/**
* Process move-in event: re-activate conditions for rows matching the patterns.
* This is a silent operation — no messages are emitted to the collection.
*/
const processMoveInEvent = (patterns: Array<MovePattern>): void => {
if (tagLength === undefined) {
debug(
`${collectionId ? `[${collectionId}] ` : ``}Received move-in message but no tag length set yet, ignoring`,
)
return
}

for (const pattern of patterns) {
const affectedRowIds = findRowsMatchingPattern(pattern, tagIndex)

for (const rowId of affectedRowIds) {
const activeConditions = rowActiveConditions.get(rowId)
if (activeConditions) {
activeConditions[pattern.pos] = true
}
}
}
}

/**
* Get the sync metadata for insert operations
* @returns Record containing relation information
Expand Down Expand Up @@ -1433,6 +1504,11 @@ function createElectricSync<T extends Row<unknown>>(
const removedTags = changeMessage.headers.removed_tags
const hasTags = tags || removedTags

// Extract active_conditions from headers (DNF support)
const activeConditions = changeMessage.headers.active_conditions as
| ActiveConditions
| undefined

const rowId = collection.getKeyFromItem(changeMessage.value)
const operation = changeMessage.headers.operation

Expand All @@ -1453,7 +1529,12 @@ function createElectricSync<T extends Row<unknown>>(
if (isDelete) {
clearTagsForRow(rowId)
} else if (hasTags) {
processTagsForChangeMessage(tags, removedTags, rowId)
processTagsForChangeMessage(
tags,
removedTags,
rowId,
activeConditions,
)
}

write({
Expand Down Expand Up @@ -1496,7 +1577,11 @@ function createElectricSync<T extends Row<unknown>>(

for (const message of messages) {
// Add message to current batch buffer (for race condition handling)
if (isChangeMessage(message) || isMoveOutMessage(message)) {
if (
isChangeMessage(message) ||
isMoveOutMessage(message) ||
isMoveInMessage(message)
) {
currentBatchMessages.setState((currentBuffer) => {
const newBuffer = [...currentBuffer, message]
// Limit buffer size for safety
Expand Down Expand Up @@ -1593,6 +1678,14 @@ function createElectricSync<T extends Row<unknown>>(
transactionStarted,
)
}
} else if (isMoveInMessage(message)) {
// Handle move-in event: re-activate conditions for matching rows.
// Buffer if buffering, otherwise process immediately.
if (isBufferingInitialSync() && !transactionStarted) {
bufferedMessages.push(message)
} else {
processMoveInEvent(message.headers.patterns)
}
} else if (isMustRefetchMessage(message)) {
debug(
`${collectionId ? `[${collectionId}] ` : ``}Received must-refetch message, starting transaction with truncate`,
Expand Down Expand Up @@ -1672,6 +1765,9 @@ function createElectricSync<T extends Row<unknown>>(
write,
transactionStarted,
)
} else if (isMoveInMessage(bufferedMsg)) {
// Process buffered move-in messages during atomic swap
processMoveInEvent(bufferedMsg.headers.patterns)
}
}

Expand Down
Loading
Loading