Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions packages/sequencer/src/protocol/baselayer/BaseLayer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,12 @@ import {
} from "@proto-kit/common";

import { IncomingMessageAdapter } from "../../settlement/messages/IncomingMessageAdapter";
import type { OutgoingMessageQueue } from "../../settlement/messages/WithdrawalQueue";
import type { OutgoingMessageAdapter } from "../../settlement/messages/WithdrawalQueue";

export interface BaseLayerDependencyRecord extends DependencyRecord {
IncomingMessageAdapter: DependencyDeclaration<IncomingMessageAdapter>;
// TODO Move that to Database?
OutgoingMessageQueue: DependencyDeclaration<OutgoingMessageQueue>;
OutgoingMessageQueue: DependencyDeclaration<OutgoingMessageAdapter>;
}

export interface BaseLayer extends DependencyFactory {
Expand Down
21 changes: 8 additions & 13 deletions packages/sequencer/src/protocol/baselayer/NoopBaseLayer.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import { noop } from "@proto-kit/common";
import { PublicKey } from "o1js";
import { Field, PublicKey } from "o1js";
import { Withdrawal } from "@proto-kit/protocol";

import {
Expand All @@ -10,7 +10,7 @@ import { IncomingMessageAdapter } from "../../settlement/messages/IncomingMessag
import { PendingTransaction } from "../../mempool/PendingTransaction";
import {
OutgoingMessage,
OutgoingMessageQueue,
OutgoingMessageAdapter,
} from "../../settlement/messages/WithdrawalQueue";

import { BaseLayer, BaseLayerDependencyRecord } from "./BaseLayer";
Expand All @@ -35,16 +35,11 @@ class NoopIncomingMessageAdapter implements IncomingMessageAdapter {
}
}

class NoopOutgoingMessageQueue implements OutgoingMessageQueue {
length(): number {
return 0;
}

peek(num: number): OutgoingMessage<Withdrawal>[] {
return [];
}

pop(num: number): OutgoingMessage<Withdrawal>[] {
class NoopOutgoingMessageAdapter implements OutgoingMessageAdapter {
async fetchWithdrawals(
tokenId: Field,
offset: number
): Promise<OutgoingMessage<Withdrawal>[]> {
return [];
}
}
Expand All @@ -65,7 +60,7 @@ export class NoopBaseLayer extends SequencerModule implements BaseLayer {
useClass: NoopIncomingMessageAdapter,
},
OutgoingMessageQueue: {
useClass: NoopOutgoingMessageQueue,
useClass: NoopOutgoingMessageAdapter,
},
};
}
Expand Down
29 changes: 22 additions & 7 deletions packages/sequencer/src/settlement/BridgingModule.ts
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ import { AsyncMerkleTreeStore } from "../state/async/AsyncMerkleTreeStore";
import { FeeStrategy } from "../protocol/baselayer/fees/FeeStrategy";
import type { MinaBaseLayer } from "../protocol/baselayer/MinaBaseLayer";

import type { OutgoingMessageQueue } from "./messages/WithdrawalQueue";
import type { OutgoingMessageAdapter } from "./messages/WithdrawalQueue";
import type { SettlementModule } from "./SettlementModule";
import { SettlementUtils } from "./utils/SettlementUtils";
import { MinaTransactionSender } from "./transactions/MinaTransactionSender";
Expand Down Expand Up @@ -73,7 +73,7 @@ export class BridgingModule extends SequencerModule {
@inject("SettlementModule")
private readonly settlementModule: SettlementModule,
@inject("OutgoingMessageQueue")
private readonly outgoingMessageQueue: OutgoingMessageQueue,
private readonly outgoingMessageQueue: OutgoingMessageAdapter,
@inject("AsyncMerkleStore")
private readonly merkleTreeStore: AsyncMerkleTreeStore,
@inject("FeeStrategy")
Expand Down Expand Up @@ -326,7 +326,6 @@ export class BridgingModule extends SequencerModule {
tx: Transaction<false, true>;
}[]
> {
const length = this.outgoingMessageQueue.length();
const { feepayer } = this.settlementModule.config;
let { nonce } = options;

Expand Down Expand Up @@ -364,9 +363,27 @@ export class BridgingModule extends SequencerModule {
this.getBridgingModuleConfig().withdrawalStatePath.split(".");
const basePath = Path.fromProperty(withdrawalModule, withdrawalStateName);

// TODO Not sure if we should re-fetch the account state here
const outgoingMessageCursor = parseInt(
bridgeContract.outgoingMessageCursor.get().toString(),
10
);

const pendingWithdrawals = await this.outgoingMessageQueue.fetchWithdrawals(
tokenId,
outgoingMessageCursor
);

// Create withdrawal batches and send them as L1 transactions
for (let i = 0; i < length; i += OUTGOING_MESSAGE_BATCH_SIZE) {
const batch = this.outgoingMessageQueue.peek(OUTGOING_MESSAGE_BATCH_SIZE);
for (
let i = 0;
i < pendingWithdrawals.length;
i += OUTGOING_MESSAGE_BATCH_SIZE
) {
const batch = pendingWithdrawals.slice(
i,
i + OUTGOING_MESSAGE_BATCH_SIZE
);

const keys = batch.map((x) =>
Path.fromKey(basePath, OutgoingMessageKey, {
Expand Down Expand Up @@ -427,8 +444,6 @@ export class BridgingModule extends SequencerModule {
"included"
);

this.outgoingMessageQueue.pop(OUTGOING_MESSAGE_BATCH_SIZE);

txs.push({
tx: signedTx,
});
Expand Down
149 changes: 86 additions & 63 deletions packages/sequencer/src/settlement/messages/WithdrawalQueue.ts
Original file line number Diff line number Diff line change
@@ -1,14 +1,19 @@
import { inject, injectable } from "tsyringe";
import { Withdrawal } from "@proto-kit/protocol";
import { Field, Struct } from "o1js";
import { splitArray } from "@proto-kit/common";

import type { BlockTriggerBase } from "../../protocol/production/trigger/BlockTrigger";
import { SettlementModule } from "../SettlementModule";
import { SequencerModule } from "../../sequencer/builder/SequencerModule";
import { Sequencer } from "../../sequencer/executor/Sequencer";
import { Block } from "../../storage/model/Block";
import { BridgingModule } from "../BridgingModule";
import {
BlockStorage,
HistoricalBlockStorage,
} from "../../storage/repositories/BlockStorage";
import { SettlementStorage } from "../../storage/repositories/SettlementStorage";
import { HistoricalBatchStorage } from "../../storage/repositories/BatchStorage";

export interface OutgoingMessage<Type> {
index: number;
Expand All @@ -34,99 +39,117 @@ export class WithdrawalEvent extends Struct({
* In the future, this interface should be flexibly typed so that the
* outgoing message type is not limited to Withdrawals
*/
export interface OutgoingMessageQueue {
peek: (num: number) => OutgoingMessage<Withdrawal>[];
pop: (num: number) => OutgoingMessage<Withdrawal>[];
length: () => number;
export interface OutgoingMessageAdapter {
fetchWithdrawals(
tokenId: Field,
offset: number
): Promise<OutgoingMessage<Withdrawal>[]>;
}

@injectable()
export class WithdrawalQueue
extends SequencerModule
implements OutgoingMessageQueue
implements OutgoingMessageAdapter
{
private lockedQueue: Block[] = [];

private unlockedQueue: OutgoingMessage<Withdrawal>[] = [];

private outgoingWithdrawalEvents: string[] = [];

public constructor(
@inject("Sequencer")
private readonly sequencer: Sequencer<{
BlockTrigger: typeof BlockTriggerBase;
SettlementModule: typeof SettlementModule;
}>
}>,
@inject("BlockStorage")
private readonly blockStorage: BlockStorage & HistoricalBlockStorage,
@inject("BatchStorage")
private readonly batchStorage: HistoricalBatchStorage,
@inject("SettlementStorage")
private readonly settlementStorage: SettlementStorage
) {
super();
}

public peek(num: number): OutgoingMessage<Withdrawal>[] {
return this.unlockedQueue.slice(0, num);
private extractEventsFromBlock(block: Block) {
return block.transactions.flatMap((result) =>
result.events
.filter((event) =>
this.outgoingWithdrawalEvents.includes(event.eventName)
)
.map((event) => WithdrawalEvent.fromFields(event.data))
);
}

public pop(num: number): OutgoingMessage<Withdrawal>[] {
const slice = this.peek(num);
this.unlockedQueue = this.unlockedQueue.slice(num);
return slice;
// TODO Not really efficient right now in regards to DB trips, can be
// easily built as a join query though
private async getLatestSettledBlock(): Promise<Block | undefined> {
const settlement = await this.settlementStorage.getLatestSettlement();
if (settlement !== undefined) {
const lastBatch = settlement.batches.at(-1);
if (lastBatch !== undefined) {
const batch = await this.batchStorage.getBatchAt(lastBatch);
if (batch !== undefined) {
const blockHash = batch.blockHashes.at(-1);
if (blockHash !== undefined) {
return await this.blockStorage.getBlock(blockHash);
}
}
}
}
return undefined;
}

public length() {
return this.unlockedQueue.length;
private async findBlockWithEvent(tokenId: Field, index: number) {
let block = await this.getLatestSettledBlock();

// Casting to defined here is fine, bcs in all cases where that could be undefined,
// we break and return undefined all together.
const blockHistory = [block!];

while (block !== undefined) {
const events = this.extractEventsFromBlock(block);
const found = events.find((withdrawalEvent) => {
return withdrawalEvent.key.tokenId
.equals(tokenId)
.and(withdrawalEvent.key.index.equals(index))
.toBoolean();
});
if (found !== undefined) {
return blockHistory.reverse();
}
if (block.previousBlockHash !== undefined) {
// eslint-disable-next-line no-await-in-loop
block = await this.blockStorage.getBlock(
block.previousBlockHash.toString()
);
blockHistory.push(block!);
}
}
return undefined;
}

public async fetchWithdrawals(
tokenId: Field,
offset: number
): Promise<OutgoingMessage<Withdrawal>[]> {
const blocks = await this.findBlockWithEvent(tokenId, offset);

const events = blocks
?.flatMap((block) => this.extractEventsFromBlock(block))
?.map((event) => ({
index: parseInt(event.key.index.toString(), 10),
value: event.value,
}));
return events ?? [];
}

public async start(): Promise<void> {
// Hacky workaround for this cyclic dependency
const settlementModule = this.sequencer.resolveOrFail(
"SettlementModule",
SettlementModule
);
const bridgingModule = this.sequencer.resolveOrFail(
"BridgingModule",
BridgingModule
);

const { withdrawalEventName } = bridgingModule.getBridgingModuleConfig();
this.outgoingWithdrawalEvents = [withdrawalEventName];

this.sequencer.events.on("block-produced", (block) => {
this.lockedQueue.push(block);
});

// TODO Add event settlement-included and register it there
settlementModule.events.on("settlement-submitted", (batch) => {
// This finds out which blocks are contained in this batch and extracts only from those
const { inBatch, notInBatch } = splitArray(this.lockedQueue, (block) =>
batch.blockHashes.includes(block.hash.toString())
? "inBatch"
: "notInBatch"
);

const withdrawals = (inBatch ?? []).flatMap((block) => {
return block.transactions
.flatMap((tx) =>
tx.events
.filter(
(event) => event.eventName === this.outgoingWithdrawalEvents[0]
)
.map((event) => {
return {
tx,
event,
};
})
)
.map<OutgoingMessage<Withdrawal>>(({ tx, event }) => {
const withdrawalEvent = WithdrawalEvent.fromFields(event.data);

return {
index: Number(withdrawalEvent.key.index.toString()),
value: withdrawalEvent.value,
};
});
});
this.unlockedQueue.push(...withdrawals);
this.lockedQueue = notInBatch ?? [];
});
}
}