diff --git a/.changeset/cool-peaches-flash.md b/.changeset/cool-peaches-flash.md new file mode 100644 index 00000000..72b07b6f --- /dev/null +++ b/.changeset/cool-peaches-flash.md @@ -0,0 +1,5 @@ +--- +"@livekit/rtc-node": patch +--- + +Hold lock on all ffi events diff --git a/packages/livekit-rtc/src/participant.ts b/packages/livekit-rtc/src/participant.ts index be12ea96..66ddda39 100644 --- a/packages/livekit-rtc/src/participant.ts +++ b/packages/livekit-rtc/src/participant.ts @@ -155,13 +155,13 @@ export type DataPublishOptions = { export class LocalParticipant extends Participant { private rpcHandlers: Map Promise> = new Map(); - private roomEventLock: Mutex; + private ffiEventLock: Mutex; trackPublications: Map = new Map(); - constructor(info: OwnedParticipant, roomEventLock: Mutex) { + constructor(info: OwnedParticipant, ffiEventLock: Mutex) { super(info); - this.roomEventLock = roomEventLock; + this.ffiEventLock = ffiEventLock; } async publishData(data: Uint8Array, options: DataPublishOptions) { @@ -662,7 +662,7 @@ export class LocalParticipant extends Participant { options: options, }); - const unlock = await this.roomEventLock.lock(); + const unlock = await this.ffiEventLock.lock(); const res = FfiClient.instance.request({ message: { case: 'publishTrack', value: req }, @@ -690,7 +690,7 @@ export class LocalParticipant extends Participant { } async unpublishTrack(trackSid: string, stopOnUnpublish?: boolean) { - const unlock = await this.roomEventLock.lock(); + const unlock = await this.ffiEventLock.lock(); try { const req = new UnpublishTrackRequest({ localParticipantHandle: this.ffi_handle.handle, diff --git a/packages/livekit-rtc/src/room.ts b/packages/livekit-rtc/src/room.ts index 902a94ed..d2c318de 100644 --- a/packages/livekit-rtc/src/room.ts +++ b/packages/livekit-rtc/src/room.ts @@ -87,7 +87,7 @@ export class Room extends (EventEmitter as new () => TypedEmitter * the local participant to acquire the lock while doing state updates related to FFI events * before processing the next events */ - private roomEventLock = new Mutex(); + private ffiEventLock = new Mutex(); private byteStreamControllers = new Map>(); private textStreamControllers = new Map>(); @@ -221,7 +221,7 @@ export class Room extends (EventEmitter as new () => TypedEmitter this.connectionState = ConnectionState.CONN_CONNECTED; this.localParticipant = new LocalParticipant( cb.message.value.localParticipant!, - this.roomEventLock, + this.ffiEventLock, ); for (const pt of cb.message.value.participants) { @@ -302,18 +302,23 @@ export class Room extends (EventEmitter as new () => TypedEmitter } private onFfiEvent = async (ffiEvent: FfiEvent) => { - if (!this.localParticipant || !this.ffiHandle || !this.info) { - this.preConnectEvents.push(ffiEvent); - return; - } + const unlock = await this.ffiEventLock.lock(); + try { + if (!this.localParticipant || !this.ffiHandle || !this.info) { + this.preConnectEvents.push(ffiEvent); + return; + } - // process preConnectEvents if we received the connectCallback after the events were queued - for (const ev of this.preConnectEvents) { - await this.processFfiEvent(ev); - } - this.preConnectEvents = []; + // process preConnectEvents if we received the connectCallback after the events were queued + for (const ev of this.preConnectEvents) { + await this.processFfiEvent(ev); + } + this.preConnectEvents = []; - await this.processFfiEvent(ffiEvent); + await this.processFfiEvent(ffiEvent); + } finally { + unlock(); + } }; private processFfiEvent = async (ffiEvent: FfiEvent) => { @@ -321,302 +326,294 @@ export class Room extends (EventEmitter as new () => TypedEmitter throw new Error('processFfiEvent called before connect'); } - const unlock = await this.roomEventLock.lock(); - - try { - if (ffiEvent.message.case == 'rpcMethodInvocation') { - if ( - ffiEvent.message.value.localParticipantHandle == this.localParticipant.ffi_handle.handle - ) { - this.localParticipant.handleRpcMethodInvocation( - ffiEvent.message.value.invocationId!, - ffiEvent.message.value.method!, - ffiEvent.message.value.requestId!, - ffiEvent.message.value.callerIdentity!, - ffiEvent.message.value.payload!, - ffiEvent.message.value.responseTimeoutMs!, - ); - } - return; - } else if ( - ffiEvent.message.case != 'roomEvent' || - ffiEvent.message.value.roomHandle != this.ffiHandle.handle + if (ffiEvent.message.case == 'rpcMethodInvocation') { + if ( + ffiEvent.message.value.localParticipantHandle == this.localParticipant.ffi_handle.handle ) { - return; + this.localParticipant.handleRpcMethodInvocation( + ffiEvent.message.value.invocationId!, + ffiEvent.message.value.method!, + ffiEvent.message.value.requestId!, + ffiEvent.message.value.callerIdentity!, + ffiEvent.message.value.payload!, + ffiEvent.message.value.responseTimeoutMs!, + ); } + return; + } else if ( + ffiEvent.message.case != 'roomEvent' || + ffiEvent.message.value.roomHandle != this.ffiHandle.handle + ) { + return; + } - const ev = ffiEvent.message.value.message; - if (process.env.LIVEKIT_DEBUG_LOG_ROOM_EVENTS) { - console.log('Room event:', ev); + const ev = ffiEvent.message.value.message; + if (process.env.LIVEKIT_DEBUG_LOG_ROOM_EVENTS) { + console.log('Room event:', ev); + } + if (ev.case == 'participantConnected') { + const participant = this.createRemoteParticipant(ev.value.info!); + this.remoteParticipants.set(participant.identity!, participant); + this.emit(RoomEvent.ParticipantConnected, participant); + } else if (ev.case == 'participantDisconnected') { + const participant = this.remoteParticipants.get(ev.value.participantIdentity!); + if (participant) { + this.remoteParticipants.delete(participant.identity); + participant.info.disconnectReason = ev.value.disconnectReason; + this.emit(RoomEvent.ParticipantDisconnected, participant); + } else { + console.log(`RoomEvent.ParticipantDisconnected: Could not find participant`); } - if (ev.case == 'participantConnected') { - const participant = this.createRemoteParticipant(ev.value.info!); - this.remoteParticipants.set(participant.identity!, participant); - this.emit(RoomEvent.ParticipantConnected, participant); - } else if (ev.case == 'participantDisconnected') { - const participant = this.remoteParticipants.get(ev.value.participantIdentity!); - if (participant) { - this.remoteParticipants.delete(participant.identity); - participant.info.disconnectReason = ev.value.disconnectReason; - this.emit(RoomEvent.ParticipantDisconnected, participant); - } else { - console.log(`RoomEvent.ParticipantDisconnected: Could not find participant`); - } - } else if (ev.case == 'localTrackPublished') { - const publication = this.localParticipant.trackPublications.get(ev.value.trackSid!); - this.emit(RoomEvent.LocalTrackPublished, publication!, this.localParticipant); - } else if (ev.case == 'localTrackUnpublished') { - const publication = this.localParticipant.trackPublications.get(ev.value.publicationSid!); - this.localParticipant.trackPublications.delete(ev.value.publicationSid!); - this.emit(RoomEvent.LocalTrackUnpublished, publication!, this.localParticipant!); - } else if (ev.case == 'localTrackSubscribed') { - const publication = this.localParticipant.trackPublications.get(ev.value.trackSid!); - if (publication) { - publication.resolveFirstSubscription(); - this.emit(RoomEvent.LocalTrackSubscribed, publication!.track!); - } else { - console.warn( - `RoomEvent.LocalTrackSubscribed: Publication not found: ${ev.value.trackSid}`, - ); - } - } else if (ev.case == 'trackPublished') { - const participant = this.remoteParticipants.get(ev.value.participantIdentity!); - const publication = new RemoteTrackPublication(ev.value.publication!); - if (participant) { - participant.trackPublications.set(publication.sid!, publication); - } else { - console.warn( - `RoomEvent.TrackPublished: Could not find participant: ${ev.value.participantIdentity}`, - ); - } - this.emit(RoomEvent.TrackPublished, publication, participant!); - } else if (ev.case == 'trackUnpublished') { - const participant = this.requireRemoteParticipant(ev.value.participantIdentity!); - const publication = participant.trackPublications.get(ev.value.publicationSid!); - participant.trackPublications.delete(ev.value.publicationSid!); - if (publication) { - this.emit(RoomEvent.TrackUnpublished, publication, participant); - } else { - console.warn(`RoomEvent.TrackUnpublished: Could not find publication`); + } else if (ev.case == 'localTrackPublished') { + const publication = this.localParticipant.trackPublications.get(ev.value.trackSid!); + this.emit(RoomEvent.LocalTrackPublished, publication!, this.localParticipant); + } else if (ev.case == 'localTrackUnpublished') { + const publication = this.localParticipant.trackPublications.get(ev.value.publicationSid!); + this.localParticipant.trackPublications.delete(ev.value.publicationSid!); + this.emit(RoomEvent.LocalTrackUnpublished, publication!, this.localParticipant!); + } else if (ev.case == 'localTrackSubscribed') { + const publication = this.localParticipant.trackPublications.get(ev.value.trackSid!); + if (publication) { + publication.resolveFirstSubscription(); + this.emit(RoomEvent.LocalTrackSubscribed, publication!.track!); + } else { + console.warn(`RoomEvent.LocalTrackSubscribed: Publication not found: ${ev.value.trackSid}`); + } + } else if (ev.case == 'trackPublished') { + const participant = this.remoteParticipants.get(ev.value.participantIdentity!); + const publication = new RemoteTrackPublication(ev.value.publication!); + if (participant) { + participant.trackPublications.set(publication.sid!, publication); + } else { + console.warn( + `RoomEvent.TrackPublished: Could not find participant: ${ev.value.participantIdentity}`, + ); + } + this.emit(RoomEvent.TrackPublished, publication, participant!); + } else if (ev.case == 'trackUnpublished') { + const participant = this.requireRemoteParticipant(ev.value.participantIdentity!); + const publication = participant.trackPublications.get(ev.value.publicationSid!); + participant.trackPublications.delete(ev.value.publicationSid!); + if (publication) { + this.emit(RoomEvent.TrackUnpublished, publication, participant); + } else { + console.warn(`RoomEvent.TrackUnpublished: Could not find publication`); + } + } else if (ev.case == 'trackSubscribed') { + const ownedTrack = ev.value.track!; + const trackInfo = ownedTrack.info!; + try { + const { participant, publication } = this.requirePublicationOfRemoteParticipant( + ev.value.participantIdentity!, + trackInfo.sid!, + ); + publication.subscribed = true; + if (trackInfo.kind == TrackKind.KIND_VIDEO) { + publication.track = new RemoteVideoTrack(ownedTrack); + } else if (trackInfo.kind == TrackKind.KIND_AUDIO) { + publication.track = new RemoteAudioTrack(ownedTrack); } - } else if (ev.case == 'trackSubscribed') { - const ownedTrack = ev.value.track!; - const trackInfo = ownedTrack.info!; - try { - const { participant, publication } = this.requirePublicationOfRemoteParticipant( - ev.value.participantIdentity!, - trackInfo.sid!, - ); - publication.subscribed = true; - if (trackInfo.kind == TrackKind.KIND_VIDEO) { - publication.track = new RemoteVideoTrack(ownedTrack); - } else if (trackInfo.kind == TrackKind.KIND_AUDIO) { - publication.track = new RemoteAudioTrack(ownedTrack); - } - this.emit(RoomEvent.TrackSubscribed, publication.track!, publication, participant); - } catch (e: any) { - console.warn(`RoomEvent.TrackSubscribed: ${e.message}`); - } - } else if (ev.case == 'trackUnsubscribed') { - try { - const { participant, publication } = this.requirePublicationOfRemoteParticipant( - ev.value.participantIdentity!, - ev.value.trackSid!, - ); - const track = publication.track!; - publication.track = undefined; - publication.subscribed = false; - this.emit(RoomEvent.TrackUnsubscribed, track, publication, participant); - } catch (e: any) { - console.warn(`RoomEvent.TrackUnsubscribed: ${e.message}`); - } - } else if (ev.case == 'trackSubscriptionFailed') { - try { - const participant = this.requireRemoteParticipant(ev.value.participantIdentity!); - this.emit( - RoomEvent.TrackSubscriptionFailed, - ev.value.trackSid!, - participant, - ev.value.error, - ); - } catch (e: any) { - console.warn(`RoomEvent.TrackSubscriptionFailed: ${e.message}`); - } - } else if (ev.case == 'trackMuted') { - try { - const { participant, publication } = this.requirePublicationOfParticipant( - ev.value.participantIdentity!, - ev.value.trackSid!, - ); - publication.info!.muted = true; - if (publication.track) { - publication.track.info!.muted = true; - } - this.emit(RoomEvent.TrackMuted, publication, participant); - } catch (e: any) { - console.warn(`RoomEvent.TrackMuted: ${e.message}`); - } - } else if (ev.case == 'trackUnmuted') { - try { - const { participant, publication } = this.requirePublicationOfParticipant( - ev.value.participantIdentity!, - ev.value.trackSid!, - ); - publication.info!.muted = false; - if (publication.track) { - publication.track.info!.muted = false; - } - this.emit(RoomEvent.TrackUnmuted, publication, participant); - } catch (e: any) { - console.warn(`RoomEvent.TrackUnmuted: ${e.message}`); - } - } else if (ev.case == 'activeSpeakersChanged') { - try { - const activeSpeakers = ev.value.participantIdentities.map((identity) => - this.requireParticipantByIdentity(identity), - ); - this.emit(RoomEvent.ActiveSpeakersChanged, activeSpeakers); - } catch (e: any) { - console.warn(`RoomEvent.ActiveSpeakersChanged: ${e.message}`); - } - } else if (ev.case == 'roomMetadataChanged') { - this.info.metadata = ev.value.metadata ?? ''; - this.emit(RoomEvent.RoomMetadataChanged, this.info.metadata); - } else if (ev.case == 'participantMetadataChanged') { - try { - const participant = this.requireParticipantByIdentity(ev.value.participantIdentity!); - participant.info.metadata = ev.value.metadata; - this.emit(RoomEvent.ParticipantMetadataChanged, participant.metadata, participant); - } catch (e: any) { - console.warn(`RoomEvent.ParticipantMetadataChanged: ${e.message}`); + this.emit(RoomEvent.TrackSubscribed, publication.track!, publication, participant); + } catch (e: any) { + console.warn(`RoomEvent.TrackSubscribed: ${e.message}`); + } + } else if (ev.case == 'trackUnsubscribed') { + try { + const { participant, publication } = this.requirePublicationOfRemoteParticipant( + ev.value.participantIdentity!, + ev.value.trackSid!, + ); + const track = publication.track!; + publication.track = undefined; + publication.subscribed = false; + this.emit(RoomEvent.TrackUnsubscribed, track, publication, participant); + } catch (e: any) { + console.warn(`RoomEvent.TrackUnsubscribed: ${e.message}`); + } + } else if (ev.case == 'trackSubscriptionFailed') { + try { + const participant = this.requireRemoteParticipant(ev.value.participantIdentity!); + this.emit( + RoomEvent.TrackSubscriptionFailed, + ev.value.trackSid!, + participant, + ev.value.error, + ); + } catch (e: any) { + console.warn(`RoomEvent.TrackSubscriptionFailed: ${e.message}`); + } + } else if (ev.case == 'trackMuted') { + try { + const { participant, publication } = this.requirePublicationOfParticipant( + ev.value.participantIdentity!, + ev.value.trackSid!, + ); + publication.info!.muted = true; + if (publication.track) { + publication.track.info!.muted = true; } - } else if (ev.case == 'participantNameChanged') { - try { - const participant = this.requireParticipantByIdentity(ev.value.participantIdentity!); - participant.info.name = ev.value.name; - this.emit(RoomEvent.ParticipantNameChanged, participant.name!, participant); - } catch (e: any) { - console.warn(`RoomEvent.ParticipantNameChanged: ${e.message}`); + this.emit(RoomEvent.TrackMuted, publication, participant); + } catch (e: any) { + console.warn(`RoomEvent.TrackMuted: ${e.message}`); + } + } else if (ev.case == 'trackUnmuted') { + try { + const { participant, publication } = this.requirePublicationOfParticipant( + ev.value.participantIdentity!, + ev.value.trackSid!, + ); + publication.info!.muted = false; + if (publication.track) { + publication.track.info!.muted = false; } - } else if (ev.case == 'participantAttributesChanged') { - try { - const participant = this.requireParticipantByIdentity(ev.value.participantIdentity!); - participant.info.attributes = ev.value.attributes.reduce( + this.emit(RoomEvent.TrackUnmuted, publication, participant); + } catch (e: any) { + console.warn(`RoomEvent.TrackUnmuted: ${e.message}`); + } + } else if (ev.case == 'activeSpeakersChanged') { + try { + const activeSpeakers = ev.value.participantIdentities.map((identity) => + this.requireParticipantByIdentity(identity), + ); + this.emit(RoomEvent.ActiveSpeakersChanged, activeSpeakers); + } catch (e: any) { + console.warn(`RoomEvent.ActiveSpeakersChanged: ${e.message}`); + } + } else if (ev.case == 'roomMetadataChanged') { + this.info.metadata = ev.value.metadata ?? ''; + this.emit(RoomEvent.RoomMetadataChanged, this.info.metadata); + } else if (ev.case == 'participantMetadataChanged') { + try { + const participant = this.requireParticipantByIdentity(ev.value.participantIdentity!); + participant.info.metadata = ev.value.metadata; + this.emit(RoomEvent.ParticipantMetadataChanged, participant.metadata, participant); + } catch (e: any) { + console.warn(`RoomEvent.ParticipantMetadataChanged: ${e.message}`); + } + } else if (ev.case == 'participantNameChanged') { + try { + const participant = this.requireParticipantByIdentity(ev.value.participantIdentity!); + participant.info.name = ev.value.name; + this.emit(RoomEvent.ParticipantNameChanged, participant.name!, participant); + } catch (e: any) { + console.warn(`RoomEvent.ParticipantNameChanged: ${e.message}`); + } + } else if (ev.case == 'participantAttributesChanged') { + try { + const participant = this.requireParticipantByIdentity(ev.value.participantIdentity!); + participant.info.attributes = ev.value.attributes.reduce( + (acc, value) => { + acc[value.key!] = value.value!; + return acc; + }, + {} as Record, + ); + if (Object.keys(ev.value.changedAttributes).length > 0) { + const changedAttributes = ev.value.changedAttributes.reduce( (acc, value) => { acc[value.key!] = value.value!; return acc; }, {} as Record, ); - if (Object.keys(ev.value.changedAttributes).length > 0) { - const changedAttributes = ev.value.changedAttributes.reduce( - (acc, value) => { - acc[value.key!] = value.value!; - return acc; - }, - {} as Record, - ); - this.emit(RoomEvent.ParticipantAttributesChanged, changedAttributes, participant); - } - } catch (e: any) { - console.warn(`RoomEvent.ParticipantAttributesChanged: ${e.message}`); - } - } else if (ev.case == 'connectionQualityChanged') { - try { - const participant = this.requireParticipantByIdentity(ev.value.participantIdentity!); - this.emit(RoomEvent.ConnectionQualityChanged, ev.value.quality!, participant); - } catch (e: any) { - console.warn(`RoomEvent.ConnectionQualityChanged: ${e.message}`); - } - } else if (ev.case == 'chatMessage') { - const participant = this.retrieveParticipantByIdentity(ev.value.participantIdentity!); - const { id, message: messageText, timestamp, editTimestamp, generated } = ev.value.message!; - const message: ChatMessage = { - id: id!, - message: messageText!, - timestamp: Number(timestamp), - editTimestamp: Number(editTimestamp), - generated, - }; - this.emit(RoomEvent.ChatMessage, message, participant); - } else if (ev.case == 'dataPacketReceived') { - // Can be undefined if the data is sent from a Server SDK - const participant = this.remoteParticipants.get(ev.value.participantIdentity!); - const dataPacket = ev.value.value; - switch (dataPacket.case) { - case 'user': - const buffer = FfiClient.instance.copyBuffer( - dataPacket.value.data!.data!.dataPtr!, - Number(dataPacket.value.data!.data!.dataLen), - ); - new FfiHandle(dataPacket.value.data!.handle!.id!).dispose(); - this.emit( - RoomEvent.DataReceived, - buffer, - participant, - ev.value.kind, - dataPacket.value.topic, - ); - break; - case 'sipDtmf': - const { code, digit } = dataPacket.value; - this.emit(RoomEvent.DtmfReceived, code!, digit!, participant!); - break; - default: - break; - } - } else if (ev.case == 'e2eeStateChanged') { - if (ev.value.state == EncryptionState.INTERNAL_ERROR) { - // throw generic error until Rust SDK is updated to supply the error alongside INTERNAL_ERROR - this.emit(RoomEvent.EncryptionError, new Error('internal server error')); + this.emit(RoomEvent.ParticipantAttributesChanged, changedAttributes, participant); } - } else if (ev.case == 'connectionStateChanged') { - this.connectionState = ev.value.state!; - this.emit(RoomEvent.ConnectionStateChanged, this.connectionState); - /*} else if (ev.case == 'connected') { - this.emit(RoomEvent.Connected);*/ - } else if (ev.case == 'disconnected') { - this.emit(RoomEvent.Disconnected, ev.value.reason!); - } else if (ev.case == 'reconnecting') { - this.emit(RoomEvent.Reconnecting); - } else if (ev.case == 'reconnected') { - this.emit(RoomEvent.Reconnected); - } else if (ev.case == 'roomSidChanged') { - this.emit(RoomEvent.RoomSidChanged, ev.value.sid!); - } else if (ev.case === 'streamHeaderReceived' && ev.value.header) { - this.handleStreamHeader(ev.value.header, ev.value.participantIdentity!); - } else if (ev.case === 'streamChunkReceived' && ev.value.chunk) { - this.handleStreamChunk(ev.value.chunk); - } else if (ev.case === 'streamTrailerReceived' && ev.value.trailer) { - this.handleStreamTrailer(ev.value.trailer); - } else if (ev.case === 'roomUpdated') { - this.info = ev.value; - this.emit(RoomEvent.RoomUpdated); - } else if (ev.case === 'moved') { - this.info = ev.value; - this.emit(RoomEvent.Moved); - } else if (ev.case === 'participantsUpdated') { - for (const info of ev.value.participants) { - const participant = this.retrieveParticipantByIdentity(info.identity!); - if (participant) { - participant.info = info; - } - } - } else if (ev.case === 'participantEncryptionStatusChanged') { - try { - const participant = this.requireParticipantByIdentity(ev.value.participantIdentity!); + } catch (e: any) { + console.warn(`RoomEvent.ParticipantAttributesChanged: ${e.message}`); + } + } else if (ev.case == 'connectionQualityChanged') { + try { + const participant = this.requireParticipantByIdentity(ev.value.participantIdentity!); + this.emit(RoomEvent.ConnectionQualityChanged, ev.value.quality!, participant); + } catch (e: any) { + console.warn(`RoomEvent.ConnectionQualityChanged: ${e.message}`); + } + } else if (ev.case == 'chatMessage') { + const participant = this.retrieveParticipantByIdentity(ev.value.participantIdentity!); + const { id, message: messageText, timestamp, editTimestamp, generated } = ev.value.message!; + const message: ChatMessage = { + id: id!, + message: messageText!, + timestamp: Number(timestamp), + editTimestamp: Number(editTimestamp), + generated, + }; + this.emit(RoomEvent.ChatMessage, message, participant); + } else if (ev.case == 'dataPacketReceived') { + // Can be undefined if the data is sent from a Server SDK + const participant = this.remoteParticipants.get(ev.value.participantIdentity!); + const dataPacket = ev.value.value; + switch (dataPacket.case) { + case 'user': + const buffer = FfiClient.instance.copyBuffer( + dataPacket.value.data!.data!.dataPtr!, + Number(dataPacket.value.data!.data!.dataLen), + ); + new FfiHandle(dataPacket.value.data!.handle!.id!).dispose(); this.emit( - RoomEvent.ParticipantEncryptionStatusChanged, - !!ev.value.isEncrypted, + RoomEvent.DataReceived, + buffer, participant, + ev.value.kind, + dataPacket.value.topic, ); - } catch (e: any) { - console.warn(`RoomEvent.ParticipantEncryptionStatusChanged: ${e.message}`); + break; + case 'sipDtmf': + const { code, digit } = dataPacket.value; + this.emit(RoomEvent.DtmfReceived, code!, digit!, participant!); + break; + default: + break; + } + } else if (ev.case == 'e2eeStateChanged') { + if (ev.value.state == EncryptionState.INTERNAL_ERROR) { + // throw generic error until Rust SDK is updated to supply the error alongside INTERNAL_ERROR + this.emit(RoomEvent.EncryptionError, new Error('internal server error')); + } + } else if (ev.case == 'connectionStateChanged') { + this.connectionState = ev.value.state!; + this.emit(RoomEvent.ConnectionStateChanged, this.connectionState); + /*} else if (ev.case == 'connected') { + this.emit(RoomEvent.Connected);*/ + } else if (ev.case == 'disconnected') { + this.emit(RoomEvent.Disconnected, ev.value.reason!); + } else if (ev.case == 'reconnecting') { + this.emit(RoomEvent.Reconnecting); + } else if (ev.case == 'reconnected') { + this.emit(RoomEvent.Reconnected); + } else if (ev.case == 'roomSidChanged') { + this.emit(RoomEvent.RoomSidChanged, ev.value.sid!); + } else if (ev.case === 'streamHeaderReceived' && ev.value.header) { + this.handleStreamHeader(ev.value.header, ev.value.participantIdentity!); + } else if (ev.case === 'streamChunkReceived' && ev.value.chunk) { + this.handleStreamChunk(ev.value.chunk); + } else if (ev.case === 'streamTrailerReceived' && ev.value.trailer) { + this.handleStreamTrailer(ev.value.trailer); + } else if (ev.case === 'roomUpdated') { + this.info = ev.value; + this.emit(RoomEvent.RoomUpdated); + } else if (ev.case === 'moved') { + this.info = ev.value; + this.emit(RoomEvent.Moved); + } else if (ev.case === 'participantsUpdated') { + for (const info of ev.value.participants) { + const participant = this.retrieveParticipantByIdentity(info.identity!); + if (participant) { + participant.info = info; } } - } finally { - unlock(); + } else if (ev.case === 'participantEncryptionStatusChanged') { + try { + const participant = this.requireParticipantByIdentity(ev.value.participantIdentity!); + this.emit( + RoomEvent.ParticipantEncryptionStatusChanged, + !!ev.value.isEncrypted, + participant, + ); + } catch (e: any) { + console.warn(`RoomEvent.ParticipantEncryptionStatusChanged: ${e.message}`); + } } };