Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import { sha256 } from "@cosmjs/crypto";
import { toHex } from "@cosmjs/encoding";
import { Registry } from "@cosmjs/proto-signing";
import type { Account } from "@cosmjs/stargate";
import { TxRaw } from "cosmjs-types/cosmos/tx/v1beta1/tx";
import { mock } from "jest-mock-extended";

Expand Down Expand Up @@ -28,6 +29,150 @@ describe("BatchSigningClientService", () => {
expect(result[0]?.hash).toBe(expectedHash);
});

it("should cache first address and prevent race conditions", async () => {
const { service, mockWallet } = setup();

let resolveAddress: (value: string) => void;
const addressPromise = new Promise<string>(resolve => {
resolveAddress = resolve;
});
mockWallet.getFirstAddress.mockReturnValue(addressPromise);

const promises = [service["getCachedFirstAddress"](), service["getCachedFirstAddress"](), service["getCachedFirstAddress"]()];

setTimeout(() => resolveAddress!("akash1testaddress"), 10);

const results = await Promise.all(promises);

expect(results).toEqual(["akash1testaddress", "akash1testaddress", "akash1testaddress"]);
expect(mockWallet.getFirstAddress).toHaveBeenCalledTimes(1);
});

it("should return cached first address on subsequent calls", async () => {
const { service, mockWallet } = setup();
mockWallet.getFirstAddress.mockResolvedValue("akash1testaddress");

const firstResult = await service["getCachedFirstAddress"]();
expect(firstResult).toBe("akash1testaddress");
expect(mockWallet.getFirstAddress).toHaveBeenCalledTimes(1);

const secondResult = await service["getCachedFirstAddress"]();
expect(secondResult).toBe("akash1testaddress");
expect(mockWallet.getFirstAddress).toHaveBeenCalledTimes(1);
});

it("should cache account info and prevent race conditions", async () => {
const { service, mockClient } = setup();

let resolveAccount: (value: Account) => void;
const accountPromise = new Promise<Account>(resolve => {
resolveAccount = resolve;
});
mockClient.getAccount.mockReturnValue(accountPromise);

const promises = [service["getCachedAccountInfo"](), service["getCachedAccountInfo"](), service["getCachedAccountInfo"]()];

setTimeout(
() =>
resolveAccount!({
address: "akash1testaddress",
pubkey: null,
accountNumber: 1,
sequence: 1
}),
10
);

const results = await Promise.all(promises);

expect(results).toHaveLength(3);
expect(results[0]).toEqual(results[1]);
expect(results[1]).toEqual(results[2]);
expect(results[0]).toMatchObject({
accountNumber: 1,
sequence: 1,
address: "akash1testaddress"
});
expect(mockClient.getAccount).toHaveBeenCalledTimes(1);
});

it("should return cached account info on subsequent calls", async () => {
const { service, mockClient } = setup();

const firstResult = await service["getCachedAccountInfo"]();
expect(firstResult).toMatchObject({
accountNumber: 1,
sequence: 1,
address: "akash1testaddress"
});
expect(mockClient.getAccount).toHaveBeenCalledTimes(1);

const secondResult = await service["getCachedAccountInfo"]();
expect(secondResult).toEqual(firstResult);
expect(mockClient.getAccount).toHaveBeenCalledTimes(1);
});

it("should clear cached account info when clearCachedAccountInfo is called", async () => {
const { service, mockClient } = setup();

await service["getCachedAccountInfo"]();
expect(mockClient.getAccount).toHaveBeenCalledTimes(1);

service["clearCachedAccountInfo"]();

await service["getCachedAccountInfo"]();
expect(mockClient.getAccount).toHaveBeenCalledTimes(2);
});

it("should increment sequence correctly", async () => {
const { service } = setup();

const initialInfo = await service["getCachedAccountInfo"]();
expect(initialInfo.sequence).toBe(1);

service["incrementSequence"]();
const updatedInfo = await service["getCachedAccountInfo"]();
expect(updatedInfo.sequence).toBe(2);

service["incrementSequence"]();
const finalInfo = await service["getCachedAccountInfo"]();
expect(finalInfo.sequence).toBe(3);
});

it("should handle errors in first address fetching and clear promise", async () => {
const { service, mockWallet } = setup();

mockWallet.getFirstAddress.mockRejectedValue(new Error("Network error"));

await expect(service["getCachedFirstAddress"]()).rejects.toThrow("Network error");

await expect(service["getCachedFirstAddress"]()).rejects.toThrow("Network error");

expect(mockWallet.getFirstAddress).toHaveBeenCalledTimes(2);
});

it("should handle errors in account info fetching and clear promise", async () => {
const { service, mockClient } = setup();

mockClient.getAccount.mockRejectedValue(new Error("Account not found"));

await expect(service["getCachedAccountInfo"]()).rejects.toThrow("Account not found");

await expect(service["getCachedAccountInfo"]()).rejects.toThrow("Account not found");

expect(mockClient.getAccount).toHaveBeenCalledTimes(2);
});

it("should handle account not found error correctly", async () => {
const { service, mockClient } = setup();

mockClient.getAccount.mockResolvedValue(null);

await expect(service["getCachedAccountInfo"]()).rejects.toThrow(
"Account not found for address: akash1testaddress. The account may not exist on the blockchain yet."
);
});

