Skip to content
1 change: 1 addition & 0 deletions packages/sdk/createKarmaConfig.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ export function createKarmaConfig(testPaths: string[]): ReturnType<typeof create
'@streamr/dht': resolve(__dirname, '../dht/dist/exports-browser.cjs'),
"@/createSignatureValidationWorker": resolve(__dirname, 'src/_karma/createSignatureValidationWorker.ts'),
"@/createSigningWorker": resolve(__dirname, 'src/_karma/createSigningWorker.ts'),
"@/createEncryptionWorker": resolve(__dirname, 'src/_karma/createEncryptionWorker.ts'),
'@': resolve(__dirname, 'src/_browser'),
},
fallback: {
Expand Down
1 change: 1 addition & 0 deletions packages/sdk/jest.config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ const config: Config.InitialOptions = {
moduleNameMapper: {
"^@/createSignatureValidationWorker$": "<rootDir>/src/_jest/createSignatureValidationWorker.ts",
"^@/createSigningWorker$": "<rootDir>/src/_jest/createSigningWorker.ts",
"^@/createEncryptionWorker$": "<rootDir>/src/_jest/createEncryptionWorker.ts",
"^@/(.*)$": "<rootDir>/src/_nodejs/$1",
},
transform: {
Expand Down
1 change: 1 addition & 0 deletions packages/sdk/rollup.config.mts
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ const browserAliases: Alias[] = [
const WORKERS: Record<string, string> = {
'SignatureValidationWorker': 'signature/SignatureValidationWorker',
'SigningWorker': 'signature/SigningWorker',
'EncryptionWorker': 'encryption/EncryptionWorker',
}

export default defineConfig([
Expand Down
11 changes: 11 additions & 0 deletions packages/sdk/src/_browser/createEncryptionWorker.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
/**
* Browser-specific encryption worker factory.
*/
import Worker from 'web-worker'

export function createEncryptionWorker(): InstanceType<typeof Worker> {
return new Worker(
new URL('./workers/EncryptionWorker.browser.mjs', import.meta.url),
{ type: 'module' }
)
}
12 changes: 12 additions & 0 deletions packages/sdk/src/_jest/createEncryptionWorker.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
/**
* Jest-specific encryption worker factory.
* Points to the built worker in dist/ for testing.
*/
import Worker from 'web-worker'

export function createEncryptionWorker(): InstanceType<typeof Worker> {
return new Worker(
new URL('../../dist/workers/EncryptionWorker.node.mjs', import.meta.url),
{ type: 'module' }
)
}
12 changes: 12 additions & 0 deletions packages/sdk/src/_karma/createEncryptionWorker.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
/**
* Karma-specific encryption worker factory.
* Points to the built worker in dist/ for browser testing.
*/
import Worker from 'web-worker'

export function createEncryptionWorker(): InstanceType<typeof Worker> {
return new Worker(
new URL('../../dist/workers/EncryptionWorker.browser.mjs', import.meta.url),
{ type: 'module' }
)
}
11 changes: 11 additions & 0 deletions packages/sdk/src/_nodejs/createEncryptionWorker.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
/**
* Node.js-specific encryption worker factory.
*/
import Worker from 'web-worker'

export function createEncryptionWorker(): InstanceType<typeof Worker> {
return new Worker(
new URL('./workers/EncryptionWorker.node.mjs', import.meta.url),
{ type: 'module' }
)
}
130 changes: 130 additions & 0 deletions packages/sdk/src/encryption/EncryptionService.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,130 @@
/**
* Singleton encryption service using Web Worker.
* This offloads CPU-intensive AES encryption operations to a separate thread.
* Works in both browser and Node.js environments via platform-specific config.
*
* The worker is lazily initialized on first use and shared across all consumers.
*/
import { wrap, releaseProxy, transfer, type Remote } from 'comlink'
import { Lifecycle, scoped } from 'tsyringe'
import { EncryptedGroupKey } from '@streamr/trackerless-network'
import { createEncryptionWorker } from '@/createEncryptionWorker'
import type { EncryptionWorkerApi } from './EncryptionWorker'
import { DestroySignal } from '../DestroySignal'
import { StreamrClientError } from '../StreamrClientError'
import { GroupKey } from './GroupKey'

@scoped(Lifecycle.ContainerScoped)
export class EncryptionService {
private worker: ReturnType<typeof createEncryptionWorker> | undefined
private workerApi: Remote<EncryptionWorkerApi> | undefined

constructor(destroySignal: DestroySignal) {
destroySignal.onDestroy.listen(() => this.destroy())
}

private getWorkerApi(): Remote<EncryptionWorkerApi> {
if (this.workerApi === undefined) {
this.worker = createEncryptionWorker()
this.workerApi = wrap<EncryptionWorkerApi>(this.worker)
}
return this.workerApi
}

/**
* Encrypt data using AES-256-CTR.
* Note: The input data buffer is transferred to the worker and becomes unusable after this call.
*/
async encryptWithAES(data: Uint8Array, cipherKey: Uint8Array): Promise<Uint8Array> {
// Ensure we have plain Uint8Array instances for worker communication (not Buffer subclass)
const dataArray = new Uint8Array(data)
const keyArray = new Uint8Array(cipherKey)
const result = await this.getWorkerApi().encrypt(
transfer({ data: dataArray, cipherKey: keyArray }, [dataArray.buffer])
)
if (result.type === 'error') {
throw new Error(`AES encryption failed: ${result.message}`)
}
return result.data
}

/**
* Encrypt the next group key using the current group key.
*/
async encryptNextGroupKey(currentKey: GroupKey, nextKey: GroupKey): Promise<EncryptedGroupKey> {
// Convert Buffer to Uint8Array for worker communication
const result = await this.getWorkerApi().encryptGroupKey({
nextGroupKeyId: nextKey.id,
nextGroupKeyData: new Uint8Array(nextKey.data),
currentGroupKeyData: new Uint8Array(currentKey.data)
})
if (result.type === 'error') {
throw new Error(`Group key encryption failed: ${result.message}`)
}
return {
id: result.id,
data: result.data
}
}

/**
* Decrypt an encrypted group key using the current group key.
*/
async decryptNextGroupKey(currentKey: GroupKey, encryptedKey: EncryptedGroupKey): Promise<GroupKey> {
// Convert Buffer to Uint8Array for worker communication
const result = await this.getWorkerApi().decryptGroupKey({
encryptedGroupKeyId: encryptedKey.id,
encryptedGroupKeyData: new Uint8Array(encryptedKey.data),
currentGroupKeyData: new Uint8Array(currentKey.data)
})
if (result.type === 'error') {
throw new Error(`Group key decryption failed: ${result.message}`)
}
return new GroupKey(result.id, Buffer.from(result.data))
}

/**
* Decrypt a stream message's content and optionally the new group key.
* This combines both operations for efficiency when processing messages.
* Note: The input content buffer is transferred to the worker and becomes unusable after this call.
*/
async decryptStreamMessage(
content: Uint8Array,
groupKey: GroupKey,
encryptedNewGroupKey?: EncryptedGroupKey
): Promise<[Uint8Array, GroupKey?]> {
// Convert Buffer to Uint8Array for worker communication
const request = {
content,
groupKeyData: new Uint8Array(groupKey.data),
newGroupKey: encryptedNewGroupKey ? {
id: encryptedNewGroupKey.id,
data: new Uint8Array(encryptedNewGroupKey.data)
} : undefined
}
const result = await this.getWorkerApi().decryptStreamMessage(
transfer(request, [content.buffer])
)
if (result.type === 'error') {
throw new StreamrClientError(`AES decryption failed: ${result.message}`, 'DECRYPT_ERROR')
}

let newGroupKey: GroupKey | undefined
if (result.newGroupKey) {
newGroupKey = new GroupKey(result.newGroupKey.id, Buffer.from(result.newGroupKey.data))
}

return [result.content, newGroupKey]
}

destroy(): void {
if (this.workerApi !== undefined) {
this.workerApi[releaseProxy]()
this.workerApi = undefined
}
if (this.worker !== undefined) {
this.worker.terminate()
this.worker = undefined
}
}
}
56 changes: 10 additions & 46 deletions packages/sdk/src/encryption/EncryptionUtil.ts
Original file line number Diff line number Diff line change
@@ -1,17 +1,19 @@
import { ml_kem1024 } from '@noble/post-quantum/ml-kem'
import { randomBytes } from '@noble/post-quantum/utils'
import { StreamMessageAESEncrypted } from '../protocol/StreamMessage'
import { StreamrClientError } from '../StreamrClientError'
import { GroupKey } from './GroupKey'
import { AsymmetricEncryptionType } from '@streamr/trackerless-network'
import { binaryToUtf8, createCipheriv, createDecipheriv, getSubtle, privateDecrypt, publicEncrypt } from '@streamr/utils'

export const INITIALIZATION_VECTOR_LENGTH = 16
import { binaryToUtf8, getSubtle, privateDecrypt, publicEncrypt } from '@streamr/utils'
import { decryptWithAES, encryptWithAES } from './aesUtils'

const INFO = Buffer.from('streamr-key-exchange')
const KEM_CIPHER_LENGTH_BYTES = 1568
const KDF_SALT_LENGTH_BYTES = 64

/**
* Asymmetric encryption utility class for RSA and ML-KEM (post-quantum) key exchange.
*
* For AES symmetric encryption of stream messages, use EncryptionService instead.
* This class only handles asymmetric encryption for key exchange operations.
*/
// eslint-disable-next-line @typescript-eslint/no-extraneous-class
export class EncryptionUtil {
/**
Expand Down Expand Up @@ -116,7 +118,7 @@ export class EncryptionUtil {
const wrappingAESKey = await this.deriveAESWrapperKey(sharedSecret, kdfSalt)

// Encrypt plaintext with the AES wrapping key
const aesEncryptedPlaintext = this.encryptWithAES(plaintextBuffer, Buffer.from(wrappingAESKey))
const aesEncryptedPlaintext = encryptWithAES(plaintextBuffer, wrappingAESKey)

// Concatenate the deliverables into a binary package
return Buffer.concat([kemCipher, kdfSalt, aesEncryptedPlaintext])
Expand All @@ -138,44 +140,6 @@ export class EncryptionUtil {
const wrappingAESKey = await this.deriveAESWrapperKey(sharedSecret, kdfSalt)

// Decrypt the aesEncryptedPlaintext
return this.decryptWithAES(aesEncryptedPlaintext, Buffer.from(wrappingAESKey))
}

/*
* Returns a hex string without the '0x' prefix.
*/
static encryptWithAES(data: Uint8Array, cipherKey: Uint8Array): Uint8Array {
const iv = randomBytes(INITIALIZATION_VECTOR_LENGTH) // always need a fresh IV when using CTR mode
const cipher = createCipheriv('aes-256-ctr', cipherKey, iv)
return Buffer.concat([iv, cipher.update(data), cipher.final()])
}

/*
* 'ciphertext' must be a hex string (without '0x' prefix), 'groupKey' must be a GroupKey. Returns a Buffer.
*/
static decryptWithAES(cipher: Uint8Array, cipherKey: Uint8Array): Buffer {
const iv = cipher.slice(0, INITIALIZATION_VECTOR_LENGTH)
const decipher = createDecipheriv('aes-256-ctr', cipherKey, iv)
return Buffer.concat([decipher.update(cipher.slice(INITIALIZATION_VECTOR_LENGTH)), decipher.final()])
}

static decryptStreamMessage(streamMessage: StreamMessageAESEncrypted, groupKey: GroupKey): [Uint8Array, GroupKey?] | never {
let content: Uint8Array
try {
content = this.decryptWithAES(streamMessage.content, groupKey.data)
} catch {
throw new StreamrClientError('AES decryption failed', 'DECRYPT_ERROR', streamMessage)
}

let newGroupKey: GroupKey | undefined = undefined
if (streamMessage.newGroupKey) {
try {
newGroupKey = groupKey.decryptNextGroupKey(streamMessage.newGroupKey)
} catch {
throw new StreamrClientError('Could not decrypt new encryption key', 'DECRYPT_ERROR', streamMessage)
}
}

return [content, newGroupKey]
return Buffer.from(decryptWithAES(aesEncryptedPlaintext, wrappingAESKey))
}
}
86 changes: 86 additions & 0 deletions packages/sdk/src/encryption/EncryptionWorker.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
/**
* Web Worker for AES encryption operations.
* Offloads CPU-intensive cryptographic operations to a separate thread.
*/
import { expose, transfer } from 'comlink'
import { encryptWithAES } from './aesUtils'
import {
encryptNextGroupKey,
decryptNextGroupKey,
decryptStreamMessageContent,
AESEncryptRequest,
EncryptGroupKeyRequest,
DecryptGroupKeyRequest,
DecryptStreamMessageRequest,
AESEncryptResult,
EncryptGroupKeyResult,
DecryptGroupKeyResult,
DecryptStreamMessageResult
} from './encryptionUtils'

const workerApi = {
encrypt: async (request: AESEncryptRequest): Promise<AESEncryptResult> => {
try {
const data = encryptWithAES(request.data, request.cipherKey)
return transfer({ type: 'success', data }, [data.buffer])
} catch (err) {
return { type: 'error', message: String(err) }
}
},

encryptGroupKey: async (request: EncryptGroupKeyRequest): Promise<EncryptGroupKeyResult> => {
try {
const result = encryptNextGroupKey(
request.nextGroupKeyId,
request.nextGroupKeyData,
request.currentGroupKeyData
)
return transfer(
{ type: 'success', id: result.id, data: result.data },
[result.data.buffer]
)
} catch (err) {
return { type: 'error', message: String(err) }
}
},

decryptGroupKey: async (request: DecryptGroupKeyRequest): Promise<DecryptGroupKeyResult> => {
try {
const result = decryptNextGroupKey(
request.encryptedGroupKeyId,
request.encryptedGroupKeyData,
request.currentGroupKeyData
)
return transfer(
{ type: 'success', id: result.id, data: result.data },
[result.data.buffer]
)
} catch (err) {
return { type: 'error', message: String(err) }
}
},

decryptStreamMessage: async (request: DecryptStreamMessageRequest): Promise<DecryptStreamMessageResult> => {
try {
const result = decryptStreamMessageContent(
request.content,
request.groupKeyData,
request.newGroupKey
)
const transferables: ArrayBuffer[] = [result.content.buffer as ArrayBuffer]
if (result.newGroupKey) {
transferables.push(result.newGroupKey.data.buffer as ArrayBuffer)
}
return transfer(
{ type: 'success', content: result.content, newGroupKey: result.newGroupKey },
transferables
)
} catch (err) {
return { type: 'error', message: String(err) }
}
}
}

export type EncryptionWorkerApi = typeof workerApi

expose(workerApi)
Loading