From bbebf825da9d3e6231c36319be42b9bb1858c7fe Mon Sep 17 00:00:00 2001 From: Yuki Shiga Date: Mon, 5 Jun 2017 16:10:03 +0900 Subject: [PATCH 1/2] Made websocket proxy not make a consumer/producer when authorization is failed. --- .../websocket/AbstractWebSocketHandler.java | 16 +++++++++++----- .../yahoo/pulsar/websocket/ConsumerHandler.java | 4 +--- .../yahoo/pulsar/websocket/ProducerHandler.java | 7 ++++--- 3 files changed, 16 insertions(+), 11 deletions(-) diff --git a/pulsar-websocket/src/main/java/com/yahoo/pulsar/websocket/AbstractWebSocketHandler.java b/pulsar-websocket/src/main/java/com/yahoo/pulsar/websocket/AbstractWebSocketHandler.java index 20e257c921412..d0417d58875fa 100644 --- a/pulsar-websocket/src/main/java/com/yahoo/pulsar/websocket/AbstractWebSocketHandler.java +++ b/pulsar-websocket/src/main/java/com/yahoo/pulsar/websocket/AbstractWebSocketHandler.java @@ -63,11 +63,11 @@ public void onWebSocketConnect(Session session) { if (service.isAuthenticationEnabled()) { try { authRole = service.getAuthenticationService().authenticateHttpRequest(request); - log.info("[{}] Authenticated WebSocket producer {} on topic {}", session.getRemoteAddress(), authRole, + log.info("[{}] Authenticated WebSocket client {} on topic {}", session.getRemoteAddress(), authRole, topic); } catch (AuthenticationException e) { - log.warn("[{}] Failed to authenticated WebSocket producer {} on topic {}: {}", + log.warn("[{}] Failed to authenticated WebSocket client {} on topic {}: {}", session.getRemoteAddress(), authRole, topic, e.getMessage()); close(WebSocketError.AuthenticationError); return; @@ -77,14 +77,18 @@ public void onWebSocketConnect(Session session) { if (service.isAuthorizationEnabled()) { final String role = authRole; isAuthorized(authRole).thenApply(isAuthorized -> { - if(!isAuthorized) { + if(isAuthorized) { + createClient(session); + } else { log.warn("[{}] WebSocket Client [{}] is not authorized on topic {}", session.getRemoteAddress(), role, topic); close(WebSocketError.NotAuthorizedError); } return null; }); + return; } + createClient(session); } @Override @@ -125,8 +129,6 @@ protected String checkAuthentication() { return null; } - protected abstract CompletableFuture isAuthorized(String authRole); - private String extractTopicName(HttpServletRequest request) { String uri = request.getRequestURI(); List parts = Splitter.on("/").splitToList(uri); @@ -143,5 +145,9 @@ private String extractTopicName(HttpServletRequest request) { return dn.toString(); } + protected abstract CompletableFuture isAuthorized(String authRole); + + protected abstract void createClient(Session session); + private static final Logger log = LoggerFactory.getLogger(AbstractWebSocketHandler.class); } diff --git a/pulsar-websocket/src/main/java/com/yahoo/pulsar/websocket/ConsumerHandler.java b/pulsar-websocket/src/main/java/com/yahoo/pulsar/websocket/ConsumerHandler.java index 7f489f613f421..8fabf96aff834 100644 --- a/pulsar-websocket/src/main/java/com/yahoo/pulsar/websocket/ConsumerHandler.java +++ b/pulsar-websocket/src/main/java/com/yahoo/pulsar/websocket/ConsumerHandler.java @@ -87,9 +87,7 @@ public ConsumerHandler(WebSocketService service, HttpServletRequest request) { } @Override - public void onWebSocketConnect(Session session) { - super.onWebSocketConnect(session); - + protected void createClient(Session session) { try { this.consumer = service.getPulsarClient().subscribe(topic, subscription, conf); this.service.addConsumer(this); diff --git a/pulsar-websocket/src/main/java/com/yahoo/pulsar/websocket/ProducerHandler.java b/pulsar-websocket/src/main/java/com/yahoo/pulsar/websocket/ProducerHandler.java index 6ede4e2d0e45e..9cf93649214fe 100644 --- a/pulsar-websocket/src/main/java/com/yahoo/pulsar/websocket/ProducerHandler.java +++ b/pulsar-websocket/src/main/java/com/yahoo/pulsar/websocket/ProducerHandler.java @@ -88,14 +88,14 @@ public void close() throws IOException { } @Override - public void onWebSocketConnect(Session session) { - super.onWebSocketConnect(session); - + protected void createClient(Session session) { try { ProducerConfiguration conf = getProducerConfiguration(); this.producer = service.getPulsarClient().createProducer(topic, conf); this.service.addProducer(this); } catch (Exception e) { + log.warn("[{}] Failed in creating producer on topic {}", session.getRemoteAddress(), + topic, e); close(FailedToCreateProducer, e.getMessage()); } } @@ -176,6 +176,7 @@ public long getMsgPublishedCounter() { return MSG_PUBLISHED_COUNTER_UPDATER.get(this); } + @Override protected CompletableFuture isAuthorized(String authRole) { return service.getAuthorizationManager().canProduceAsync(DestinationName.get(topic), authRole); } From 47766cddaa903045741a1e26600fb94c78636e89 Mon Sep 17 00:00:00 2001 From: Yuki Shiga Date: Fri, 9 Jun 2017 14:57:14 +0900 Subject: [PATCH 2/2] Made isAuthorized() in WebSocketHandler syncronized --- .../websocket/AbstractWebSocketHandler.java | 21 ++++++++++--------- .../pulsar/websocket/ConsumerHandler.java | 4 ++-- .../pulsar/websocket/ProducerHandler.java | 4 ++-- 3 files changed, 15 insertions(+), 14 deletions(-) diff --git a/pulsar-websocket/src/main/java/com/yahoo/pulsar/websocket/AbstractWebSocketHandler.java b/pulsar-websocket/src/main/java/com/yahoo/pulsar/websocket/AbstractWebSocketHandler.java index d0417d58875fa..feacd1ec4bff8 100644 --- a/pulsar-websocket/src/main/java/com/yahoo/pulsar/websocket/AbstractWebSocketHandler.java +++ b/pulsar-websocket/src/main/java/com/yahoo/pulsar/websocket/AbstractWebSocketHandler.java @@ -75,18 +75,19 @@ public void onWebSocketConnect(Session session) { } if (service.isAuthorizationEnabled()) { - final String role = authRole; - isAuthorized(authRole).thenApply(isAuthorized -> { - if(isAuthorized) { - createClient(session); - } else { - log.warn("[{}] WebSocket Client [{}] is not authorized on topic {}", session.getRemoteAddress(), role, + try { + if (!isAuthorized(authRole)) { + log.warn("[{}] WebSocket Client [{}] is not authorized on topic {}", session.getRemoteAddress(), authRole, topic); close(WebSocketError.NotAuthorizedError); + return; } - return null; - }); - return; + } catch (Exception e) { + log.warn("[{}] Got an exception when authorizing WebSocket client {} on topic {} on: {}", + session.getRemoteAddress(), authRole, topic, e.getMessage()); + close(WebSocketError.UnknownError); + return; + } } createClient(session); } @@ -145,7 +146,7 @@ private String extractTopicName(HttpServletRequest request) { return dn.toString(); } - protected abstract CompletableFuture isAuthorized(String authRole); + protected abstract Boolean isAuthorized(String authRole) throws Exception; protected abstract void createClient(Session session); diff --git a/pulsar-websocket/src/main/java/com/yahoo/pulsar/websocket/ConsumerHandler.java b/pulsar-websocket/src/main/java/com/yahoo/pulsar/websocket/ConsumerHandler.java index 8fabf96aff834..785dcc75108f5 100644 --- a/pulsar-websocket/src/main/java/com/yahoo/pulsar/websocket/ConsumerHandler.java +++ b/pulsar-websocket/src/main/java/com/yahoo/pulsar/websocket/ConsumerHandler.java @@ -245,8 +245,8 @@ private ConsumerConfiguration getConsumerConfiguration() { } @Override - protected CompletableFuture isAuthorized(String authRole) { - return service.getAuthorizationManager().canConsumeAsync(DestinationName.get(topic), authRole); + protected Boolean isAuthorized(String authRole) throws Exception { + return service.getAuthorizationManager().canConsume(DestinationName.get(topic), authRole); } private static String extractSubscription(HttpServletRequest request) { diff --git a/pulsar-websocket/src/main/java/com/yahoo/pulsar/websocket/ProducerHandler.java b/pulsar-websocket/src/main/java/com/yahoo/pulsar/websocket/ProducerHandler.java index 9cf93649214fe..1053eb045f1e5 100644 --- a/pulsar-websocket/src/main/java/com/yahoo/pulsar/websocket/ProducerHandler.java +++ b/pulsar-websocket/src/main/java/com/yahoo/pulsar/websocket/ProducerHandler.java @@ -177,8 +177,8 @@ public long getMsgPublishedCounter() { } @Override - protected CompletableFuture isAuthorized(String authRole) { - return service.getAuthorizationManager().canProduceAsync(DestinationName.get(topic), authRole); + protected Boolean isAuthorized(String authRole) throws Exception { + return service.getAuthorizationManager().canProduce(DestinationName.get(topic), authRole); } private void sendAckResponse(ProducerAck response) {