Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
50 changes: 45 additions & 5 deletions packages/persistance/src/services/prisma/PrismaMessageStorage.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,41 @@ export class PrismaMessageStorage implements MessageStorage {
private readonly transactionMapper: TransactionMapper
) {}

public async getMessages(
fromMessageHash: string
): Promise<PendingTransaction[]> {
public async getMessageBatches(
fromMessagesHash: string,
toMessagesHash: string
) {
// TODO Make efficient

const batches: {
fromMessagesHash: string;
toMessagesHash: string;
messages: PendingTransaction[];
}[] = [];
let currentHash = fromMessagesHash;

while (currentHash !== toMessagesHash) {
// eslint-disable-next-line no-await-in-loop
const batch = await this.getNextMessagesBatch(currentHash);

if (batch === undefined) {
return batches;
}

batches.push(batch);
currentHash = batch.toMessagesHash;
}
return batches;
}

public async getNextMessagesBatch(fromMessageHash: string): Promise<
| {
fromMessagesHash: string;
toMessagesHash: string;
messages: PendingTransaction[];
}
| undefined
> {
const { prismaClient } = this.connection;

const batch = await prismaClient.incomingMessageBatch.findFirst({
Expand All @@ -31,14 +63,22 @@ export class PrismaMessageStorage implements MessageStorage {
});

if (batch === null) {
return [];
return undefined;
}

const dbTransactions = batch.messages.map((message) => {
return message.transaction;
});

return dbTransactions.map((dbTx) => this.transactionMapper.mapIn(dbTx));
const messages = dbTransactions.map((dbTx) =>
this.transactionMapper.mapIn(dbTx)
);

return {
fromMessagesHash: fromMessageHash,
toMessagesHash: batch.toMessageHash,
messages,
};
}

public async pushMessages(
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import { Settlement, SettlementStorage } from "@proto-kit/sequencer";
import { inject, injectable } from "tsyringe";
import { Prisma } from "@prisma/client";

import type { PrismaConnection } from "../../PrismaDatabaseConnection";

Expand All @@ -12,6 +13,31 @@ export class PrismaSettlementStorage implements SettlementStorage {
private readonly settlementMapper: SettlementMapper
) {}

public async getLatestSettlement(): Promise<Settlement | undefined> {
const { prismaClient } = this.connection;

const batch = await prismaClient.batch.findFirst({
where: {
settlementTransactionHash: {
not: null,
},
},
orderBy: [
{
height: Prisma.SortOrder.desc,
},
],
include: {
settlement: true,
},
});

if (batch !== null) {
return this.settlementMapper.mapIn([batch.settlement!, []]);
}
return undefined;
}

public async pushSettlement(settlement: Settlement): Promise<void> {
const { prismaClient } = this.connection;

Expand Down
2 changes: 1 addition & 1 deletion packages/sequencer/src/protocol/baselayer/NoopBaseLayer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ import {
import { BaseLayer, BaseLayerDependencyRecord } from "./BaseLayer";

class NoopIncomingMessageAdapter implements IncomingMessageAdapter {
async getPendingMessages(
async fetchPendingMessages(
address: PublicKey,
params: {
fromActionHash: string;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
import { inject } from "tsyringe";
import { log } from "@proto-kit/common";
import { ACTIONS_EMPTY_HASH } from "@proto-kit/protocol";
import {
MethodIdResolver,
MethodParameterEncoder,
Expand All @@ -23,8 +22,8 @@ import {
BlockResult,
BlockWithResult,
} from "../../../storage/model/Block";
import { MessageStorage } from "../../../storage/repositories/MessageStorage";
import { Database } from "../../../storage/Database";
import { IncomingMessagesService } from "../../../settlement/messages/IncomingMessagesService";

import { BlockProductionService } from "./BlockProductionService";
import { BlockResultService } from "./BlockResultService";
Expand All @@ -40,7 +39,7 @@ export class BlockProducerModule extends SequencerModule<BlockConfig> {

public constructor(
@inject("Mempool") private readonly mempool: Mempool,
@inject("MessageStorage") private readonly messageStorage: MessageStorage,
private readonly messageService: IncomingMessagesService,
@inject("UnprovenStateService")
private readonly unprovenStateService: AsyncStateService,
@inject("UnprovenMerkleStore")
Expand Down Expand Up @@ -186,10 +185,7 @@ export class BlockProducerModule extends SequencerModule<BlockConfig> {
};
}

const messages = await this.messageStorage.getMessages(
parentBlock?.block.toMessagesHash.toString() ??
ACTIONS_EMPTY_HASH.toString()
);
const messages = await this.messageService.getPendingMessages();

log.debug(
`Block collected, ${txs.length} txs, ${messages.length} messages`
Expand Down
13 changes: 2 additions & 11 deletions packages/sequencer/src/protocol/production/trigger/BlockTrigger.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ import { BlockQueue } from "../../../storage/repositories/BlockStorage";
import { SequencerModule } from "../../../sequencer/builder/SequencerModule";
import { SettlementModule } from "../../../settlement/SettlementModule";
import { Block, BlockWithResult } from "../../../storage/model/Block";
import { SettlementStorage } from "../../../storage/repositories/SettlementStorage";

/**
* A BlockTrigger is the primary method to start the production of a block and
Expand Down Expand Up @@ -44,8 +43,7 @@ export class BlockTriggerBase<
protected readonly blockProducerModule: BlockProducerModule,
protected readonly batchProducerModule: BatchProducerModule | undefined,
protected readonly settlementModule: SettlementModule | undefined,
protected readonly blockQueue: BlockQueue,
protected readonly settlementStorage: SettlementStorage | undefined
protected readonly blockQueue: BlockQueue
) {
super();
}
Expand Down Expand Up @@ -96,14 +94,7 @@ export class BlockTriggerBase<
);
return undefined;
}
if (this.settlementStorage === undefined) {
throw new Error(
"SettlementStorage module not configured, check provided database moduel"
);
}
const settlement = await this.settlementModule.settleBatch(batch);
await this.settlementStorage.pushSettlement(settlement);
return settlement;
return await this.settlementModule.settleBatch(batch);
}

public async start(): Promise<void> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ import { BlockProducerModule } from "../sequencing/BlockProducerModule";
import { Block, BlockWithResult } from "../../../storage/model/Block";
import { BlockQueue } from "../../../storage/repositories/BlockStorage";
import { SettlementModule } from "../../../settlement/SettlementModule";
import { SettlementStorage } from "../../../storage/repositories/SettlementStorage";

import { BlockTrigger, BlockTriggerBase } from "./BlockTrigger";

Expand All @@ -25,17 +24,14 @@ export class ManualBlockTrigger
@injectOptional("SettlementModule")
settlementModule: SettlementModule | undefined,
@inject("BlockQueue")
blockQueue: BlockQueue,
@injectOptional("SettlementStorage")
settlementStorage: SettlementStorage | undefined
blockQueue: BlockQueue
) {
super(
blockProducerModule,
batchProducerModule,
settlementModule,

blockQueue,
settlementStorage
blockQueue
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ import { Mempool } from "../../../mempool/Mempool";
import { BlockQueue } from "../../../storage/repositories/BlockStorage";
import { BlockProducerModule } from "../sequencing/BlockProducerModule";
import { SettlementModule } from "../../../settlement/SettlementModule";
import { SettlementStorage } from "../../../storage/repositories/SettlementStorage";

import { BlockEvents, BlockTrigger, BlockTriggerBase } from "./BlockTrigger";

Expand Down Expand Up @@ -49,17 +48,14 @@ export class TimedBlockTrigger
settlementModule: SettlementModule | undefined,
@inject("BlockQueue")
blockQueue: BlockQueue,
@injectOptional("SettlementStorage")
settlementStorage: SettlementStorage | undefined,
@inject("Mempool")
private readonly mempool: Mempool
) {
super(
blockProducerModule,
batchProducerModule,
settlementModule,
blockQueue,
settlementStorage
blockQueue
);
}

Expand Down
45 changes: 13 additions & 32 deletions packages/sequencer/src/settlement/SettlementModule.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,11 @@ import {
SettlementSmartContract,
MandatorySettlementModulesRecord,
MandatoryProtocolModulesRecord,
BlockProverPublicOutput,
SettlementSmartContractBase,
DynamicBlockProof,
} from "@proto-kit/protocol";
import {
AccountUpdate,
Field,
Mina,
PrivateKey,
PublicKey,
Expand All @@ -35,15 +33,14 @@ import {
SequencerModule,
sequencerModule,
} from "../sequencer/builder/SequencerModule";
import { MessageStorage } from "../storage/repositories/MessageStorage";
import type { MinaBaseLayer } from "../protocol/baselayer/MinaBaseLayer";
import { Batch, SettleableBatch } from "../storage/model/Batch";
import { BlockProofSerializer } from "../protocol/production/tasks/serializers/BlockProofSerializer";
import { Settlement } from "../storage/model/Settlement";
import { FeeStrategy } from "../protocol/baselayer/fees/FeeStrategy";
import { SettlementStartupModule } from "../sequencer/SettlementStartupModule";
import { SettlementStorage } from "../storage/repositories/SettlementStorage";

import { IncomingMessageAdapter } from "./messages/IncomingMessageAdapter";
import { MinaTransactionSender } from "./transactions/MinaTransactionSender";
import { ProvenSettlementPermissions } from "./permissions/ProvenSettlementPermissions";
import { SignedSettlementPermissions } from "./permissions/SignedSettlementPermissions";
Expand Down Expand Up @@ -88,10 +85,8 @@ export class SettlementModule
@inject("BaseLayer") baseLayer: MinaBaseLayer,
@inject("Protocol")
private readonly protocol: Protocol<MandatoryProtocolModulesRecord>,
@inject("IncomingMessageAdapter")
private readonly incomingMessagesAdapter: IncomingMessageAdapter,
@inject("MessageStorage")
private readonly messageStorage: MessageStorage,
@inject("SettlementStorage")
private readonly settlementStorage: SettlementStorage,
private readonly blockProofSerializer: BlockProofSerializer,
@inject("TransactionSender")
private readonly transactionSender: MinaTransactionSender,
Expand Down Expand Up @@ -169,38 +164,20 @@ export class SettlementModule
} = {}
): Promise<Settlement> {
await this.fetchContractAccounts();
const { settlement, dispatch } = this.getContracts();
const { settlement: settlementContract, dispatch } = this.getContracts();
const { feepayer } = this.config;

log.debug("Preparing settlement");

const lastSettlementL1BlockHeight =
settlement.lastSettlementL1BlockHeight.get().value;
settlementContract.lastSettlementL1BlockHeight.get().value;
const signature = Signature.create(feepayer, [
BATCH_SIGNATURE_PREFIX,
lastSettlementL1BlockHeight,
]);

const fromSequenceStateHash = BlockProverPublicOutput.fromFields(
batch.proof.publicOutput.map((x) => Field(x))
).incomingMessagesHash;
const latestSequenceStateHash = dispatch.account.actionState.get();

// Fetch actions and store them into the messageStorage
const actions = await this.incomingMessagesAdapter.getPendingMessages(
dispatch.address,
{
fromActionHash: fromSequenceStateHash.toString(),
toActionHash: latestSequenceStateHash.toString(),
fromL1BlockHeight: Number(lastSettlementL1BlockHeight.toString()),
}
);
await this.messageStorage.pushMessages(
actions.from,
actions.to,
actions.messages
);

const blockProof = await this.blockProofSerializer
.getBlockProofSerializer()
.fromJSONProof(batch.proof);
Expand All @@ -215,7 +192,7 @@ export class SettlementModule
memo: "Protokit settle",
},
async () => {
await settlement.settle(
await settlementContract.settle(
dynamicBlockProof,
signature,
dispatch.address,
Expand All @@ -233,12 +210,16 @@ export class SettlementModule

log.info("Settlement transaction send queued");

this.events.emit("settlement-submitted", batch);

return {
const settlement = {
batches: [batch.height],
promisedMessagesHash: latestSequenceStateHash.toString(),
};

await this.settlementStorage.pushSettlement(settlement);

this.events.emit("settlement-submitted", batch);

return settlement;
}

public async deploy(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import { PendingTransaction } from "../../mempool/PendingTransaction";
* (Dispatched Deposit Actions for example)
*/
export interface IncomingMessageAdapter {
getPendingMessages: (
fetchPendingMessages: (
address: PublicKey,
params: {
fromActionHash: string;
Expand Down
Loading