Skip to content

Operator called default onErrorDropped on rsocket dispose #1018

@linux-china

Description

@linux-china

After introduce RSocket Java SDK from 1.1.0 to 1.1.1, a simple server will throw exception after client disconnected.

public class RSocketResponderApp {
    public static void main(String[] args) throws Exception {
        CloseableChannel closeableChannel = RSocketServer.create()
                .acceptor((setup, sendingSocket) -> {
                    return Mono.just(new RSocket() {
                        @Override
                        public Mono<Payload> requestResponse(Payload payload) {
                            return Mono.just(payload);
                        }
                    });
                })
                .bind(TcpServerTransport.create("0.0.0.0", 42252))
                .block();
        System.out.println("RSocket responder is listening on 42252!");
        closeableChannel.onClose().block();
    }
}

Exception on server side by rsc --request --route=xxxx --data=Foo --debug tcp://localhost:42252

15:42:58.383 [ERROR] r.c.p.Operators - Operator called default onErrorDropped
java.util.concurrent.CancellationException: Disposed
	at io.rsocket.internal.UnboundedProcessor.dispose(UnboundedProcessor.java:550)
	at io.rsocket.transport.netty.TcpDuplexConnection.doOnClose(TcpDuplexConnection.java:67)
	at io.rsocket.internal.BaseDuplexConnection.lambda$new$0(BaseDuplexConnection.java:30)
	at reactor.core.publisher.FluxDoFinally$DoFinallySubscriber.runFinally(FluxDoFinally.java:163)
	at reactor.core.publisher.FluxDoFinally$DoFinallySubscriber.onComplete(FluxDoFinally.java:146)
	at reactor.core.publisher.SinkEmptyMulticast$VoidInner.onComplete(SinkEmptyMulticast.java:227)
	at reactor.core.publisher.SinkEmptyMulticast.tryEmitEmpty(SinkEmptyMulticast.java:70)
	at reactor.core.publisher.SinkEmptySerialized.tryEmitEmpty(SinkEmptySerialized.java:43)
	at io.rsocket.internal.BaseDuplexConnection.dispose(BaseDuplexConnection.java:51)
	at io.rsocket.transport.netty.TcpDuplexConnection.lambda$new$0(TcpDuplexConnection.java:49)
	at io.netty.util.concurrent.DefaultPromise.notifyListener0(DefaultPromise.java:578)
	at io.netty.util.concurrent.DefaultPromise.notifyListeners0(DefaultPromise.java:571)
	at io.netty.util.concurrent.DefaultPromise.notifyListenersNow(DefaultPromise.java:550)
	at io.netty.util.concurrent.DefaultPromise.notifyListeners(DefaultPromise.java:491)
	at io.netty.util.concurrent.DefaultPromise.setValue0(DefaultPromise.java:616)
	at io.netty.util.concurrent.DefaultPromise.setSuccess0(DefaultPromise.java:605)
	at io.netty.util.concurrent.DefaultPromise.trySuccess(DefaultPromise.java:104)
	at io.netty.channel.DefaultChannelPromise.trySuccess(DefaultChannelPromise.java:84)
	at io.netty.channel.AbstractChannel$CloseFuture.setClosed(AbstractChannel.java:1186)
	at io.netty.channel.AbstractChannel$AbstractUnsafe.doClose0(AbstractChannel.java:773)
	at io.netty.channel.AbstractChannel$AbstractUnsafe.close(AbstractChannel.java:749)
	at io.netty.channel.AbstractChannel$AbstractUnsafe.close(AbstractChannel.java:620)
	at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.closeOnRead(AbstractNioByteChannel.java:105)
	at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:174)
	at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:719)
	at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:655)
	at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:581)
	at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:493)
	at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989)
	at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
	at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
	at java.lang.Thread.run(Thread.java:748)
Wrapped by: reactor.core.Exceptions$ErrorCallbackNotImplemented: java.util.concurrent.CancellationException: Disposed

On the requester's side, after rsocket.dispose() and above exception will be thrown.

Metadata

Metadata

Assignees

No one assigned

    Labels

    No labels
    No labels

    Type

    No type

    Projects

    No projects

    Milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions