diff --git a/reactivesocket-transport-websocket/src/main/java/io/reactivesocket/transport/websocket/client/ClientWebSocketDuplexConnection.java b/reactivesocket-transport-websocket/src/main/java/io/reactivesocket/transport/websocket/client/ClientWebSocketDuplexConnection.java index a1831955a..e828c3472 100644 --- a/reactivesocket-transport-websocket/src/main/java/io/reactivesocket/transport/websocket/client/ClientWebSocketDuplexConnection.java +++ b/reactivesocket-transport-websocket/src/main/java/io/reactivesocket/transport/websocket/client/ClientWebSocketDuplexConnection.java @@ -62,7 +62,7 @@ public static Publisher create(InetSocketAddres } public static Publisher create(URI uri, EventLoopGroup eventLoopGroup) { - return s -> { + return subscriber -> { WebSocketClientHandshaker handshaker = WebSocketClientHandshakerFactory.newHandshaker( uri, WebSocketVersion.V13, null, false, new DefaultHttpHeaders()); @@ -86,21 +86,21 @@ protected void initChannel(SocketChannel ch) throws Exception { }).connect(uri.getHost(), uri.getPort()); connect.addListener(connectFuture -> { + subscriber.onSubscribe(EmptySubscription.INSTANCE); if (connectFuture.isSuccess()) { final Channel ch = connect.channel(); clientHandler .getHandshakePromise() .addListener(handshakeFuture -> { - s.onSubscribe(EmptySubscription.INSTANCE); if (handshakeFuture.isSuccess()) { - s.onNext(new ClientWebSocketDuplexConnection(ch, subjects)); - s.onComplete(); + subscriber.onNext(new ClientWebSocketDuplexConnection(ch, subjects)); + subscriber.onComplete(); } else { - s.onError(handshakeFuture.cause()); + subscriber.onError(handshakeFuture.cause()); } }); } else { - s.onError(connectFuture.cause()); + subscriber.onError(connectFuture.cause()); } }); };