Skip to content

StreamManager.subscribeInbound silently overwrites an existing subscriber for the same stream id #62

@nficano

Description

@nficano

StreamManager.subscribeInbound(streamId:) at Sources/ARCP/Runtime/StreamManager.swift:68 constructs a new AsyncStream continuation and writes it to inboundContinuations[streamId]. If a second caller subscribes to the same streamId while a first subscription is still active, the dictionary insertion replaces the prior continuation without finishing it or warning the caller. The first stream is then orphaned: it will never receive new chunks because dispatch(envelope:) looks up only the latest continuation, and it will never finish because the close/error branches only call finish() on whatever is currently stored. Consumers awaiting the original stream hang indefinitely, and producers have no way to discover that two callers raced for the same stream id.

Fix prompt: Decide whether a stream id is intended to have a single inbound subscriber or many. If it is single-subscriber, refuse the second subscribeInbound call by throwing ARCPError.failedPrecondition (or by returning an already-finished stream) and document the constraint on the public method. If multiple subscribers should be allowed, switch the storage to inboundContinuations: [StreamId: [AsyncStream<StreamChunkPayload>.Continuation]] and fan out chunks and finish signals to every entry. Either way, finish the old continuation before replacing it. Add tests that subscribe twice to the same stream id and assert the documented behavior, including that close/error notifications reach every active subscriber.

Metadata

Metadata

Assignees

No one assigned

    Labels

    bugSomething isn't workingseverity:lowLow severity

    Type

    No type
    No fields configured for issues without a type.

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions