diff --git a/pulsar-websocket/src/main/java/com/yahoo/pulsar/websocket/AbstractWebSocketHandler.java b/pulsar-websocket/src/main/java/com/yahoo/pulsar/websocket/AbstractWebSocketHandler.java index 20e257c921412..feacd1ec4bff8 100644 --- a/pulsar-websocket/src/main/java/com/yahoo/pulsar/websocket/AbstractWebSocketHandler.java +++ b/pulsar-websocket/src/main/java/com/yahoo/pulsar/websocket/AbstractWebSocketHandler.java @@ -63,11 +63,11 @@ public void onWebSocketConnect(Session session) { if (service.isAuthenticationEnabled()) { try { authRole = service.getAuthenticationService().authenticateHttpRequest(request); - log.info("[{}] Authenticated WebSocket producer {} on topic {}", session.getRemoteAddress(), authRole, + log.info("[{}] Authenticated WebSocket client {} on topic {}", session.getRemoteAddress(), authRole, topic); } catch (AuthenticationException e) { - log.warn("[{}] Failed to authenticated WebSocket producer {} on topic {}: {}", + log.warn("[{}] Failed to authenticated WebSocket client {} on topic {}: {}", session.getRemoteAddress(), authRole, topic, e.getMessage()); close(WebSocketError.AuthenticationError); return; @@ -75,16 +75,21 @@ public void onWebSocketConnect(Session session) { } if (service.isAuthorizationEnabled()) { - final String role = authRole; - isAuthorized(authRole).thenApply(isAuthorized -> { - if(!isAuthorized) { - log.warn("[{}] WebSocket Client [{}] is not authorized on topic {}", session.getRemoteAddress(), role, + try { + if (!isAuthorized(authRole)) { + log.warn("[{}] WebSocket Client [{}] is not authorized on topic {}", session.getRemoteAddress(), authRole, topic); close(WebSocketError.NotAuthorizedError); + return; } - return null; - }); + } catch (Exception e) { + log.warn("[{}] Got an exception when authorizing WebSocket client {} on topic {} on: {}", + session.getRemoteAddress(), authRole, topic, e.getMessage()); + close(WebSocketError.UnknownError); + return; + } } + createClient(session); } @Override @@ -125,8 +130,6 @@ protected String checkAuthentication() { return null; } - protected abstract CompletableFuture isAuthorized(String authRole); - private String extractTopicName(HttpServletRequest request) { String uri = request.getRequestURI(); List parts = Splitter.on("/").splitToList(uri); @@ -143,5 +146,9 @@ private String extractTopicName(HttpServletRequest request) { return dn.toString(); } + protected abstract Boolean isAuthorized(String authRole) throws Exception; + + protected abstract void createClient(Session session); + private static final Logger log = LoggerFactory.getLogger(AbstractWebSocketHandler.class); } diff --git a/pulsar-websocket/src/main/java/com/yahoo/pulsar/websocket/ConsumerHandler.java b/pulsar-websocket/src/main/java/com/yahoo/pulsar/websocket/ConsumerHandler.java index 7f489f613f421..785dcc75108f5 100644 --- a/pulsar-websocket/src/main/java/com/yahoo/pulsar/websocket/ConsumerHandler.java +++ b/pulsar-websocket/src/main/java/com/yahoo/pulsar/websocket/ConsumerHandler.java @@ -87,9 +87,7 @@ public ConsumerHandler(WebSocketService service, HttpServletRequest request) { } @Override - public void onWebSocketConnect(Session session) { - super.onWebSocketConnect(session); - + protected void createClient(Session session) { try { this.consumer = service.getPulsarClient().subscribe(topic, subscription, conf); this.service.addConsumer(this); @@ -247,8 +245,8 @@ private ConsumerConfiguration getConsumerConfiguration() { } @Override - protected CompletableFuture isAuthorized(String authRole) { - return service.getAuthorizationManager().canConsumeAsync(DestinationName.get(topic), authRole); + protected Boolean isAuthorized(String authRole) throws Exception { + return service.getAuthorizationManager().canConsume(DestinationName.get(topic), authRole); } private static String extractSubscription(HttpServletRequest request) { diff --git a/pulsar-websocket/src/main/java/com/yahoo/pulsar/websocket/ProducerHandler.java b/pulsar-websocket/src/main/java/com/yahoo/pulsar/websocket/ProducerHandler.java index 6ede4e2d0e45e..1053eb045f1e5 100644 --- a/pulsar-websocket/src/main/java/com/yahoo/pulsar/websocket/ProducerHandler.java +++ b/pulsar-websocket/src/main/java/com/yahoo/pulsar/websocket/ProducerHandler.java @@ -88,14 +88,14 @@ public void close() throws IOException { } @Override - public void onWebSocketConnect(Session session) { - super.onWebSocketConnect(session); - + protected void createClient(Session session) { try { ProducerConfiguration conf = getProducerConfiguration(); this.producer = service.getPulsarClient().createProducer(topic, conf); this.service.addProducer(this); } catch (Exception e) { + log.warn("[{}] Failed in creating producer on topic {}", session.getRemoteAddress(), + topic, e); close(FailedToCreateProducer, e.getMessage()); } } @@ -176,8 +176,9 @@ public long getMsgPublishedCounter() { return MSG_PUBLISHED_COUNTER_UPDATER.get(this); } - protected CompletableFuture isAuthorized(String authRole) { - return service.getAuthorizationManager().canProduceAsync(DestinationName.get(topic), authRole); + @Override + protected Boolean isAuthorized(String authRole) throws Exception { + return service.getAuthorizationManager().canProduce(DestinationName.get(topic), authRole); } private void sendAckResponse(ProducerAck response) {