From 0c1b029d0b7d3fac81ac18ceb03dc357ed4ccb92 Mon Sep 17 00:00:00 2001 From: Jiwe Guo Date: Thu, 8 Sep 2022 22:11:10 +0800 Subject: [PATCH] Move into future stage to catch the exception --- .../SystemTopicBasedTopicPoliciesService.java | 37 ++++++++++--------- 1 file changed, 20 insertions(+), 17 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java index ed69428386adb..93f97bbce07c3 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java @@ -380,23 +380,26 @@ private void cleanCacheAndCloseReader(NamespaceName namespace, boolean cleanOwne } private void readMorePolicies(SystemTopicClient.Reader reader) { - reader.readNextAsync().whenComplete((msg, ex) -> { - if (ex == null) { - refreshTopicPoliciesCache(msg); - notifyListener(msg); - readMorePolicies(reader); - } else { - Throwable cause = FutureUtil.unwrapCompletionException(ex); - if (cause instanceof PulsarClientException.AlreadyClosedException) { - log.error("Read more topic policies exception, close the read now!", ex); - cleanCacheAndCloseReader( - reader.getSystemTopic().getTopicName().getNamespaceObject(), false); - } else { - log.warn("Read more topic polices exception, read again.", ex); - readMorePolicies(reader); - } - } - }); + reader.readNextAsync() + .thenAccept(msg -> { + refreshTopicPoliciesCache(msg); + notifyListener(msg); + }) + .whenComplete((__, ex) -> { + if (ex == null) { + readMorePolicies(reader); + } else { + Throwable cause = FutureUtil.unwrapCompletionException(ex); + if (cause instanceof PulsarClientException.AlreadyClosedException) { + log.error("Read more topic policies exception, close the read now!", ex); + cleanCacheAndCloseReader( + reader.getSystemTopic().getTopicName().getNamespaceObject(), false); + } else { + log.warn("Read more topic polices exception, read again.", ex); + readMorePolicies(reader); + } + } + }); } private void refreshTopicPoliciesCache(Message msg) {