From ac39c9ab05e725d5d8cd7ed3c060807307a24526 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Mariusz=20Roli=C5=84ski?= Date: Fri, 30 Jan 2026 01:25:27 +0100 Subject: [PATCH 1/9] Refactor encryption to use workers --- packages/sdk/jest.config.ts | 1 + packages/sdk/rollup.config.mts | 1 + .../src/_browser/createEncryptionWorker.ts | 11 + .../sdk/src/_jest/createEncryptionWorker.ts | 12 + .../sdk/src/_karma/createEncryptionWorker.ts | 12 + .../sdk/src/_nodejs/createEncryptionWorker.ts | 11 + .../sdk/src/encryption/EncryptionService.ts | 138 ++++++++++++ packages/sdk/src/encryption/EncryptionUtil.ts | 56 +---- .../sdk/src/encryption/EncryptionWorker.ts | 97 ++++++++ packages/sdk/src/encryption/GroupKey.ts | 25 +- packages/sdk/src/encryption/aesUtils.ts | 28 +++ packages/sdk/src/encryption/decrypt.ts | 20 +- .../sdk/src/encryption/encryptionUtils.ts | 113 ++++++++++ packages/sdk/src/publish/MessageFactory.ts | 9 +- packages/sdk/src/publish/Publisher.ts | 5 + .../src/subscribe/MessagePipelineFactory.ts | 6 + packages/sdk/src/subscribe/messagePipeline.ts | 4 +- packages/sdk/test/integration/Resends.test.ts | 3 +- .../sdk/test/integration/gap-fill.test.ts | 10 +- .../integration/parallel-key-exchange.test.ts | 3 +- .../test/test-utils/fake/FakeEnvironment.ts | 4 + packages/sdk/test/test-utils/utils.ts | 45 +++- packages/sdk/test/unit/Decrypt.test.ts | 5 +- .../sdk/test/unit/EncryptionService.test.ts | 213 ++++++++++++++++++ packages/sdk/test/unit/EncryptionUtil.test.ts | 75 +----- packages/sdk/test/unit/MessageFactory.test.ts | 7 +- packages/sdk/test/unit/Publisher.test.ts | 3 +- packages/sdk/test/unit/aesUtils.test.ts | 80 +++++++ .../sdk/test/unit/messagePipeline.test.ts | 7 +- .../sdk/test/unit/resendSubscription.test.ts | 10 +- 30 files changed, 856 insertions(+), 158 deletions(-) create mode 100644 packages/sdk/src/_browser/createEncryptionWorker.ts create mode 100644 packages/sdk/src/_jest/createEncryptionWorker.ts create mode 100644 packages/sdk/src/_karma/createEncryptionWorker.ts create mode 100644 packages/sdk/src/_nodejs/createEncryptionWorker.ts create mode 100644 packages/sdk/src/encryption/EncryptionService.ts create mode 100644 packages/sdk/src/encryption/EncryptionWorker.ts create mode 100644 packages/sdk/src/encryption/aesUtils.ts create mode 100644 packages/sdk/src/encryption/encryptionUtils.ts create mode 100644 packages/sdk/test/unit/EncryptionService.test.ts create mode 100644 packages/sdk/test/unit/aesUtils.test.ts diff --git a/packages/sdk/jest.config.ts b/packages/sdk/jest.config.ts index 9664a4931f..0dba338d41 100644 --- a/packages/sdk/jest.config.ts +++ b/packages/sdk/jest.config.ts @@ -13,6 +13,7 @@ const config: Config.InitialOptions = { moduleNameMapper: { "^@/createSignatureValidationWorker$": "/src/_jest/createSignatureValidationWorker.ts", "^@/createSigningWorker$": "/src/_jest/createSigningWorker.ts", + "^@/createEncryptionWorker$": "/src/_jest/createEncryptionWorker.ts", "^@/(.*)$": "/src/_nodejs/$1", }, transform: { diff --git a/packages/sdk/rollup.config.mts b/packages/sdk/rollup.config.mts index 35efbadc6e..671ebc24de 100644 --- a/packages/sdk/rollup.config.mts +++ b/packages/sdk/rollup.config.mts @@ -36,6 +36,7 @@ const browserAliases: Alias[] = [ const WORKERS: Record = { 'SignatureValidationWorker': 'signature/SignatureValidationWorker', 'SigningWorker': 'signature/SigningWorker', + 'EncryptionWorker': 'encryption/EncryptionWorker', } export default defineConfig([ diff --git a/packages/sdk/src/_browser/createEncryptionWorker.ts b/packages/sdk/src/_browser/createEncryptionWorker.ts new file mode 100644 index 0000000000..c24bed64a7 --- /dev/null +++ b/packages/sdk/src/_browser/createEncryptionWorker.ts @@ -0,0 +1,11 @@ +/** + * Browser-specific encryption worker factory. + */ +import Worker from 'web-worker' + +export function createEncryptionWorker(): InstanceType { + return new Worker( + new URL('./workers/EncryptionWorker.browser.mjs', import.meta.url), + { type: 'module' } + ) +} diff --git a/packages/sdk/src/_jest/createEncryptionWorker.ts b/packages/sdk/src/_jest/createEncryptionWorker.ts new file mode 100644 index 0000000000..bd9b168e0b --- /dev/null +++ b/packages/sdk/src/_jest/createEncryptionWorker.ts @@ -0,0 +1,12 @@ +/** + * Jest-specific encryption worker factory. + * Points to the built worker in dist/ for testing. + */ +import Worker from 'web-worker' + +export function createEncryptionWorker(): InstanceType { + return new Worker( + new URL('../../dist/workers/EncryptionWorker.node.mjs', import.meta.url), + { type: 'module' } + ) +} diff --git a/packages/sdk/src/_karma/createEncryptionWorker.ts b/packages/sdk/src/_karma/createEncryptionWorker.ts new file mode 100644 index 0000000000..cba587f86b --- /dev/null +++ b/packages/sdk/src/_karma/createEncryptionWorker.ts @@ -0,0 +1,12 @@ +/** + * Karma-specific encryption worker factory. + * Points to the built worker in dist/ for browser testing. + */ +import Worker from 'web-worker' + +export function createEncryptionWorker(): InstanceType { + return new Worker( + new URL('../../dist/workers/EncryptionWorker.browser.mjs', import.meta.url), + { type: 'module' } + ) +} diff --git a/packages/sdk/src/_nodejs/createEncryptionWorker.ts b/packages/sdk/src/_nodejs/createEncryptionWorker.ts new file mode 100644 index 0000000000..39a105676f --- /dev/null +++ b/packages/sdk/src/_nodejs/createEncryptionWorker.ts @@ -0,0 +1,11 @@ +/** + * Node.js-specific encryption worker factory. + */ +import Worker from 'web-worker' + +export function createEncryptionWorker(): InstanceType { + return new Worker( + new URL('./workers/EncryptionWorker.node.mjs', import.meta.url), + { type: 'module' } + ) +} diff --git a/packages/sdk/src/encryption/EncryptionService.ts b/packages/sdk/src/encryption/EncryptionService.ts new file mode 100644 index 0000000000..ba49a40489 --- /dev/null +++ b/packages/sdk/src/encryption/EncryptionService.ts @@ -0,0 +1,138 @@ +/** + * Singleton encryption service using Web Worker. + * This offloads CPU-intensive AES encryption operations to a separate thread. + * Works in both browser and Node.js environments via platform-specific config. + * + * The worker is lazily initialized on first use and shared across all consumers. + */ +import { wrap, releaseProxy, transfer, type Remote } from 'comlink' +import { Lifecycle, scoped } from 'tsyringe' +import { EncryptedGroupKey } from '@streamr/trackerless-network' +import { createEncryptionWorker } from '@/createEncryptionWorker' +import type { EncryptionWorkerApi } from './EncryptionWorker' +import { DestroySignal } from '../DestroySignal' +import { StreamrClientError } from '../StreamrClientError' +import { GroupKey } from './GroupKey' + +@scoped(Lifecycle.ContainerScoped) +export class EncryptionService { + private worker: ReturnType | undefined + private workerApi: Remote | undefined + + constructor(destroySignal: DestroySignal) { + destroySignal.onDestroy.listen(() => this.destroy()) + } + + private getWorkerApi(): Remote { + if (this.workerApi === undefined) { + this.worker = createEncryptionWorker() + this.workerApi = wrap(this.worker) + } + return this.workerApi + } + + /** + * Encrypt data using AES-256-CTR. + * Note: The input data buffer is transferred to the worker and becomes unusable after this call. + */ + async encryptWithAES(data: Uint8Array, cipherKey: Uint8Array): Promise { + const result = await this.getWorkerApi().encrypt( + transfer({ data, cipherKey }, [data.buffer]) + ) + if (result.type === 'error') { + throw new Error(`AES encryption failed: ${result.message}`) + } + return result.data + } + + /** + * Decrypt AES-256-CTR encrypted data. + * Note: The input cipher buffer is transferred to the worker and becomes unusable after this call. + */ + async decryptWithAES(cipher: Uint8Array, cipherKey: Uint8Array): Promise { + const result = await this.getWorkerApi().decrypt( + transfer({ cipher, cipherKey }, [cipher.buffer]) + ) + if (result.type === 'error') { + throw new Error(`AES decryption failed: ${result.message}`) + } + return result.data + } + + /** + * Encrypt the next group key using the current group key. + */ + async encryptNextGroupKey(currentKey: GroupKey, nextKey: GroupKey): Promise { + const result = await this.getWorkerApi().encryptGroupKey({ + nextGroupKeyId: nextKey.id, + nextGroupKeyData: nextKey.data, + currentGroupKeyData: currentKey.data + }) + if (result.type === 'error') { + throw new Error(`Group key encryption failed: ${result.message}`) + } + return { + id: result.id, + data: result.data + } + } + + /** + * Decrypt an encrypted group key using the current group key. + */ + async decryptNextGroupKey(currentKey: GroupKey, encryptedKey: EncryptedGroupKey): Promise { + const result = await this.getWorkerApi().decryptGroupKey({ + encryptedGroupKeyId: encryptedKey.id, + encryptedGroupKeyData: encryptedKey.data, + currentGroupKeyData: currentKey.data + }) + if (result.type === 'error') { + throw new Error(`Group key decryption failed: ${result.message}`) + } + return new GroupKey(result.id, Buffer.from(result.data)) + } + + /** + * Decrypt a stream message's content and optionally the new group key. + * This combines both operations for efficiency when processing messages. + * Note: The input content buffer is transferred to the worker and becomes unusable after this call. + */ + async decryptStreamMessage( + content: Uint8Array, + groupKey: GroupKey, + encryptedNewGroupKey?: EncryptedGroupKey + ): Promise<[Uint8Array, GroupKey?]> { + const request = { + content, + groupKeyData: groupKey.data, + newGroupKey: encryptedNewGroupKey ? { + id: encryptedNewGroupKey.id, + data: encryptedNewGroupKey.data + } : undefined + } + const result = await this.getWorkerApi().decryptStreamMessage( + transfer(request, [content.buffer]) + ) + if (result.type === 'error') { + throw new StreamrClientError(`AES decryption failed: ${result.message}`, 'DECRYPT_ERROR') + } + + let newGroupKey: GroupKey | undefined + if (result.newGroupKey) { + newGroupKey = new GroupKey(result.newGroupKey.id, Buffer.from(result.newGroupKey.data)) + } + + return [result.content, newGroupKey] + } + + destroy(): void { + if (this.workerApi !== undefined) { + this.workerApi[releaseProxy]() + this.workerApi = undefined + } + if (this.worker !== undefined) { + this.worker.terminate() + this.worker = undefined + } + } +} diff --git a/packages/sdk/src/encryption/EncryptionUtil.ts b/packages/sdk/src/encryption/EncryptionUtil.ts index b0bb87f5e2..b7bc0d893f 100644 --- a/packages/sdk/src/encryption/EncryptionUtil.ts +++ b/packages/sdk/src/encryption/EncryptionUtil.ts @@ -1,17 +1,19 @@ import { ml_kem1024 } from '@noble/post-quantum/ml-kem' import { randomBytes } from '@noble/post-quantum/utils' -import { StreamMessageAESEncrypted } from '../protocol/StreamMessage' -import { StreamrClientError } from '../StreamrClientError' -import { GroupKey } from './GroupKey' import { AsymmetricEncryptionType } from '@streamr/trackerless-network' -import { binaryToUtf8, createCipheriv, createDecipheriv, getSubtle, privateDecrypt, publicEncrypt } from '@streamr/utils' - -export const INITIALIZATION_VECTOR_LENGTH = 16 +import { binaryToUtf8, getSubtle, privateDecrypt, publicEncrypt } from '@streamr/utils' +import { decryptWithAES, encryptWithAES } from './aesUtils' const INFO = Buffer.from('streamr-key-exchange') const KEM_CIPHER_LENGTH_BYTES = 1568 const KDF_SALT_LENGTH_BYTES = 64 +/** + * Asymmetric encryption utility class for RSA and ML-KEM (post-quantum) key exchange. + * + * For AES symmetric encryption of stream messages, use EncryptionService instead. + * This class only handles asymmetric encryption for key exchange operations. + */ // eslint-disable-next-line @typescript-eslint/no-extraneous-class export class EncryptionUtil { /** @@ -116,7 +118,7 @@ export class EncryptionUtil { const wrappingAESKey = await this.deriveAESWrapperKey(sharedSecret, kdfSalt) // Encrypt plaintext with the AES wrapping key - const aesEncryptedPlaintext = this.encryptWithAES(plaintextBuffer, Buffer.from(wrappingAESKey)) + const aesEncryptedPlaintext = encryptWithAES(plaintextBuffer, Buffer.from(wrappingAESKey)) // Concatenate the deliverables into a binary package return Buffer.concat([kemCipher, kdfSalt, aesEncryptedPlaintext]) @@ -138,44 +140,6 @@ export class EncryptionUtil { const wrappingAESKey = await this.deriveAESWrapperKey(sharedSecret, kdfSalt) // Decrypt the aesEncryptedPlaintext - return this.decryptWithAES(aesEncryptedPlaintext, Buffer.from(wrappingAESKey)) - } - - /* - * Returns a hex string without the '0x' prefix. - */ - static encryptWithAES(data: Uint8Array, cipherKey: Uint8Array): Uint8Array { - const iv = randomBytes(INITIALIZATION_VECTOR_LENGTH) // always need a fresh IV when using CTR mode - const cipher = createCipheriv('aes-256-ctr', cipherKey, iv) - return Buffer.concat([iv, cipher.update(data), cipher.final()]) - } - - /* - * 'ciphertext' must be a hex string (without '0x' prefix), 'groupKey' must be a GroupKey. Returns a Buffer. - */ - static decryptWithAES(cipher: Uint8Array, cipherKey: Uint8Array): Buffer { - const iv = cipher.slice(0, INITIALIZATION_VECTOR_LENGTH) - const decipher = createDecipheriv('aes-256-ctr', cipherKey, iv) - return Buffer.concat([decipher.update(cipher.slice(INITIALIZATION_VECTOR_LENGTH)), decipher.final()]) - } - - static decryptStreamMessage(streamMessage: StreamMessageAESEncrypted, groupKey: GroupKey): [Uint8Array, GroupKey?] | never { - let content: Uint8Array - try { - content = this.decryptWithAES(streamMessage.content, groupKey.data) - } catch { - throw new StreamrClientError('AES decryption failed', 'DECRYPT_ERROR', streamMessage) - } - - let newGroupKey: GroupKey | undefined = undefined - if (streamMessage.newGroupKey) { - try { - newGroupKey = groupKey.decryptNextGroupKey(streamMessage.newGroupKey) - } catch { - throw new StreamrClientError('Could not decrypt new encryption key', 'DECRYPT_ERROR', streamMessage) - } - } - - return [content, newGroupKey] + return decryptWithAES(aesEncryptedPlaintext, Buffer.from(wrappingAESKey)) } } diff --git a/packages/sdk/src/encryption/EncryptionWorker.ts b/packages/sdk/src/encryption/EncryptionWorker.ts new file mode 100644 index 0000000000..d70fbcac8f --- /dev/null +++ b/packages/sdk/src/encryption/EncryptionWorker.ts @@ -0,0 +1,97 @@ +/** + * Web Worker for AES encryption operations. + * Offloads CPU-intensive cryptographic operations to a separate thread. + */ +import { expose, transfer } from 'comlink' +import { decryptWithAES, encryptWithAES } from './aesUtils' +import { + encryptNextGroupKey, + decryptNextGroupKey, + decryptStreamMessageContent, + AESEncryptRequest, + AESDecryptRequest, + EncryptGroupKeyRequest, + DecryptGroupKeyRequest, + DecryptStreamMessageRequest, + AESEncryptResult, + AESDecryptResult, + EncryptGroupKeyResult, + DecryptGroupKeyResult, + DecryptStreamMessageResult +} from './encryptionUtils' + +const workerApi = { + encrypt: async (request: AESEncryptRequest): Promise => { + try { + const result = encryptWithAES(request.data, request.cipherKey) + return transfer({ type: 'success', data: result }, [result.buffer]) + } catch (err) { + return { type: 'error', message: String(err) } + } + }, + + decrypt: async (request: AESDecryptRequest): Promise => { + try { + const result = decryptWithAES(request.cipher, request.cipherKey) + return transfer({ type: 'success', data: result }, [result.buffer]) + } catch (err) { + return { type: 'error', message: String(err) } + } + }, + + encryptGroupKey: async (request: EncryptGroupKeyRequest): Promise => { + try { + const result = encryptNextGroupKey( + request.nextGroupKeyId, + request.nextGroupKeyData, + request.currentGroupKeyData + ) + return transfer( + { type: 'success', id: result.id, data: result.data }, + [result.data.buffer] + ) + } catch (err) { + return { type: 'error', message: String(err) } + } + }, + + decryptGroupKey: async (request: DecryptGroupKeyRequest): Promise => { + try { + const result = decryptNextGroupKey( + request.encryptedGroupKeyId, + request.encryptedGroupKeyData, + request.currentGroupKeyData + ) + return transfer( + { type: 'success', id: result.id, data: result.data }, + [result.data.buffer] + ) + } catch (err) { + return { type: 'error', message: String(err) } + } + }, + + decryptStreamMessage: async (request: DecryptStreamMessageRequest): Promise => { + try { + const result = decryptStreamMessageContent( + request.content, + request.groupKeyData, + request.newGroupKey + ) + const transferables: ArrayBuffer[] = [result.content.buffer as ArrayBuffer] + if (result.newGroupKey) { + transferables.push(result.newGroupKey.data.buffer as ArrayBuffer) + } + return transfer( + { type: 'success', content: result.content, newGroupKey: result.newGroupKey }, + transferables + ) + } catch (err) { + return { type: 'error', message: String(err) } + } + } +} + +export type EncryptionWorkerApi = typeof workerApi + +expose(workerApi) diff --git a/packages/sdk/src/encryption/GroupKey.ts b/packages/sdk/src/encryption/GroupKey.ts index 9dda22dbd2..c6d534e874 100644 --- a/packages/sdk/src/encryption/GroupKey.ts +++ b/packages/sdk/src/encryption/GroupKey.ts @@ -1,7 +1,6 @@ -import { EncryptedGroupKey } from '@streamr/trackerless-network' -import { uuid } from '../utils/uuid' -import { EncryptionUtil } from './EncryptionUtil' import { randomBytes } from '@noble/post-quantum/utils' +import { uuid } from '../utils/uuid' + export class GroupKeyError extends Error { public groupKey?: GroupKey @@ -15,6 +14,9 @@ export class GroupKeyError extends Error { /** * GroupKeys are AES cipher keys, which are used to encrypt/decrypt StreamMessages (when encryptionType is AES). * Each group key contains 256 random bits of key data and an UUID. + * + * For encryption/decryption of group keys, use EncryptionService.encryptNextGroupKey() + * and EncryptionService.decryptNextGroupKey(). */ export class GroupKey { @@ -67,21 +69,4 @@ export class GroupKey { const keyBytes = randomBytes(32) return new GroupKey(id, Buffer.from(keyBytes)) } - - /** @internal */ - encryptNextGroupKey(nextGroupKey: GroupKey): EncryptedGroupKey { - return { - id: nextGroupKey.id, - data: EncryptionUtil.encryptWithAES(nextGroupKey.data, this.data) - } - } - - /** @internal */ - decryptNextGroupKey(nextGroupKey: EncryptedGroupKey): GroupKey { - return new GroupKey( - nextGroupKey.id, - EncryptionUtil.decryptWithAES(nextGroupKey.data, this.data) - ) - } - } diff --git a/packages/sdk/src/encryption/aesUtils.ts b/packages/sdk/src/encryption/aesUtils.ts new file mode 100644 index 0000000000..db4786b9a9 --- /dev/null +++ b/packages/sdk/src/encryption/aesUtils.ts @@ -0,0 +1,28 @@ +/** + * Low-level AES-256-CTR encryption utilities. + * Shared between EncryptionUtil (for ML-KEM key wrapping) and encryptionUtils (for stream message encryption). + */ +import { randomBytes } from '@noble/post-quantum/utils' +import { createCipheriv, createDecipheriv } from '@streamr/utils' + +export const INITIALIZATION_VECTOR_LENGTH = 16 + +/** + * Encrypt data using AES-256-CTR. + * Returns IV prepended to ciphertext. + */ +export function encryptWithAES(data: Uint8Array, cipherKey: Uint8Array): Uint8Array { + const iv = randomBytes(INITIALIZATION_VECTOR_LENGTH) // always need a fresh IV when using CTR mode + const cipher = createCipheriv('aes-256-ctr', cipherKey, iv) + return Buffer.concat([iv, cipher.update(data), cipher.final()]) +} + +/** + * Decrypt AES-256-CTR encrypted data. + * Expects IV prepended to ciphertext. + */ +export function decryptWithAES(cipher: Uint8Array, cipherKey: Uint8Array): Buffer { + const iv = cipher.slice(0, INITIALIZATION_VECTOR_LENGTH) + const decipher = createDecipheriv('aes-256-ctr', cipherKey, iv) + return Buffer.concat([decipher.update(cipher.slice(INITIALIZATION_VECTOR_LENGTH)), decipher.final()]) +} diff --git a/packages/sdk/src/encryption/decrypt.ts b/packages/sdk/src/encryption/decrypt.ts index 9103cae261..2b434e437b 100644 --- a/packages/sdk/src/encryption/decrypt.ts +++ b/packages/sdk/src/encryption/decrypt.ts @@ -1,8 +1,8 @@ import { EncryptionType } from '@streamr/trackerless-network' import { DestroySignal } from '../DestroySignal' -import { EncryptionUtil } from '../encryption/EncryptionUtil' import { GroupKey } from '../encryption/GroupKey' import { GroupKeyManager } from '../encryption/GroupKeyManager' +import { EncryptionService } from '../encryption/EncryptionService' import { StreamMessage, StreamMessageAESEncrypted } from '../protocol/StreamMessage' import { StreamrClientError } from '../StreamrClientError' @@ -12,6 +12,7 @@ import { StreamrClientError } from '../StreamrClientError' export const decrypt = async ( streamMessage: StreamMessageAESEncrypted, groupKeyManager: GroupKeyManager, + encryptionService: EncryptionService, destroySignal: DestroySignal, ): Promise => { if (destroySignal.isDestroyed()) { @@ -33,7 +34,22 @@ export const decrypt = async ( if (destroySignal.isDestroyed()) { return streamMessage } - const [content, newGroupKey] = EncryptionUtil.decryptStreamMessage(streamMessage, groupKey) + + let content: Uint8Array + let newGroupKey: GroupKey | undefined + try { + [content, newGroupKey] = await encryptionService.decryptStreamMessage( + streamMessage.content, + groupKey, + streamMessage.newGroupKey + ) + } catch (err) { + if (err instanceof StreamrClientError) { + throw new StreamrClientError(err.message, 'DECRYPT_ERROR', streamMessage) + } + throw new StreamrClientError('AES decryption failed', 'DECRYPT_ERROR', streamMessage) + } + if (newGroupKey !== undefined) { await groupKeyManager.addKeyToLocalStore(newGroupKey, streamMessage.getPublisherId()) } diff --git a/packages/sdk/src/encryption/encryptionUtils.ts b/packages/sdk/src/encryption/encryptionUtils.ts new file mode 100644 index 0000000000..c8c5ab4ec5 --- /dev/null +++ b/packages/sdk/src/encryption/encryptionUtils.ts @@ -0,0 +1,113 @@ +/** + * Higher-level encryption logic - shared between worker and main thread implementations. + * This file contains pure cryptographic functions without any network dependencies. + * + * For low-level AES operations, see aesUtils.ts + */ +import { decryptWithAES, encryptWithAES } from './aesUtils' + +/** + * Request types for worker communication + */ +export interface AESEncryptRequest { + data: Uint8Array + cipherKey: Uint8Array +} + +export interface AESDecryptRequest { + cipher: Uint8Array + cipherKey: Uint8Array +} + +export interface EncryptGroupKeyRequest { + nextGroupKeyId: string + nextGroupKeyData: Uint8Array + currentGroupKeyData: Uint8Array +} + +export interface DecryptGroupKeyRequest { + encryptedGroupKeyId: string + encryptedGroupKeyData: Uint8Array + currentGroupKeyData: Uint8Array +} + +export interface DecryptStreamMessageRequest { + content: Uint8Array + groupKeyData: Uint8Array + newGroupKey?: { + id: string + data: Uint8Array + } +} + +/** + * Result types for worker communication + */ +export type AESEncryptResult = + | { type: 'success', data: Uint8Array } + | { type: 'error', message: string } + +export type AESDecryptResult = + | { type: 'success', data: Uint8Array } + | { type: 'error', message: string } + +export type EncryptGroupKeyResult = + | { type: 'success', id: string, data: Uint8Array } + | { type: 'error', message: string } + +export type DecryptGroupKeyResult = + | { type: 'success', id: string, data: Uint8Array } + | { type: 'error', message: string } + +export type DecryptStreamMessageResult = + | { type: 'success', content: Uint8Array, newGroupKey?: { id: string, data: Uint8Array } } + | { type: 'error', message: string } + +/** + * Encrypt a next group key using the current group key. + */ +export function encryptNextGroupKey( + nextGroupKeyId: string, + nextGroupKeyData: Uint8Array, + currentGroupKeyData: Uint8Array +): { id: string, data: Uint8Array } { + return { + id: nextGroupKeyId, + data: encryptWithAES(nextGroupKeyData, currentGroupKeyData) + } +} + +/** + * Decrypt an encrypted group key using the current group key. + */ +export function decryptNextGroupKey( + encryptedGroupKeyId: string, + encryptedGroupKeyData: Uint8Array, + currentGroupKeyData: Uint8Array +): { id: string, data: Uint8Array } { + return { + id: encryptedGroupKeyId, + data: decryptWithAES(encryptedGroupKeyData, currentGroupKeyData) + } +} + +/** + * Decrypt a stream message content and optionally the new group key. + */ +export function decryptStreamMessageContent( + content: Uint8Array, + groupKeyData: Uint8Array, + newGroupKey?: { id: string, data: Uint8Array } +): { content: Uint8Array, newGroupKey?: { id: string, data: Uint8Array } } { + const decryptedContent = decryptWithAES(content, groupKeyData) + + let decryptedNewGroupKey: { id: string, data: Uint8Array } | undefined + if (newGroupKey) { + decryptedNewGroupKey = decryptNextGroupKey(newGroupKey.id, newGroupKey.data, groupKeyData) + } + + return { + content: decryptedContent, + newGroupKey: decryptedNewGroupKey + } +} diff --git a/packages/sdk/src/publish/MessageFactory.ts b/packages/sdk/src/publish/MessageFactory.ts index d81c83e90c..db6b17a79c 100644 --- a/packages/sdk/src/publish/MessageFactory.ts +++ b/packages/sdk/src/publish/MessageFactory.ts @@ -5,7 +5,7 @@ import { Identity } from '../identity/Identity' import { getPartitionCount } from '../StreamMetadata' import { StreamrClientError } from '../StreamrClientError' import { StreamRegistry } from '../contracts/StreamRegistry' -import { EncryptionUtil } from '../encryption/EncryptionUtil' +import { EncryptionService } from '../encryption/EncryptionService' import { MessageID } from '../protocol/MessageID' import { MessageRef } from '../protocol/MessageRef' @@ -30,6 +30,7 @@ export interface MessageFactoryOptions { groupKeyQueue: GroupKeyQueue signatureValidator: SignatureValidator messageSigner: MessageSigner + encryptionService: EncryptionService config: Pick } @@ -45,6 +46,7 @@ export class MessageFactory { private readonly groupKeyQueue: GroupKeyQueue private readonly signatureValidator: SignatureValidator private readonly messageSigner: MessageSigner + private readonly encryptionService: EncryptionService private readonly config: Pick private firstMessage = true @@ -55,6 +57,7 @@ export class MessageFactory { this.groupKeyQueue = opts.groupKeyQueue this.signatureValidator = opts.signatureValidator this.messageSigner = opts.messageSigner + this.encryptionService = opts.encryptionService this.config = opts.config this.defaultMessageChainIds = createLazyMap({ valueFactory: async () => { @@ -131,10 +134,10 @@ export class MessageFactory { } if (encryptionType === EncryptionType.AES) { const keySequence = await this.groupKeyQueue.useGroupKey() - rawContent = EncryptionUtil.encryptWithAES(rawContent, keySequence.current.data) + rawContent = await this.encryptionService.encryptWithAES(rawContent, keySequence.current.data) groupKeyId = keySequence.current.id if (keySequence.next !== undefined) { - newGroupKey = keySequence.current.encryptNextGroupKey(keySequence.next) + newGroupKey = await this.encryptionService.encryptNextGroupKey(keySequence.current, keySequence.next) } } diff --git a/packages/sdk/src/publish/Publisher.ts b/packages/sdk/src/publish/Publisher.ts index 89b567b620..5d78a748dc 100644 --- a/packages/sdk/src/publish/Publisher.ts +++ b/packages/sdk/src/publish/Publisher.ts @@ -8,6 +8,7 @@ import { StreamIDBuilder } from '../StreamIDBuilder' import { StreamrClientError } from '../StreamrClientError' import { StreamRegistry } from '../contracts/StreamRegistry' import { getExplicitKey, GroupKeyManager } from '../encryption/GroupKeyManager' +import { EncryptionService } from '../encryption/EncryptionService' import { StreamMessage } from '../protocol/StreamMessage' import { MessageSigner } from '../signature/MessageSigner' import { SignatureValidator } from '../signature/SignatureValidator' @@ -54,6 +55,7 @@ export class Publisher { private readonly identity: Identity private readonly signatureValidator: SignatureValidator private readonly messageSigner: MessageSigner + private readonly encryptionService: EncryptionService private readonly config: StrictStreamrClientConfig constructor( @@ -64,6 +66,7 @@ export class Publisher { @inject(IdentityInjectionToken) identity: Identity, signatureValidator: SignatureValidator, messageSigner: MessageSigner, + encryptionService: EncryptionService, @inject(ConfigInjectionToken) config: StrictStreamrClientConfig, ) { this.node = node @@ -72,6 +75,7 @@ export class Publisher { this.identity = identity this.signatureValidator = signatureValidator this.messageSigner = messageSigner + this.encryptionService = encryptionService this.config = config this.messageFactories = createLazyMap({ valueFactory: async (streamId) => { @@ -142,6 +146,7 @@ export class Publisher { groupKeyQueue: await this.groupKeyQueues.get(streamId), signatureValidator: this.signatureValidator, messageSigner: this.messageSigner, + encryptionService: this.encryptionService, config: this.config, }) } diff --git a/packages/sdk/src/subscribe/MessagePipelineFactory.ts b/packages/sdk/src/subscribe/MessagePipelineFactory.ts index c5c1283660..5c8c1c9d49 100644 --- a/packages/sdk/src/subscribe/MessagePipelineFactory.ts +++ b/packages/sdk/src/subscribe/MessagePipelineFactory.ts @@ -6,6 +6,7 @@ import { DestroySignal } from '../DestroySignal' import { StreamRegistry } from '../contracts/StreamRegistry' import { StreamStorageRegistry } from '../contracts/StreamStorageRegistry' import { GroupKeyManager } from '../encryption/GroupKeyManager' +import { EncryptionService } from '../encryption/EncryptionService' import { StreamMessage } from '../protocol/StreamMessage' import { SignatureValidator } from '../signature/SignatureValidator' import { LoggerFactory } from '../utils/LoggerFactory' @@ -17,6 +18,7 @@ import { Tokens } from '../tokens' type MessagePipelineFactoryOptions = MarkOptional StreamRegistry)) streamRegistry: StreamRegistry, signatureValidator: SignatureValidator, @inject(delay(() => GroupKeyManager)) groupKeyManager: GroupKeyManager, + encryptionService: EncryptionService, @inject(ConfigInjectionToken) config: MessagePipelineOptions['config'], destroySignal: DestroySignal, loggerFactory: LoggerFactory @@ -52,6 +56,7 @@ export class MessagePipelineFactory { this.streamRegistry = streamRegistry this.signatureValidator = signatureValidator this.groupKeyManager = groupKeyManager + this.encryptionService = encryptionService this.config = config this.destroySignal = destroySignal this.loggerFactory = loggerFactory @@ -65,6 +70,7 @@ export class MessagePipelineFactory { streamRegistry: this.streamRegistry, signatureValidator: this.signatureValidator, groupKeyManager: this.groupKeyManager, + encryptionService: this.encryptionService, config: opts.config ?? this.config, destroySignal: this.destroySignal, loggerFactory: this.loggerFactory diff --git a/packages/sdk/src/subscribe/messagePipeline.ts b/packages/sdk/src/subscribe/messagePipeline.ts index 34ec073714..a478625944 100644 --- a/packages/sdk/src/subscribe/messagePipeline.ts +++ b/packages/sdk/src/subscribe/messagePipeline.ts @@ -6,6 +6,7 @@ import type { StrictStreamrClientConfig } from '../ConfigTypes' import { DestroySignal } from '../DestroySignal' import { StreamRegistry } from '../contracts/StreamRegistry' import { GroupKeyManager } from '../encryption/GroupKeyManager' +import { EncryptionService } from '../encryption/EncryptionService' import { decrypt } from '../encryption/decrypt' import { StreamMessage } from '../protocol/StreamMessage' @@ -28,6 +29,7 @@ export interface MessagePipelineOptions { streamRegistry: StreamRegistry signatureValidator: SignatureValidator groupKeyManager: GroupKeyManager + encryptionService: EncryptionService // eslint-disable-next-line max-len config: Pick destroySignal: DestroySignal @@ -71,7 +73,7 @@ export const createMessagePipeline = (opts: MessagePipelineOptions): PushPipelin let decrypted if (StreamMessage.isAESEncrypted(msg)) { try { - decrypted = await decrypt(msg, opts.groupKeyManager, opts.destroySignal) + decrypted = await decrypt(msg, opts.groupKeyManager, opts.encryptionService, opts.destroySignal) } catch (err) { // TODO log this in onError? if we want to log all errors? logger.debug('Failed to decrypt', { messageId: msg.messageId, err }) diff --git a/packages/sdk/test/integration/Resends.test.ts b/packages/sdk/test/integration/Resends.test.ts index 301433a375..98cb477886 100644 --- a/packages/sdk/test/integration/Resends.test.ts +++ b/packages/sdk/test/integration/Resends.test.ts @@ -8,7 +8,7 @@ import { StreamPermission } from '../../src/permission' import { MessageFactory } from '../../src/publish/MessageFactory' import { SignatureValidator } from '../../src/signature/SignatureValidator' import { FakeEnvironment } from '../test-utils/fake/FakeEnvironment' -import { createGroupKeyQueue, createMessageSigner, createStreamRegistry } from '../test-utils/utils' +import { createGroupKeyQueue, createMessageSigner, createMockEncryptionService, createStreamRegistry } from '../test-utils/utils' import { EthereumKeyPairIdentity } from '../../src/identity/EthereumKeyPairIdentity' import { createStrictConfig } from '../../src/Config' @@ -45,6 +45,7 @@ describe('Resends', () => { groupKeyQueue: await createGroupKeyQueue(identity, groupKey), signatureValidator: mock(), messageSigner: createMessageSigner(identity), + encryptionService: createMockEncryptionService(), config: createStrictConfig() }) // store the encryption key publisher's local group key store diff --git a/packages/sdk/test/integration/gap-fill.test.ts b/packages/sdk/test/integration/gap-fill.test.ts index d577ce120b..4b6f37cb47 100644 --- a/packages/sdk/test/integration/gap-fill.test.ts +++ b/packages/sdk/test/integration/gap-fill.test.ts @@ -6,7 +6,14 @@ import { GroupKey } from '../../src/encryption/GroupKey' import { StreamMessage } from '../../src/protocol/StreamMessage' import { SignatureValidator } from '../../src/signature/SignatureValidator' import { FakeEnvironment } from '../test-utils/fake/FakeEnvironment' -import { createGroupKeyQueue, createMessageSigner, createStreamRegistry, createTestStream, startFailingStorageNode } from '../test-utils/utils' +import { + createGroupKeyQueue, + createMessageSigner, + createMockEncryptionService, + createStreamRegistry, + createTestStream, + startFailingStorageNode +} from '../test-utils/utils' import { Stream } from './../../src/Stream' import { MessageFactory } from './../../src/publish/MessageFactory' import { EthereumKeyPairIdentity } from '../../src/identity/EthereumKeyPairIdentity' @@ -45,6 +52,7 @@ describe('gap fill', () => { groupKeyQueue: await createGroupKeyQueue(identity, GROUP_KEY), signatureValidator: mock(), messageSigner: createMessageSigner(identity), + encryptionService: createMockEncryptionService(), config: createStrictConfig() }) }) diff --git a/packages/sdk/test/integration/parallel-key-exchange.test.ts b/packages/sdk/test/integration/parallel-key-exchange.test.ts index ff6f872ed7..73ec30e7da 100644 --- a/packages/sdk/test/integration/parallel-key-exchange.test.ts +++ b/packages/sdk/test/integration/parallel-key-exchange.test.ts @@ -10,7 +10,7 @@ import { StreamPermission } from '../../src/permission' import { StreamMessageType } from '../../src/protocol/StreamMessage' import { MessageFactory } from '../../src/publish/MessageFactory' import { SignatureValidator } from '../../src/signature/SignatureValidator' -import { createGroupKeyQueue, createMessageSigner, createStreamRegistry } from '../test-utils/utils' +import { createGroupKeyQueue, createMessageSigner, createMockEncryptionService, createStreamRegistry } from '../test-utils/utils' import { FakeEnvironment } from './../test-utils/fake/FakeEnvironment' import { EthereumKeyPairIdentity } from '../../src/identity/EthereumKeyPairIdentity' import { createStrictConfig } from '../../src/Config' @@ -73,6 +73,7 @@ describe('parallel key exchange', () => { groupKeyQueue: await createGroupKeyQueue(identity, publisher.groupKey), signatureValidator: mock(), messageSigner: createMessageSigner(identity), + encryptionService: createMockEncryptionService(), config: createStrictConfig() }) for (let i = 0; i < MESSAGE_COUNT_PER_PUBLISHER; i++) { diff --git a/packages/sdk/test/test-utils/fake/FakeEnvironment.ts b/packages/sdk/test/test-utils/fake/FakeEnvironment.ts index 3d530952c2..5aa794d00e 100644 --- a/packages/sdk/test/test-utils/fake/FakeEnvironment.ts +++ b/packages/sdk/test/test-utils/fake/FakeEnvironment.ts @@ -22,6 +22,7 @@ import { FakeStreamRegistry } from './FakeStreamRegistry' import { FakeStreamStorageRegistry } from './FakeStreamStorageRegistry' import { DestroySignal } from '../../../src/DestroySignal' import { SigningService } from '../../../src/signature/SigningService' +import { EncryptionService } from '../../../src/encryption/EncryptionService' const DEFAULT_CLIENT_OPTIONS: StreamrClientConfig = { encryption: { @@ -37,6 +38,7 @@ export class FakeEnvironment { private dependencyContainer: DependencyContainer private destroySignal: DestroySignal private signingService: SigningService + private encryptionService: EncryptionService constructor() { this.network = new FakeNetwork() @@ -45,6 +47,7 @@ export class FakeEnvironment { this.dependencyContainer = container.createChildContainer() this.destroySignal = new DestroySignal() this.signingService = new SigningService(this.destroySignal) + this.encryptionService = new EncryptionService(this.destroySignal) const loggerFactory = { createLogger: () => this.logger } @@ -58,6 +61,7 @@ export class FakeEnvironment { this.dependencyContainer.register(StorageNodeRegistry, FakeStorageNodeRegistry as any) this.dependencyContainer.register(OperatorRegistry, FakeOperatorRegistry as any) this.dependencyContainer.register(SigningService, { useValue: this.signingService }) + this.dependencyContainer.register(EncryptionService, { useValue: this.encryptionService }) } createClient(opts?: StreamrClientConfig): StreamrClient { diff --git a/packages/sdk/test/test-utils/utils.ts b/packages/sdk/test/test-utils/utils.ts index 8ea62c0fc2..75225a21ae 100644 --- a/packages/sdk/test/test-utils/utils.ts +++ b/packages/sdk/test/test-utils/utils.ts @@ -44,6 +44,9 @@ import { GroupKey } from '../../src/encryption/GroupKey' import { GroupKeyManager } from '../../src/encryption/GroupKeyManager' import { LocalGroupKeyStore } from '../../src/encryption/LocalGroupKeyStore' import { SubscriberKeyExchange } from '../../src/encryption/SubscriberKeyExchange' +import { EncryptionService } from '../../src/encryption/EncryptionService' +import { encryptWithAES, decryptWithAES } from '../../src/encryption/aesUtils' +import { encryptNextGroupKey, decryptNextGroupKey, decryptStreamMessageContent } from '../../src/encryption/encryptionUtils' import { StreamrClientEventEmitter } from '../../src/events' import { StreamMessage } from '../../src/protocol/StreamMessage' import { GroupKeyQueue } from '../../src/publish/GroupKeyQueue' @@ -80,6 +83,45 @@ export function createMessageSigner(identity: Identity): MessageSigner { return new MessageSigner(identity, createMockSigningService()) } +/** + * Creates a mock EncryptionService that performs encryption synchronously on the main thread. + * Use this in tests instead of the real EncryptionService which spawns a worker. + */ +export function createMockEncryptionService(): EncryptionService { + return { + encryptWithAES: async (data: Uint8Array, cipherKey: Uint8Array) => { + return encryptWithAES(data, cipherKey) + }, + decryptWithAES: async (cipher: Uint8Array, cipherKey: Uint8Array) => { + return decryptWithAES(cipher, cipherKey) + }, + encryptNextGroupKey: async (currentKey: GroupKey, nextKey: GroupKey) => { + return encryptNextGroupKey(nextKey.id, nextKey.data, currentKey.data) + }, + decryptNextGroupKey: async (currentKey: GroupKey, encryptedKey: { id: string, data: Uint8Array }) => { + const result = decryptNextGroupKey(encryptedKey.id, encryptedKey.data, currentKey.data) + return new GroupKey(result.id, Buffer.from(result.data)) + }, + decryptStreamMessage: async ( + content: Uint8Array, + groupKey: GroupKey, + encryptedNewGroupKey?: { id: string, data: Uint8Array } + ) => { + const result = decryptStreamMessageContent( + content, + groupKey.data, + encryptedNewGroupKey + ) + let newGroupKey: GroupKey | undefined + if (result.newGroupKey) { + newGroupKey = new GroupKey(result.newGroupKey.id, Buffer.from(result.newGroupKey.data)) + } + return [result.content, newGroupKey] as [Uint8Array, GroupKey?] + }, + destroy: () => {} + } as unknown as EncryptionService +} + export function mockLoggerFactory(clientId?: string): LoggerFactory { return new LoggerFactory({ id: clientId ?? counterId('TestCtx'), @@ -173,7 +215,8 @@ export const createMockMessage = async ( }), groupKeyQueue: await createGroupKeyQueue(identity, opts.encryptionKey, opts.nextEncryptionKey), signatureValidator: mock(), - messageSigner: createMessageSigner(identity) + messageSigner: createMessageSigner(identity), + encryptionService: createMockEncryptionService() }) const DEFAULT_CONTENT = {} const plainContent = opts.content ?? DEFAULT_CONTENT diff --git a/packages/sdk/test/unit/Decrypt.test.ts b/packages/sdk/test/unit/Decrypt.test.ts index 502c68d517..dc6af3c14d 100644 --- a/packages/sdk/test/unit/Decrypt.test.ts +++ b/packages/sdk/test/unit/Decrypt.test.ts @@ -6,7 +6,7 @@ import { StreamrClientError } from '../../src/StreamrClientError' import { GroupKey } from '../../src/encryption/GroupKey' import { GroupKeyManager } from '../../src/encryption/GroupKeyManager' import { decrypt } from '../../src/encryption/decrypt' -import { createGroupKeyManager, createMockMessage } from '../test-utils/utils' +import { createGroupKeyManager, createMockEncryptionService, createMockMessage } from '../test-utils/utils' import { StreamMessage, StreamMessageAESEncrypted } from './../../src/protocol/StreamMessage' import { EncryptionType } from '@streamr/trackerless-network' import { EthereumKeyPairIdentity } from '../../src/identity/EthereumKeyPairIdentity' @@ -26,7 +26,7 @@ describe('Decrypt', () => { encryptionKey: groupKey, content: unencryptedContent }) as StreamMessageAESEncrypted - const decryptedMessage = await decrypt(encryptedMessage, groupKeyManager, destroySignal) + const decryptedMessage = await decrypt(encryptedMessage, groupKeyManager, createMockEncryptionService(), destroySignal) expect(decryptedMessage).toEqual(new StreamMessage({ ...encryptedMessage, encryptionType: EncryptionType.NONE, @@ -53,6 +53,7 @@ describe('Decrypt', () => { return decrypt( msg as StreamMessageAESEncrypted, groupKeyManager, + createMockEncryptionService(), destroySignal) }).rejects.toThrowStreamrClientError( new StreamrClientError(`Could not get encryption key ${groupKey.id}`, 'DECRYPT_ERROR', msg) diff --git a/packages/sdk/test/unit/EncryptionService.test.ts b/packages/sdk/test/unit/EncryptionService.test.ts new file mode 100644 index 0000000000..f16f6b2fb9 --- /dev/null +++ b/packages/sdk/test/unit/EncryptionService.test.ts @@ -0,0 +1,213 @@ +import { utf8ToBinary } from '@streamr/utils' +import { EncryptionService } from '../../src/encryption/EncryptionService' +import { GroupKey } from '../../src/encryption/GroupKey' +import { DestroySignal } from '../../src/DestroySignal' +import { StreamrClientError } from '../../src/StreamrClientError' + +describe('EncryptionService', () => { + + let encryptionService: EncryptionService + let destroySignal: DestroySignal + + beforeEach(() => { + destroySignal = new DestroySignal() + encryptionService = new EncryptionService(destroySignal) + }) + + afterEach(() => { + encryptionService.destroy() + }) + + describe('encryptWithAES / decryptWithAES', () => { + it('encrypts and decrypts data correctly', async () => { + const plaintextOriginal = utf8ToBinary('hello world') + const key = GroupKey.generate() + + // Make a copy since the original will be transferred + const ciphertext = await encryptionService.encryptWithAES( + Uint8Array.from(plaintextOriginal), + key.data + ) + + expect(ciphertext).not.toStrictEqual(plaintextOriginal) + expect(ciphertext.length).toBeGreaterThan(plaintextOriginal.length) + + const decrypted = await encryptionService.decryptWithAES(ciphertext, key.data) + + expect(decrypted).toStrictEqual(plaintextOriginal) + }) + + it('produces different ciphertexts for same plaintext (due to random IV)', async () => { + const plaintext = utf8ToBinary('hello world') + const key = GroupKey.generate() + + const cipher1 = await encryptionService.encryptWithAES(Uint8Array.from(plaintext), key.data) + const cipher2 = await encryptionService.encryptWithAES(Uint8Array.from(plaintext), key.data) + + expect(cipher1).not.toStrictEqual(cipher2) + }) + + it('handles empty data', async () => { + const plaintextOriginal = new Uint8Array(0) + const key = GroupKey.generate() + + const ciphertext = await encryptionService.encryptWithAES( + Uint8Array.from(plaintextOriginal), + key.data + ) + const decrypted = await encryptionService.decryptWithAES(ciphertext, key.data) + + expect(decrypted).toStrictEqual(plaintextOriginal) + }) + + it('handles large data', async () => { + const plaintextOriginal = new Uint8Array(100000).fill(42) + const key = GroupKey.generate() + + const ciphertext = await encryptionService.encryptWithAES( + Uint8Array.from(plaintextOriginal), + key.data + ) + const decrypted = await encryptionService.decryptWithAES(ciphertext, key.data) + + expect(decrypted).toStrictEqual(plaintextOriginal) + }) + }) + + describe('encryptNextGroupKey / decryptNextGroupKey', () => { + it('encrypts and decrypts group key correctly', async () => { + const currentKey = GroupKey.generate() + const nextKey = GroupKey.generate() + + const encrypted = await encryptionService.encryptNextGroupKey(currentKey, nextKey) + + expect(encrypted.id).toBe(nextKey.id) + expect(encrypted.data).not.toStrictEqual(nextKey.data) + + const decrypted = await encryptionService.decryptNextGroupKey(currentKey, encrypted) + + expect(decrypted.id).toBe(nextKey.id) + expect(decrypted.data).toStrictEqual(nextKey.data) + }) + + it('produces different ciphertexts for same key (due to random IV)', async () => { + const currentKey = GroupKey.generate() + const nextKey = GroupKey.generate() + + const encrypted1 = await encryptionService.encryptNextGroupKey(currentKey, nextKey) + const encrypted2 = await encryptionService.encryptNextGroupKey(currentKey, nextKey) + + expect(encrypted1.data).not.toStrictEqual(encrypted2.data) + }) + }) + + describe('decryptStreamMessage', () => { + it('decrypts content without new group key', async () => { + const groupKey = GroupKey.generate() + const plaintextOriginal = utf8ToBinary('{"message": "hello"}') + + const ciphertext = await encryptionService.encryptWithAES( + Uint8Array.from(plaintextOriginal), + groupKey.data + ) + + const [decryptedContent, newGroupKey] = await encryptionService.decryptStreamMessage( + ciphertext, + groupKey + ) + + expect(decryptedContent).toStrictEqual(plaintextOriginal) + expect(newGroupKey).toBeUndefined() + }) + + it('decrypts content with new group key', async () => { + const currentKey = GroupKey.generate() + const nextKey = GroupKey.generate() + const plaintextOriginal = utf8ToBinary('{"message": "hello"}') + + const ciphertext = await encryptionService.encryptWithAES( + Uint8Array.from(plaintextOriginal), + currentKey.data + ) + const encryptedNextKey = await encryptionService.encryptNextGroupKey(currentKey, nextKey) + + const [decryptedContent, decryptedNewGroupKey] = await encryptionService.decryptStreamMessage( + ciphertext, + currentKey, + encryptedNextKey + ) + + expect(decryptedContent).toStrictEqual(plaintextOriginal) + expect(decryptedNewGroupKey).toBeDefined() + expect(decryptedNewGroupKey!.id).toBe(nextKey.id) + expect(decryptedNewGroupKey!.data).toStrictEqual(nextKey.data) + }) + + it('throws StreamrClientError on invalid encrypted content', async () => { + const groupKey = GroupKey.generate() + // Content that's too short to contain valid IV + ciphertext + const invalidContent = new Uint8Array([1, 2, 3]) + + await expect(encryptionService.decryptStreamMessage(invalidContent, groupKey)) + .rejects + .toThrow(StreamrClientError) + }) + }) + + describe('lifecycle', () => { + it('cleans up worker on destroy', async () => { + const plaintext = utf8ToBinary('test') + const key = GroupKey.generate() + + // First encrypt to ensure worker is created + await encryptionService.encryptWithAES(Uint8Array.from(plaintext), key.data) + + // Destroy should not throw + expect(() => encryptionService.destroy()).not.toThrow() + + // Calling destroy again should be safe (idempotent) + expect(() => encryptionService.destroy()).not.toThrow() + }) + + it('cleans up via DestroySignal', async () => { + const plaintext = utf8ToBinary('test') + const key = GroupKey.generate() + + await encryptionService.encryptWithAES(Uint8Array.from(plaintext), key.data) + + // Trigger destroy via signal - should not throw + await destroySignal.destroy() + }) + + it('lazily initializes worker on first use', async () => { + // Create a new service but don't use it yet + const signal = new DestroySignal() + const service = new EncryptionService(signal) + + // Destroy without using - should not throw + expect(() => service.destroy()).not.toThrow() + }) + }) + + describe('sequential operations', () => { + it('can perform multiple operations sequentially', async () => { + const key = GroupKey.generate() + const results: Uint8Array[] = [] + + for (let i = 0; i < 5; i++) { + const plaintext = utf8ToBinary(`message ${i}`) + const ciphertext = await encryptionService.encryptWithAES( + Uint8Array.from(plaintext), + key.data + ) + const decrypted = await encryptionService.decryptWithAES(ciphertext, key.data) + results.push(decrypted) + } + + expect(results).toHaveLength(5) + for (let i = 0; i < 5; i++) { + expect(results[i]).toStrictEqual(utf8ToBinary(`message ${i}`)) + } + }) + }) +}) diff --git a/packages/sdk/test/unit/EncryptionUtil.test.ts b/packages/sdk/test/unit/EncryptionUtil.test.ts index 7cec71e574..d7b8b1deda 100644 --- a/packages/sdk/test/unit/EncryptionUtil.test.ts +++ b/packages/sdk/test/unit/EncryptionUtil.test.ts @@ -1,48 +1,12 @@ -import { createTestWallet } from '@streamr/test-utils' -import { StreamPartIDUtils, hexToBinary, toStreamID, toStreamPartID, utf8ToBinary } from '@streamr/utils' -import { EncryptionUtil, INITIALIZATION_VECTOR_LENGTH } from '../../src/encryption/EncryptionUtil' -import { GroupKey } from '../../src/encryption/GroupKey' -import { StreamrClientError } from '../../src/StreamrClientError' -import { createMockMessage } from '../test-utils/utils' -import { StreamMessage, StreamMessageAESEncrypted } from './../../src/protocol/StreamMessage' import { AsymmetricEncryptionType } from '@streamr/trackerless-network' -import { RSAKeyPair } from '../../src/encryption/RSAKeyPair' +import { EncryptionUtil } from '../../src/encryption/EncryptionUtil' import { MLKEMKeyPair } from '../../src/encryption/MLKEMKeyPair' - -const STREAM_ID = toStreamID('streamId') +import { RSAKeyPair } from '../../src/encryption/RSAKeyPair' describe('EncryptionUtil', () => { const plaintext = Buffer.from('some random text', 'utf8') - describe('AES', () => { - it('returns a ciphertext which is different from the plaintext', () => { - const key = GroupKey.generate() - const ciphertext = EncryptionUtil.encryptWithAES(plaintext, key.data) - expect(ciphertext).not.toStrictEqual(plaintext) - }) - - it('returns the initial plaintext after decrypting the ciphertext', () => { - const key = GroupKey.generate() - const ciphertext = EncryptionUtil.encryptWithAES(plaintext, key.data) - expect(EncryptionUtil.decryptWithAES(ciphertext, key.data)).toStrictEqual(plaintext) - }) - - it('preserves size (plaintext + iv)', () => { - const key = GroupKey.generate() - const ciphertext = EncryptionUtil.encryptWithAES(plaintext, key.data) - expect(ciphertext.length).toStrictEqual(plaintext.length + INITIALIZATION_VECTOR_LENGTH) - }) - - it('produces different ivs and ciphertexts upon multiple encrypt() calls', () => { - const key = GroupKey.generate() - const cipher1 = EncryptionUtil.encryptWithAES(plaintext, key.data) - const cipher2 = EncryptionUtil.encryptWithAES(plaintext, key.data) - expect(cipher1.slice(0, INITIALIZATION_VECTOR_LENGTH)).not.toStrictEqual(cipher2.slice(0, INITIALIZATION_VECTOR_LENGTH)) - expect(cipher1.slice(INITIALIZATION_VECTOR_LENGTH)).not.toStrictEqual(cipher2.slice(INITIALIZATION_VECTOR_LENGTH)) - }) - }) - describe('RSA', () => { it('returns a ciphertext which is different from the plaintext', async () => { const key = await RSAKeyPair.create(512) @@ -86,39 +50,4 @@ describe('EncryptionUtil', () => { expect(cipher1).not.toStrictEqual(cipher2) }) }) - - describe('StreamMessage decryption', () => { - it('passes the happy path', async () => { - const key = GroupKey.generate() - const nextKey = GroupKey.generate() - const streamMessage = await createMockMessage({ - streamPartId: StreamPartIDUtils.parse('stream#0'), - publisher: await createTestWallet(), - content: { - foo: 'bar' - }, - encryptionKey: key, - nextEncryptionKey: nextKey - }) as StreamMessageAESEncrypted - const [content, newGroupKey] = EncryptionUtil.decryptStreamMessage(streamMessage, key) - expect(content).toEqualBinary(utf8ToBinary('{"foo":"bar"}')) - expect(newGroupKey).toEqual(nextKey) - }) - - it('throws if newGroupKey invalid', async () => { - const key = GroupKey.generate() - const msg = await createMockMessage({ - publisher: await createTestWallet(), - streamPartId: toStreamPartID(STREAM_ID, 0), - encryptionKey: key - }) - const msg2 = new StreamMessage({ - ...msg, - newGroupKey: { id: 'mockId', data: hexToBinary('0x1234') } - }) as StreamMessageAESEncrypted - expect(() => EncryptionUtil.decryptStreamMessage(msg2, key)).toThrowStreamrClientError( - new StreamrClientError('Could not decrypt new encryption key', 'DECRYPT_ERROR', msg2) - ) - }) - }) }) diff --git a/packages/sdk/test/unit/MessageFactory.test.ts b/packages/sdk/test/unit/MessageFactory.test.ts index 76d0edb8bb..752a2138c3 100644 --- a/packages/sdk/test/unit/MessageFactory.test.ts +++ b/packages/sdk/test/unit/MessageFactory.test.ts @@ -10,7 +10,8 @@ import { GroupKeyQueue } from '../../src/publish/GroupKeyQueue' import { MessageFactory, MessageFactoryOptions } from '../../src/publish/MessageFactory' import { PublishMetadata } from '../../src/publish/Publisher' import { SignatureValidator } from '../../src/signature/SignatureValidator' -import { createGroupKeyQueue, createMessageSigner, createStreamRegistry } from '../test-utils/utils' +import { createGroupKeyQueue, createMessageSigner, createMockEncryptionService, createStreamRegistry } from '../test-utils/utils' +import { decryptNextGroupKey } from '../../src/encryption/encryptionUtils' import { StreamMessage, StreamMessageType } from './../../src/protocol/StreamMessage' import { EthereumKeyPairIdentity } from '../../src/identity/EthereumKeyPairIdentity' import { EncryptionType, SignatureType, ContentType } from '@streamr/trackerless-network' @@ -60,6 +61,7 @@ describe('MessageFactory', () => { groupKeyQueue: await createGroupKeyQueue(identity, GROUP_KEY), signatureValidator: new SignatureValidator(opts?.erc1271ContractFacade ?? mock(), new DestroySignal()), messageSigner: createMessageSigner(identity), + encryptionService: createMockEncryptionService(), config: { validation: { permissions: true, @@ -195,7 +197,8 @@ describe('MessageFactory', () => { id: nextGroupKey.id, data: expect.any(Uint8Array) }) - expect(GROUP_KEY.decryptNextGroupKey(msg.newGroupKey!)).toEqual(nextGroupKey) + const decrypted = decryptNextGroupKey(msg.newGroupKey!.id, msg.newGroupKey!.data, GROUP_KEY.data) + expect(new GroupKey(decrypted.id, Buffer.from(decrypted.data))).toEqual(nextGroupKey) }) it('not a publisher', async () => { diff --git a/packages/sdk/test/unit/Publisher.test.ts b/packages/sdk/test/unit/Publisher.test.ts index ac1564ec9d..71a7f32b6d 100644 --- a/packages/sdk/test/unit/Publisher.test.ts +++ b/packages/sdk/test/unit/Publisher.test.ts @@ -3,7 +3,7 @@ import { Publisher } from '../../src/publish/Publisher' import { MessageSigner } from '../../src/signature/MessageSigner' import { SignatureValidator } from '../../src/signature/SignatureValidator' import { StreamIDBuilder } from '../../src/StreamIDBuilder' -import { createGroupKeyManager, createRandomIdentity } from '../test-utils/utils' +import { createGroupKeyManager, createMockEncryptionService, createRandomIdentity } from '../test-utils/utils' import type { StrictStreamrClientConfig } from '../../src/ConfigTypes' describe('Publisher', () => { @@ -22,6 +22,7 @@ describe('Publisher', () => { identity, mock(), mock(), + createMockEncryptionService(), { encryption: {}, validation: { diff --git a/packages/sdk/test/unit/aesUtils.test.ts b/packages/sdk/test/unit/aesUtils.test.ts new file mode 100644 index 0000000000..69ebd377a5 --- /dev/null +++ b/packages/sdk/test/unit/aesUtils.test.ts @@ -0,0 +1,80 @@ +import { createTestWallet } from '@streamr/test-utils' +import { StreamPartIDUtils, utf8ToBinary } from '@streamr/utils' +import { decryptWithAES, encryptWithAES, INITIALIZATION_VECTOR_LENGTH } from '../../src/encryption/aesUtils' +import { decryptStreamMessageContent } from '../../src/encryption/encryptionUtils' +import { GroupKey } from '../../src/encryption/GroupKey' +import { createMockMessage } from '../test-utils/utils' +import { StreamMessageAESEncrypted } from './../../src/protocol/StreamMessage' + +describe('aesUtils', () => { + + const plaintext = Buffer.from('some random text', 'utf8') + + describe('encryptWithAES / decryptWithAES', () => { + it('returns a ciphertext which is different from the plaintext', () => { + const key = GroupKey.generate() + const ciphertext = encryptWithAES(plaintext, key.data) + expect(ciphertext).not.toStrictEqual(plaintext) + }) + + it('returns the initial plaintext after decrypting the ciphertext', () => { + const key = GroupKey.generate() + const ciphertext = encryptWithAES(plaintext, key.data) + expect(decryptWithAES(ciphertext, key.data)).toStrictEqual(plaintext) + }) + + it('preserves size (plaintext + iv)', () => { + const key = GroupKey.generate() + const ciphertext = encryptWithAES(plaintext, key.data) + expect(ciphertext.length).toStrictEqual(plaintext.length + INITIALIZATION_VECTOR_LENGTH) + }) + + it('produces different ivs and ciphertexts upon multiple encrypt() calls', () => { + const key = GroupKey.generate() + const cipher1 = encryptWithAES(plaintext, key.data) + const cipher2 = encryptWithAES(plaintext, key.data) + expect(cipher1.slice(0, INITIALIZATION_VECTOR_LENGTH)).not.toStrictEqual(cipher2.slice(0, INITIALIZATION_VECTOR_LENGTH)) + expect(cipher1.slice(INITIALIZATION_VECTOR_LENGTH)).not.toStrictEqual(cipher2.slice(INITIALIZATION_VECTOR_LENGTH)) + }) + }) + + describe('decryptStreamMessageContent', () => { + it('decrypts content and new group key', async () => { + const key = GroupKey.generate() + const nextKey = GroupKey.generate() + const streamMessage = await createMockMessage({ + streamPartId: StreamPartIDUtils.parse('stream#0'), + publisher: await createTestWallet(), + content: { + foo: 'bar' + }, + encryptionKey: key, + nextEncryptionKey: nextKey + }) as StreamMessageAESEncrypted + const result = decryptStreamMessageContent(streamMessage.content, key.data, streamMessage.newGroupKey) + expect(result.content).toEqualBinary(utf8ToBinary('{"foo":"bar"}')) + const newGroupKey = result.newGroupKey + ? new GroupKey(result.newGroupKey.id, Buffer.from(result.newGroupKey.data)) + : undefined + expect(newGroupKey).toEqual(nextKey) + }) + + it('throws if newGroupKey data is invalid', async () => { + const key = GroupKey.generate() + const streamMessage = await createMockMessage({ + streamPartId: StreamPartIDUtils.parse('stream#0'), + publisher: await createTestWallet(), + content: { foo: 'bar' }, + encryptionKey: key + }) as StreamMessageAESEncrypted + // Provide an invalid encrypted group key (too short to contain valid AES data) + const invalidNewGroupKey = { id: 'mockId', data: new Uint8Array([1, 2, 3, 4]) } + // decryptStreamMessageContent uses Node's crypto which throws on invalid cipher data + expect(() => decryptStreamMessageContent( + streamMessage.content, + key.data, + invalidNewGroupKey + )).toThrow() + }) + }) +}) diff --git a/packages/sdk/test/unit/messagePipeline.test.ts b/packages/sdk/test/unit/messagePipeline.test.ts index 4f9d00f773..a566004a99 100644 --- a/packages/sdk/test/unit/messagePipeline.test.ts +++ b/packages/sdk/test/unit/messagePipeline.test.ts @@ -6,15 +6,15 @@ import type { StrictStreamrClientConfig } from '../../src/ConfigTypes' import { DestroySignal } from '../../src/DestroySignal' import { ERC1271ContractFacade } from '../../src/contracts/ERC1271ContractFacade' import { StreamRegistry } from '../../src/contracts/StreamRegistry' -import { EncryptionUtil } from '../../src/encryption/EncryptionUtil' import { GroupKey } from '../../src/encryption/GroupKey' import { GroupKeyManager } from '../../src/encryption/GroupKeyManager' import { SubscriberKeyExchange } from '../../src/encryption/SubscriberKeyExchange' +import { encryptWithAES } from '../../src/encryption/aesUtils' import { StreamrClientEventEmitter } from '../../src/events' import { SignatureValidator } from '../../src/signature/SignatureValidator' import { createMessagePipeline } from '../../src/subscribe/messagePipeline' import { PushPipeline } from '../../src/utils/PushPipeline' -import { createMessageSigner, mockLoggerFactory } from '../test-utils/utils' +import { createMessageSigner, createMockEncryptionService, mockLoggerFactory } from '../test-utils/utils' import { MessageID } from './../../src/protocol/MessageID' import { StreamMessage, StreamMessageType } from './../../src/protocol/StreamMessage' import { EncryptionType, ContentType, SignatureType } from '@streamr/trackerless-network' @@ -95,6 +95,7 @@ describe('messagePipeline', () => { new StreamrClientEventEmitter(), destroySignal ), + encryptionService: createMockEncryptionService(), config, destroySignal, loggerFactory: mockLoggerFactory(), @@ -163,7 +164,7 @@ describe('messagePipeline', () => { it('error: no encryption key available', async () => { const encryptionKey = GroupKey.generate() - const content = EncryptionUtil.encryptWithAES(Buffer.from(JSON.stringify(CONTENT), 'utf8'), encryptionKey.data) + const content = encryptWithAES(Buffer.from(JSON.stringify(CONTENT), 'utf8'), encryptionKey.data) await pipeline.push(await createMessage({ content, encryptionType: EncryptionType.AES, diff --git a/packages/sdk/test/unit/resendSubscription.test.ts b/packages/sdk/test/unit/resendSubscription.test.ts index c716c48a96..ea22aad630 100644 --- a/packages/sdk/test/unit/resendSubscription.test.ts +++ b/packages/sdk/test/unit/resendSubscription.test.ts @@ -11,7 +11,14 @@ import { ResendRangeOptions } from '../../src/subscribe/Resends' import { Subscription, SubscriptionEvents } from '../../src/subscribe/Subscription' import { initResendSubscription } from '../../src/subscribe/resendSubscription' import { PushPipeline } from '../../src/utils/PushPipeline' -import { createGroupKeyQueue, createMessageSigner, createRandomIdentity, createStreamRegistry, mockLoggerFactory } from '../test-utils/utils' +import { + createGroupKeyQueue, + createMessageSigner, + createMockEncryptionService, + createRandomIdentity, + createStreamRegistry, + mockLoggerFactory +} from '../test-utils/utils' import { StreamMessage } from './../../src/protocol/StreamMessage' import { createStrictConfig } from '../../src/Config' @@ -61,6 +68,7 @@ describe('resend subscription', () => { groupKeyQueue: await createGroupKeyQueue(identity), signatureValidator: mock(), messageSigner: createMessageSigner(identity), + encryptionService: createMockEncryptionService(), config: createStrictConfig(), }) }) From f80db2435925c49f5637a0531c0de72106ef76e1 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Mariusz=20Roli=C5=84ski?= Date: Fri, 30 Jan 2026 13:12:23 +0100 Subject: [PATCH 2/9] Remove unused EncryptionService.decryptWithAES method --- packages/sdk/src/encryption/EncryptionService.ts | 14 -------------- packages/sdk/src/encryption/EncryptionWorker.ts | 13 +------------ packages/sdk/src/encryption/encryptionUtils.ts | 9 --------- packages/sdk/test/test-utils/utils.ts | 5 +---- packages/sdk/test/unit/EncryptionService.test.ts | 11 ++++++----- 5 files changed, 8 insertions(+), 44 deletions(-) diff --git a/packages/sdk/src/encryption/EncryptionService.ts b/packages/sdk/src/encryption/EncryptionService.ts index ba49a40489..3203904df8 100644 --- a/packages/sdk/src/encryption/EncryptionService.ts +++ b/packages/sdk/src/encryption/EncryptionService.ts @@ -45,20 +45,6 @@ export class EncryptionService { return result.data } - /** - * Decrypt AES-256-CTR encrypted data. - * Note: The input cipher buffer is transferred to the worker and becomes unusable after this call. - */ - async decryptWithAES(cipher: Uint8Array, cipherKey: Uint8Array): Promise { - const result = await this.getWorkerApi().decrypt( - transfer({ cipher, cipherKey }, [cipher.buffer]) - ) - if (result.type === 'error') { - throw new Error(`AES decryption failed: ${result.message}`) - } - return result.data - } - /** * Encrypt the next group key using the current group key. */ diff --git a/packages/sdk/src/encryption/EncryptionWorker.ts b/packages/sdk/src/encryption/EncryptionWorker.ts index d70fbcac8f..da5f1dc055 100644 --- a/packages/sdk/src/encryption/EncryptionWorker.ts +++ b/packages/sdk/src/encryption/EncryptionWorker.ts @@ -3,18 +3,16 @@ * Offloads CPU-intensive cryptographic operations to a separate thread. */ import { expose, transfer } from 'comlink' -import { decryptWithAES, encryptWithAES } from './aesUtils' +import { encryptWithAES } from './aesUtils' import { encryptNextGroupKey, decryptNextGroupKey, decryptStreamMessageContent, AESEncryptRequest, - AESDecryptRequest, EncryptGroupKeyRequest, DecryptGroupKeyRequest, DecryptStreamMessageRequest, AESEncryptResult, - AESDecryptResult, EncryptGroupKeyResult, DecryptGroupKeyResult, DecryptStreamMessageResult @@ -30,15 +28,6 @@ const workerApi = { } }, - decrypt: async (request: AESDecryptRequest): Promise => { - try { - const result = decryptWithAES(request.cipher, request.cipherKey) - return transfer({ type: 'success', data: result }, [result.buffer]) - } catch (err) { - return { type: 'error', message: String(err) } - } - }, - encryptGroupKey: async (request: EncryptGroupKeyRequest): Promise => { try { const result = encryptNextGroupKey( diff --git a/packages/sdk/src/encryption/encryptionUtils.ts b/packages/sdk/src/encryption/encryptionUtils.ts index c8c5ab4ec5..5b8164a359 100644 --- a/packages/sdk/src/encryption/encryptionUtils.ts +++ b/packages/sdk/src/encryption/encryptionUtils.ts @@ -14,11 +14,6 @@ export interface AESEncryptRequest { cipherKey: Uint8Array } -export interface AESDecryptRequest { - cipher: Uint8Array - cipherKey: Uint8Array -} - export interface EncryptGroupKeyRequest { nextGroupKeyId: string nextGroupKeyData: Uint8Array @@ -47,10 +42,6 @@ export type AESEncryptResult = | { type: 'success', data: Uint8Array } | { type: 'error', message: string } -export type AESDecryptResult = - | { type: 'success', data: Uint8Array } - | { type: 'error', message: string } - export type EncryptGroupKeyResult = | { type: 'success', id: string, data: Uint8Array } | { type: 'error', message: string } diff --git a/packages/sdk/test/test-utils/utils.ts b/packages/sdk/test/test-utils/utils.ts index 75225a21ae..7cd0f840ca 100644 --- a/packages/sdk/test/test-utils/utils.ts +++ b/packages/sdk/test/test-utils/utils.ts @@ -45,7 +45,7 @@ import { GroupKeyManager } from '../../src/encryption/GroupKeyManager' import { LocalGroupKeyStore } from '../../src/encryption/LocalGroupKeyStore' import { SubscriberKeyExchange } from '../../src/encryption/SubscriberKeyExchange' import { EncryptionService } from '../../src/encryption/EncryptionService' -import { encryptWithAES, decryptWithAES } from '../../src/encryption/aesUtils' +import { encryptWithAES } from '../../src/encryption/aesUtils' import { encryptNextGroupKey, decryptNextGroupKey, decryptStreamMessageContent } from '../../src/encryption/encryptionUtils' import { StreamrClientEventEmitter } from '../../src/events' import { StreamMessage } from '../../src/protocol/StreamMessage' @@ -92,9 +92,6 @@ export function createMockEncryptionService(): EncryptionService { encryptWithAES: async (data: Uint8Array, cipherKey: Uint8Array) => { return encryptWithAES(data, cipherKey) }, - decryptWithAES: async (cipher: Uint8Array, cipherKey: Uint8Array) => { - return decryptWithAES(cipher, cipherKey) - }, encryptNextGroupKey: async (currentKey: GroupKey, nextKey: GroupKey) => { return encryptNextGroupKey(nextKey.id, nextKey.data, currentKey.data) }, diff --git a/packages/sdk/test/unit/EncryptionService.test.ts b/packages/sdk/test/unit/EncryptionService.test.ts index f16f6b2fb9..ba7d99dd71 100644 --- a/packages/sdk/test/unit/EncryptionService.test.ts +++ b/packages/sdk/test/unit/EncryptionService.test.ts @@ -18,7 +18,7 @@ describe('EncryptionService', () => { encryptionService.destroy() }) - describe('encryptWithAES / decryptWithAES', () => { + describe('encryptWithAES', () => { it('encrypts and decrypts data correctly', async () => { const plaintextOriginal = utf8ToBinary('hello world') const key = GroupKey.generate() @@ -32,7 +32,8 @@ describe('EncryptionService', () => { expect(ciphertext).not.toStrictEqual(plaintextOriginal) expect(ciphertext.length).toBeGreaterThan(plaintextOriginal.length) - const decrypted = await encryptionService.decryptWithAES(ciphertext, key.data) + // Use decryptStreamMessage to verify encryption worked correctly + const [decrypted] = await encryptionService.decryptStreamMessage(ciphertext, key) expect(decrypted).toStrictEqual(plaintextOriginal) }) @@ -55,7 +56,7 @@ describe('EncryptionService', () => { Uint8Array.from(plaintextOriginal), key.data ) - const decrypted = await encryptionService.decryptWithAES(ciphertext, key.data) + const [decrypted] = await encryptionService.decryptStreamMessage(ciphertext, key) expect(decrypted).toStrictEqual(plaintextOriginal) }) @@ -68,7 +69,7 @@ describe('EncryptionService', () => { Uint8Array.from(plaintextOriginal), key.data ) - const decrypted = await encryptionService.decryptWithAES(ciphertext, key.data) + const [decrypted] = await encryptionService.decryptStreamMessage(ciphertext, key) expect(decrypted).toStrictEqual(plaintextOriginal) }) @@ -200,7 +201,7 @@ describe('EncryptionService', () => { Uint8Array.from(plaintext), key.data ) - const decrypted = await encryptionService.decryptWithAES(ciphertext, key.data) + const [decrypted] = await encryptionService.decryptStreamMessage(ciphertext, key) results.push(decrypted) } From cf7ac0c42a52c5f3a7f58a5f2927930c109f95ba Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Mariusz=20Roli=C5=84ski?= Date: Mon, 2 Feb 2026 12:47:41 +0100 Subject: [PATCH 3/9] Fix comlink serialization by converting Buffer to Uint8Array at worker boundary Fix comlink serialization by returning Uint8Array from aesUtils aesUtils now returns plain Uint8Array instead of Buffer, fixing 'Unserializable return value' errors on Linux CI where Buffer objects don't serialize correctly through comlink's structured clone. --- .../sdk/src/encryption/EncryptionService.ts | 20 ++++++++++++------- packages/sdk/src/encryption/EncryptionUtil.ts | 4 ++-- .../sdk/src/encryption/EncryptionWorker.ts | 4 ++-- packages/sdk/src/encryption/aesUtils.ts | 20 ++++++++++++++++--- packages/sdk/test/unit/EncryptionUtil.test.ts | 4 ++-- packages/sdk/test/unit/aesUtils.test.ts | 2 +- 6 files changed, 37 insertions(+), 17 deletions(-) diff --git a/packages/sdk/src/encryption/EncryptionService.ts b/packages/sdk/src/encryption/EncryptionService.ts index 3203904df8..cea16da6f6 100644 --- a/packages/sdk/src/encryption/EncryptionService.ts +++ b/packages/sdk/src/encryption/EncryptionService.ts @@ -36,8 +36,11 @@ export class EncryptionService { * Note: The input data buffer is transferred to the worker and becomes unusable after this call. */ async encryptWithAES(data: Uint8Array, cipherKey: Uint8Array): Promise { + // Ensure we have plain Uint8Array instances for worker communication (not Buffer subclass) + const dataArray = new Uint8Array(data) + const keyArray = new Uint8Array(cipherKey) const result = await this.getWorkerApi().encrypt( - transfer({ data, cipherKey }, [data.buffer]) + transfer({ data: dataArray, cipherKey: keyArray }, [dataArray.buffer]) ) if (result.type === 'error') { throw new Error(`AES encryption failed: ${result.message}`) @@ -49,10 +52,11 @@ export class EncryptionService { * Encrypt the next group key using the current group key. */ async encryptNextGroupKey(currentKey: GroupKey, nextKey: GroupKey): Promise { + // Convert Buffer to Uint8Array for worker communication const result = await this.getWorkerApi().encryptGroupKey({ nextGroupKeyId: nextKey.id, - nextGroupKeyData: nextKey.data, - currentGroupKeyData: currentKey.data + nextGroupKeyData: new Uint8Array(nextKey.data), + currentGroupKeyData: new Uint8Array(currentKey.data) }) if (result.type === 'error') { throw new Error(`Group key encryption failed: ${result.message}`) @@ -67,10 +71,11 @@ export class EncryptionService { * Decrypt an encrypted group key using the current group key. */ async decryptNextGroupKey(currentKey: GroupKey, encryptedKey: EncryptedGroupKey): Promise { + // Convert Buffer to Uint8Array for worker communication const result = await this.getWorkerApi().decryptGroupKey({ encryptedGroupKeyId: encryptedKey.id, - encryptedGroupKeyData: encryptedKey.data, - currentGroupKeyData: currentKey.data + encryptedGroupKeyData: new Uint8Array(encryptedKey.data), + currentGroupKeyData: new Uint8Array(currentKey.data) }) if (result.type === 'error') { throw new Error(`Group key decryption failed: ${result.message}`) @@ -88,12 +93,13 @@ export class EncryptionService { groupKey: GroupKey, encryptedNewGroupKey?: EncryptedGroupKey ): Promise<[Uint8Array, GroupKey?]> { + // Convert Buffer to Uint8Array for worker communication const request = { content, - groupKeyData: groupKey.data, + groupKeyData: new Uint8Array(groupKey.data), newGroupKey: encryptedNewGroupKey ? { id: encryptedNewGroupKey.id, - data: encryptedNewGroupKey.data + data: new Uint8Array(encryptedNewGroupKey.data) } : undefined } const result = await this.getWorkerApi().decryptStreamMessage( diff --git a/packages/sdk/src/encryption/EncryptionUtil.ts b/packages/sdk/src/encryption/EncryptionUtil.ts index b7bc0d893f..47d983a51d 100644 --- a/packages/sdk/src/encryption/EncryptionUtil.ts +++ b/packages/sdk/src/encryption/EncryptionUtil.ts @@ -118,7 +118,7 @@ export class EncryptionUtil { const wrappingAESKey = await this.deriveAESWrapperKey(sharedSecret, kdfSalt) // Encrypt plaintext with the AES wrapping key - const aesEncryptedPlaintext = encryptWithAES(plaintextBuffer, Buffer.from(wrappingAESKey)) + const aesEncryptedPlaintext = encryptWithAES(plaintextBuffer, wrappingAESKey) // Concatenate the deliverables into a binary package return Buffer.concat([kemCipher, kdfSalt, aesEncryptedPlaintext]) @@ -140,6 +140,6 @@ export class EncryptionUtil { const wrappingAESKey = await this.deriveAESWrapperKey(sharedSecret, kdfSalt) // Decrypt the aesEncryptedPlaintext - return decryptWithAES(aesEncryptedPlaintext, Buffer.from(wrappingAESKey)) + return Buffer.from(decryptWithAES(aesEncryptedPlaintext, wrappingAESKey)) } } diff --git a/packages/sdk/src/encryption/EncryptionWorker.ts b/packages/sdk/src/encryption/EncryptionWorker.ts index da5f1dc055..23a0fb445d 100644 --- a/packages/sdk/src/encryption/EncryptionWorker.ts +++ b/packages/sdk/src/encryption/EncryptionWorker.ts @@ -21,8 +21,8 @@ import { const workerApi = { encrypt: async (request: AESEncryptRequest): Promise => { try { - const result = encryptWithAES(request.data, request.cipherKey) - return transfer({ type: 'success', data: result }, [result.buffer]) + const data = encryptWithAES(request.data, request.cipherKey) + return transfer({ type: 'success', data }, [data.buffer]) } catch (err) { return { type: 'error', message: String(err) } } diff --git a/packages/sdk/src/encryption/aesUtils.ts b/packages/sdk/src/encryption/aesUtils.ts index db4786b9a9..14744019ed 100644 --- a/packages/sdk/src/encryption/aesUtils.ts +++ b/packages/sdk/src/encryption/aesUtils.ts @@ -7,6 +7,20 @@ import { createCipheriv, createDecipheriv } from '@streamr/utils' export const INITIALIZATION_VECTOR_LENGTH = 16 +/** + * Concatenate multiple Uint8Arrays into a single Uint8Array. + */ +function concatBytes(...arrays: Uint8Array[]): Uint8Array { + const totalLength = arrays.reduce((sum, arr) => sum + arr.length, 0) + const result = new Uint8Array(totalLength) + let offset = 0 + for (const arr of arrays) { + result.set(arr, offset) + offset += arr.length + } + return result +} + /** * Encrypt data using AES-256-CTR. * Returns IV prepended to ciphertext. @@ -14,15 +28,15 @@ export const INITIALIZATION_VECTOR_LENGTH = 16 export function encryptWithAES(data: Uint8Array, cipherKey: Uint8Array): Uint8Array { const iv = randomBytes(INITIALIZATION_VECTOR_LENGTH) // always need a fresh IV when using CTR mode const cipher = createCipheriv('aes-256-ctr', cipherKey, iv) - return Buffer.concat([iv, cipher.update(data), cipher.final()]) + return concatBytes(iv, cipher.update(data), cipher.final()) } /** * Decrypt AES-256-CTR encrypted data. * Expects IV prepended to ciphertext. */ -export function decryptWithAES(cipher: Uint8Array, cipherKey: Uint8Array): Buffer { +export function decryptWithAES(cipher: Uint8Array, cipherKey: Uint8Array): Uint8Array { const iv = cipher.slice(0, INITIALIZATION_VECTOR_LENGTH) const decipher = createDecipheriv('aes-256-ctr', cipherKey, iv) - return Buffer.concat([decipher.update(cipher.slice(INITIALIZATION_VECTOR_LENGTH)), decipher.final()]) + return concatBytes(decipher.update(cipher.slice(INITIALIZATION_VECTOR_LENGTH)), decipher.final()) } diff --git a/packages/sdk/test/unit/EncryptionUtil.test.ts b/packages/sdk/test/unit/EncryptionUtil.test.ts index d7b8b1deda..8c094c9e73 100644 --- a/packages/sdk/test/unit/EncryptionUtil.test.ts +++ b/packages/sdk/test/unit/EncryptionUtil.test.ts @@ -17,7 +17,7 @@ describe('EncryptionUtil', () => { it('returns the initial plaintext after decrypting the ciphertext', async () => { const key = await RSAKeyPair.create(512) const ciphertext = await EncryptionUtil.encryptForPublicKey(plaintext, key.getPublicKey(), AsymmetricEncryptionType.RSA) - expect(await EncryptionUtil.decryptWithPrivateKey(ciphertext, key.getPrivateKey(), AsymmetricEncryptionType.RSA)).toStrictEqual(plaintext) + expect(await EncryptionUtil.decryptWithPrivateKey(ciphertext, key.getPrivateKey(), AsymmetricEncryptionType.RSA)).toEqualBinary(plaintext) }) it('produces different ciphertexts upon multiple encrypt() calls', async () => { @@ -40,7 +40,7 @@ describe('EncryptionUtil', () => { const ciphertext = await EncryptionUtil.encryptForPublicKey(plaintext, key.getPublicKey(), AsymmetricEncryptionType.ML_KEM) expect(await EncryptionUtil.decryptWithPrivateKey( ciphertext, key.getPrivateKey(), AsymmetricEncryptionType.ML_KEM - )).toStrictEqual(plaintext) + )).toEqualBinary(plaintext) }) it('produces different ciphertexts upon multiple encrypt() calls', async () => { diff --git a/packages/sdk/test/unit/aesUtils.test.ts b/packages/sdk/test/unit/aesUtils.test.ts index 69ebd377a5..d09bd9e454 100644 --- a/packages/sdk/test/unit/aesUtils.test.ts +++ b/packages/sdk/test/unit/aesUtils.test.ts @@ -20,7 +20,7 @@ describe('aesUtils', () => { it('returns the initial plaintext after decrypting the ciphertext', () => { const key = GroupKey.generate() const ciphertext = encryptWithAES(plaintext, key.data) - expect(decryptWithAES(ciphertext, key.data)).toStrictEqual(plaintext) + expect(decryptWithAES(ciphertext, key.data)).toEqualBinary(plaintext) }) it('preserves size (plaintext + iv)', () => { From 8d9257cedcdcfed6cc75fcf6b1c8b72d94dcfb1f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Mariusz=20Roli=C5=84ski?= Date: Mon, 2 Feb 2026 13:56:43 +0100 Subject: [PATCH 4/9] Make karma to the actual Karma-specific `createEncryptionWorker` factory --- packages/sdk/createKarmaConfig.ts | 1 + 1 file changed, 1 insertion(+) diff --git a/packages/sdk/createKarmaConfig.ts b/packages/sdk/createKarmaConfig.ts index e455504467..bf322dbbfd 100644 --- a/packages/sdk/createKarmaConfig.ts +++ b/packages/sdk/createKarmaConfig.ts @@ -17,6 +17,7 @@ export function createKarmaConfig(testPaths: string[]): ReturnType Date: Mon, 2 Feb 2026 14:25:11 +0100 Subject: [PATCH 5/9] Return plain Uint8Array from EcdsaSecp256k1Evm.createSignature Matches the declared return type and ensures proper serialization across environments (e.g., through comlink workers). --- packages/utils/src/SigningUtil.ts | 6 ++++-- packages/utils/test/SigningUtil.test.ts | 6 ++++-- 2 files changed, 8 insertions(+), 4 deletions(-) diff --git a/packages/utils/src/SigningUtil.ts b/packages/utils/src/SigningUtil.ts index 2a664f2a8f..968f0f9a15 100644 --- a/packages/utils/src/SigningUtil.ts +++ b/packages/utils/src/SigningUtil.ts @@ -82,8 +82,10 @@ export class EcdsaSecp256k1Evm extends SigningUtil { async createSignature(payload: Uint8Array, privateKey: Uint8Array): Promise { const msgHash = this.keccakHash(payload) const sigObj = secp256k1.ecdsaSign(msgHash, privateKey) - const result = Buffer.alloc(sigObj.signature.length + 1, Buffer.from(sigObj.signature)) - result.writeInt8(27 + sigObj.recid, result.length - 1) + // Return plain Uint8Array (not Buffer) for proper serialization across environments + const result = new Uint8Array(sigObj.signature.length + 1) + result.set(sigObj.signature) + result[result.length - 1] = 27 + sigObj.recid return result } diff --git a/packages/utils/test/SigningUtil.test.ts b/packages/utils/test/SigningUtil.test.ts index 9baff6a869..e7d656bc63 100644 --- a/packages/utils/test/SigningUtil.test.ts +++ b/packages/utils/test/SigningUtil.test.ts @@ -1,5 +1,5 @@ /* eslint-disable max-len */ -import { hexToBinary } from '../src/binaryUtils' +import { areEqualBinaries, hexToBinary } from '../src/binaryUtils' import { EcdsaSecp256k1Evm, EcdsaSecp256r1, MlDsa87, SigningUtil } from '../src/SigningUtil' import { toUserId, toUserIdRaw } from '../src/UserID' @@ -46,7 +46,9 @@ describe('EcdsaSecp256k1Evm', () => { it('produces correct signature', async () => { const payload = Buffer.from('data-to-sign') const signature = await util.createSignature(payload, privateKey) - expect(signature).toStrictEqual(hexToBinary('787cd72924153c88350e808de68b68c88030cbc34d053a5c696a5893d5e6fec1687c1b6205ec99aeb3375a81bf5cb8857ae39c1b55a41b32ed6399ae8da456a61b')) + + const expectedSignature = hexToBinary('787cd72924153c88350e808de68b68c88030cbc34d053a5c696a5893d5e6fec1687c1b6205ec99aeb3375a81bf5cb8857ae39c1b55a41b32ed6399ae8da456a61b') + expect(areEqualBinaries(signature, expectedSignature)).toBeTrue() }) }) From 9daf6973331dd9c19649683af2a5f08b511e474d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Mariusz=20Roli=C5=84ski?= Date: Mon, 2 Feb 2026 14:25:26 +0100 Subject: [PATCH 6/9] Convert protobuf binary fields to plain Uint8Array on deserialization Ensures signature, content, and newGroupKey.data are plain Uint8Array after fromBinary(), fixing Buffer serialization issues in tests. --- packages/sdk/src/protocol/StreamMessageTranslator.ts | 11 ++++++++--- 1 file changed, 8 insertions(+), 3 deletions(-) diff --git a/packages/sdk/src/protocol/StreamMessageTranslator.ts b/packages/sdk/src/protocol/StreamMessageTranslator.ts index 1b3350fab2..12e72800dd 100644 --- a/packages/sdk/src/protocol/StreamMessageTranslator.ts +++ b/packages/sdk/src/protocol/StreamMessageTranslator.ts @@ -86,10 +86,15 @@ export class StreamMessageTranslator { let groupKeyId: string | undefined = undefined if (msg.body.oneofKind === 'contentMessage') { messageType = StreamMessageType.MESSAGE - content = msg.body.contentMessage.content + content = new Uint8Array(msg.body.contentMessage.content) contentType = msg.body.contentMessage.contentType encryptionType = msg.body.contentMessage.encryptionType - newGroupKey = msg.body.contentMessage.newGroupKey + if (msg.body.contentMessage.newGroupKey) { + newGroupKey = { + id: msg.body.contentMessage.newGroupKey.id, + data: new Uint8Array(msg.body.contentMessage.newGroupKey.data) + } + } groupKeyId = msg.body.contentMessage.groupKeyId } else if (msg.body.oneofKind === 'groupKeyRequest') { messageType = StreamMessageType.GROUP_KEY_REQUEST @@ -128,7 +133,7 @@ export class StreamMessageTranslator { messageType, content, contentType, - signature: msg.signature, + signature: new Uint8Array(msg.signature), signatureType: msg.signatureType, encryptionType, groupKeyId, From 69448614c3a78e56af959fbe563699755a05f063 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Mariusz=20Roli=C5=84ski?= Date: Mon, 2 Feb 2026 14:41:43 +0100 Subject: [PATCH 7/9] Return plain Uint8Array from hexToBinary as declared Validates hex characters upfront with regex before parsing. --- packages/utils/src/binaryUtils.ts | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/packages/utils/src/binaryUtils.ts b/packages/utils/src/binaryUtils.ts index e9d3bc05ae..0d514ad1fd 100644 --- a/packages/utils/src/binaryUtils.ts +++ b/packages/utils/src/binaryUtils.ts @@ -20,10 +20,13 @@ export const hexToBinary = (hex: string): Uint8Array => { if (hex.length % 2 !== 0) { throw new Error(`Hex string length must be even, received: 0x${hex}`) } - const result = Buffer.from(hex, 'hex') - if (hex.length !== result.length * 2) { + if (!/^[0-9a-fA-F]*$/.test(hex)) { throw new Error(`Hex string input is likely malformed, received: 0x${hex}`) } + const result = new Uint8Array(hex.length / 2) + for (let i = 0; i < result.length; i++) { + result[i] = parseInt(hex.slice(i * 2, i * 2 + 2), 16) + } return result } From e76af90f12ed7b65e9700788d3ade535cfd3ecc4 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Mariusz=20Roli=C5=84ski?= Date: Mon, 2 Feb 2026 14:47:55 +0100 Subject: [PATCH 8/9] Revert `SigningUtil` test to use `toStrictEqual` Now that both `createSignature` and `hexToBinary` return plain `Uint8Array`, we can use strict equality comparison again. --- packages/utils/test/SigningUtil.test.ts | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/packages/utils/test/SigningUtil.test.ts b/packages/utils/test/SigningUtil.test.ts index e7d656bc63..9baff6a869 100644 --- a/packages/utils/test/SigningUtil.test.ts +++ b/packages/utils/test/SigningUtil.test.ts @@ -1,5 +1,5 @@ /* eslint-disable max-len */ -import { areEqualBinaries, hexToBinary } from '../src/binaryUtils' +import { hexToBinary } from '../src/binaryUtils' import { EcdsaSecp256k1Evm, EcdsaSecp256r1, MlDsa87, SigningUtil } from '../src/SigningUtil' import { toUserId, toUserIdRaw } from '../src/UserID' @@ -46,9 +46,7 @@ describe('EcdsaSecp256k1Evm', () => { it('produces correct signature', async () => { const payload = Buffer.from('data-to-sign') const signature = await util.createSignature(payload, privateKey) - - const expectedSignature = hexToBinary('787cd72924153c88350e808de68b68c88030cbc34d053a5c696a5893d5e6fec1687c1b6205ec99aeb3375a81bf5cb8857ae39c1b55a41b32ed6399ae8da456a61b') - expect(areEqualBinaries(signature, expectedSignature)).toBeTrue() + expect(signature).toStrictEqual(hexToBinary('787cd72924153c88350e808de68b68c88030cbc34d053a5c696a5893d5e6fec1687c1b6205ec99aeb3375a81bf5cb8857ae39c1b55a41b32ed6399ae8da456a61b')) }) }) From 3208dcaca31efb34c87cc7f245a19069e94506c5 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Mariusz=20Roli=C5=84ski?= Date: Mon, 2 Feb 2026 15:02:27 +0100 Subject: [PATCH 9/9] Bump key request timeout --- packages/sdk/test/integration/update-encryption-key.test.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/packages/sdk/test/integration/update-encryption-key.test.ts b/packages/sdk/test/integration/update-encryption-key.test.ts index 16554a6ea3..3ceac23ccf 100644 --- a/packages/sdk/test/integration/update-encryption-key.test.ts +++ b/packages/sdk/test/integration/update-encryption-key.test.ts @@ -24,7 +24,7 @@ describe('update encryption key', () => { publisher = environment.createClient() subscriber = environment.createClient({ encryption: { - keyRequestTimeout: 1000 + keyRequestTimeout: 5000 } }) const stream = await publisher.createStream('/path')