function setup() {
const dummyTxRaw = TxRaw.fromPartial({
bodyBytes: new Uint8Array([1, 2, 3]),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import { withSpan } from "@src/core/services/tracing/tracing.service";
interface ShortAccountInfo {
accountNumber: number;
sequence: number;
address: string;
}

export interface ExecuteTxOptions {
Expand All @@ -40,10 +41,14 @@ export class BatchSigningClientService {

private readonly semaphore = new Sema(1);

private accountInfo?: ShortAccountInfo;

private chainId?: string;

private cachedFirstAddress?: string;
private firstAddressPromise?: Promise<string>;

private cachedAccountInfo?: ShortAccountInfo;
private accountInfoPromise?: Promise<ShortAccountInfo>;

private execTxLoader = new DataLoader(
async (batchedInputs: readonly ExecuteTxInput[]) => {
return this.executeTxBatchBlocking(batchedInputs as ExecuteTxInput[]);
Expand All @@ -53,10 +58,6 @@ export class BatchSigningClientService {

private readonly logger = LoggerService.forContext(this.loggerContext);

get hasPendingTransactions() {
return this.semaphore.nrWaiting() > 0;
}

constructor(
private readonly config: BillingConfigService,
private readonly wallet: Wallet,
Expand All @@ -67,6 +68,18 @@ export class BatchSigningClientService {
this.clientAsPromised = this.initClient();
}

get hasPendingTransactions() {
return this.semaphore.nrWaiting() > 0;
}

async executeTx(messages: readonly EncodeObject[], options?: ExecuteTxOptions) {
const tx = await this.execTxLoader.load({ messages, options });

assert(tx?.code === 0, 500, "Failed to sign and broadcast tx", { data: tx });

return tx;
}

private async initClient() {
return await backOff(
() =>
Expand All @@ -86,12 +99,75 @@ export class BatchSigningClientService {
);
}

async executeTx(messages: readonly EncodeObject[], options?: ExecuteTxOptions) {
const tx = await this.execTxLoader.load({ messages, options });
private async getCachedFirstAddress(): Promise<string> {
if (this.cachedFirstAddress) {
return this.cachedFirstAddress;
}

assert(tx?.code === 0, 500, "Failed to sign and broadcast tx", { data: tx });
if (this.firstAddressPromise) {
return this.firstAddressPromise;
}

return tx;
this.firstAddressPromise = this.wallet
.getFirstAddress()
.then(address => {
this.cachedFirstAddress = address;
this.firstAddressPromise = undefined;
return address;
})
.catch(error => {
this.firstAddressPromise = undefined;
throw error;
});

return this.firstAddressPromise;
}

private async getCachedAccountInfo(): Promise<ShortAccountInfo> {
if (this.cachedAccountInfo) {
return this.cachedAccountInfo;
}

if (this.accountInfoPromise) {
return this.accountInfoPromise;
}

this.accountInfoPromise = this.clientAsPromised
.then(async client => {
const address = await this.getCachedFirstAddress();
const account = await client.getAccount(address);

if (!account) {
throw new Error(`Account not found for address: ${address}. The account may not exist on the blockchain yet.`);
}

const accountInfo = {
accountNumber: account.accountNumber,
sequence: account.sequence,
address: address
};

this.cachedAccountInfo = accountInfo;
this.accountInfoPromise = undefined;
return accountInfo;
})
.catch(error => {
this.accountInfoPromise = undefined;
throw error;
});

return this.accountInfoPromise;
}

private incrementSequence(): void {
if (this.cachedAccountInfo) {
this.cachedAccountInfo.sequence++;
}
}

private clearCachedAccountInfo(): void {
this.cachedAccountInfo = undefined;
this.accountInfoPromise = undefined;
}

private async executeTxBatchBlocking(inputs: ExecuteTxInput[]): Promise<IndexedTx[]> {
Expand All @@ -108,7 +184,8 @@ export class BatchSigningClientService {

if (isSequenceMismatch) {
this.clientAsPromised = this.initClient();
this.logger.warn({ event: "ACCOUNT_SEQUENCE_MISMATCH", address: await this.wallet.getFirstAddress(), attempt });
this.clearCachedAccountInfo();
this.logger.warn({ event: "ACCOUNT_SEQUENCE_MISMATCH", address: await this.getCachedFirstAddress(), attempt });

return true;
}
Expand All @@ -124,22 +201,22 @@ export class BatchSigningClientService {
private async executeTxBatch(inputs: ExecuteTxInput[]): Promise<IndexedTx[]> {
return await withSpan("BatchSigningClientService.executeTxBatch", async () => {
const client = await this.clientAsPromised;
const accountInfo = await this.updateAccountInfo();
const address = await this.wallet.getFirstAddress();
const accountInfo = await this.getCachedAccountInfo();

const txes: TxRaw[] = [];
let txIndex: number = 0;
while (txIndex < inputs.length) {
const { messages, options } = inputs[txIndex];
const fee = await this.estimateFee(messages, this.FEES_DENOM, options?.fee.granter);
const fee = await this.estimateFee(messages, this.FEES_DENOM, options?.fee?.granter);

txes.push(
await client.sign(address, messages, fee, "", {
await client.sign(accountInfo.address, messages, fee, "", {
accountNumber: accountInfo.accountNumber,
sequence: accountInfo.sequence++,
sequence: accountInfo.sequence,
chainId: this.chainId!
})
);

this.incrementSequence();
txIndex++;
}

Expand Down Expand Up @@ -191,16 +268,6 @@ export class BatchSigningClientService {
});
}

private async updateAccountInfo() {
const client = await this.clientAsPromised;
const accountInfo = await client.getAccount(await this.wallet.getFirstAddress()).then(account => ({
accountNumber: account!.accountNumber,
sequence: account!.sequence
}));
this.accountInfo = accountInfo;
return accountInfo;
}

private async estimateFee(messages: readonly EncodeObject[], denom: string, granter?: string, options?: { mock?: boolean }) {
if (options?.mock) {
return {
Expand All @@ -219,6 +286,6 @@ export class BatchSigningClientService {
}

private async simulate(messages: readonly EncodeObject[], memo: string) {
return (await this.clientAsPromised).simulate(await this.wallet.getFirstAddress(), messages, memo);
return (await this.clientAsPromised).simulate(await this.getCachedFirstAddress(), messages, memo);
}
}