From 68977b7a43772fcb75823539ea0c0375e8743bcf Mon Sep 17 00:00:00 2001 From: Steve Gury Date: Mon, 27 Jun 2016 15:25:51 -0700 Subject: [PATCH] Fix subscription for Websocket transport **Problem** The websocket DuplexConnection doesn't initialize the Publisher chain in every cases. **Solution** Move the initialization code `onSubscribe` up, and also rename the subscriber `subscriber` instead of `s` to avoid confusion with the `Subscription`. --- .../client/ClientWebSocketDuplexConnection.java | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/reactivesocket-transport-websocket/src/main/java/io/reactivesocket/transport/websocket/client/ClientWebSocketDuplexConnection.java b/reactivesocket-transport-websocket/src/main/java/io/reactivesocket/transport/websocket/client/ClientWebSocketDuplexConnection.java index a1831955a..e828c3472 100644 --- a/reactivesocket-transport-websocket/src/main/java/io/reactivesocket/transport/websocket/client/ClientWebSocketDuplexConnection.java +++ b/reactivesocket-transport-websocket/src/main/java/io/reactivesocket/transport/websocket/client/ClientWebSocketDuplexConnection.java @@ -62,7 +62,7 @@ public static Publisher create(InetSocketAddres } public static Publisher create(URI uri, EventLoopGroup eventLoopGroup) { - return s -> { + return subscriber -> { WebSocketClientHandshaker handshaker = WebSocketClientHandshakerFactory.newHandshaker( uri, WebSocketVersion.V13, null, false, new DefaultHttpHeaders()); @@ -86,21 +86,21 @@ protected void initChannel(SocketChannel ch) throws Exception { }).connect(uri.getHost(), uri.getPort()); connect.addListener(connectFuture -> { + subscriber.onSubscribe(EmptySubscription.INSTANCE); if (connectFuture.isSuccess()) { final Channel ch = connect.channel(); clientHandler .getHandshakePromise() .addListener(handshakeFuture -> { - s.onSubscribe(EmptySubscription.INSTANCE); if (handshakeFuture.isSuccess()) { - s.onNext(new ClientWebSocketDuplexConnection(ch, subjects)); - s.onComplete(); + subscriber.onNext(new ClientWebSocketDuplexConnection(ch, subjects)); + subscriber.onComplete(); } else { - s.onError(handshakeFuture.cause()); + subscriber.onError(handshakeFuture.cause()); } }); } else { - s.onError(connectFuture.cause()); + subscriber.onError(connectFuture.cause()); } }); };