Skip to content

Interruptible Reader.readNext()? #152

@rkaw92

Description

@rkaw92

Hi,
I have a use case where I expose a Pulsar topic over HTTP via Server-Sent Events. Basically, when a client connects over HTTP, I do this:

    const reader = await client.createReader({
        topic: request.params.topic,
        startMessageId: request.headers['last-event-id'] ?
                Pulsar.MessageId.deserialize(Buffer.from(request.headers['last-event-id'], 'base64')) :
                Pulsar.MessageId.earliest()
    });

Then I use a loop that reads messages as they come and sends them to the client:

    while (!clientGoneAway) {
        let message;
        message = await reader.readNext();
        reply.raw.write(formatServerSentEvent(
            message.getMessageId().serialize().toString('base64'),
            message.getData().toString('utf-8')
        ));
    }

Additionally, the server detects the close event on the HTTP requests, closes the reader and prevents further iteration:

   request.raw.once('close', async function() {
        clientGoneAway = true;
        await reader.close();
        reply.raw.end();
    })

(request.raw and reply.raw are Node.js req and res objects, respectively - they're just wrapped like this in Fastify.js)

Now, my problem is that even if I call reader.close(), the reader.readNext(); never resolves nor rejects. It's not just a Promise problem - it seems like it's keeping a thread busy, because then all other operations hang: things like fs.createReadStream, as well as creating new readers, hang forever until I completely restart the Node.js process.

I know I can use a timeout with reader.readNext(timeoutMS), but this has 2 major disadvantages:

  • It turns the reader into a kind of poller
  • It still does not vacate the thread - so it's possible to trivially saturate the thread pool by creating more readers than the pool size within the timeout period (so for example 4 readers in 1 second, when using a timeout of 1000 ms)

Is there any way to have the reader immediately abort all reads when closed?

Metadata

Metadata

Assignees

No one assigned

    Labels

    No labels
    No labels

    Type

    No type

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions