Skip to content

Incorrect error handling in ConsumerBase.receiveAsync  #249

@jai1

Description

@jai1

Expected behavior

If the listener is unavailable or state is not ready the function should return exception.

Actual behavior

It doesn't, as seen below the return statement is missing before FutureUtil


    @Override
    public CompletableFuture<Message> receiveAsync() {

        if (listener != null) {
            FutureUtil.failedFuture(new PulsarClientException.InvalidConfigurationException(
                    "Cannot use receive() when a listener has been set"));
        }

        switch (getState()) {
        case Ready:
        case Connecting:
            break; // Ok
        case Closing:
        case Closed:
            FutureUtil.failedFuture(new PulsarClientException.AlreadyClosedException("Consumer already closed"));
        case Failed:
        case Uninitialized:
            FutureUtil.failedFuture(new PulsarClientException.NotConnectedException());
        }

        return internalReceiveAsync();
    }

Steps to reproduce

Code Inspection

System configuration

Pulsar version: x.y

Metadata

Metadata

Assignees

No one assigned

    Labels

    type/bugThe PR fixed a bug or issue reported a bug

    Type

    No type

    Projects

    No projects

    Milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions