From d209098755aa77fff065afaf2c8ebca93ab0335a Mon Sep 17 00:00:00 2001 From: coderzc Date: Sat, 7 Feb 2026 18:46:56 +0800 Subject: [PATCH 1/3] [fix][broker] Fix incomplete futures in topic property update/delete methods Two methods in PersistentTopicsBase throw RestException inside thenAccept callbacks, leaving the returned CompletableFuture incomplete when the topic doesn't exist. This causes callers to hang indefinitely. Changes: - Replace throw new RestException with future.completeExceptionally( new WebApplicationException(...)) in both methods - Add explicit return after exceptional completion to prevent further execution - Add .exceptionally() handler to ensure future completion on any unexpected errors Affected methods: - internalUpdateNonPartitionedTopicProperties - internalRemoveNonPartitionedTopicProperties --- .../admin/impl/PersistentTopicsBase.java | 18 ++++++++++++++---- 1 file changed, 14 insertions(+), 4 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java index 865212f48e59c..a59326b333c81 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java @@ -660,8 +660,10 @@ private CompletableFuture internalUpdateNonPartitionedTopicProperties(Map< pulsar().getBrokerService().getTopicIfExists(topicName.toString()) .thenAccept(opt -> { if (!opt.isPresent()) { - throw new RestException(Status.NOT_FOUND, - getTopicNotFoundErrorMessage(topicName.toString())); + future.completeExceptionally( + new WebApplicationException(getTopicNotFoundErrorMessage(topicName.toString()), + Status.NOT_FOUND)); + return; } ManagedLedger managedLedger = ((PersistentTopic) opt.get()).getManagedLedger(); managedLedger.asyncSetProperties(properties, new AsyncCallbacks.UpdatePropertiesCallback() { @@ -681,6 +683,9 @@ public void updatePropertiesFailed(ManagedLedgerException exception, Object ctx) future.completeExceptionally(exception); } }, null); + }).exceptionally(ex -> { + future.completeExceptionally(ex); + return null; }); return future; } @@ -717,8 +722,10 @@ private CompletableFuture internalRemoveNonPartitionedTopicProperties(Stri pulsar().getBrokerService().getTopicIfExists(topicName.toString()) .thenAccept(opt -> { if (!opt.isPresent()) { - throw new RestException(Status.NOT_FOUND, - getTopicNotFoundErrorMessage(topicName.toString())); + future.completeExceptionally( + new WebApplicationException(getTopicNotFoundErrorMessage(topicName.toString()), + Status.NOT_FOUND)); + return; } ManagedLedger managedLedger = ((PersistentTopic) opt.get()).getManagedLedger(); managedLedger.asyncDeleteProperty(key, new AsyncCallbacks.UpdatePropertiesCallback() { @@ -733,6 +740,9 @@ public void updatePropertiesFailed(ManagedLedgerException exception, Object ctx) future.completeExceptionally(exception); } }, null); + }).exceptionally(ex -> { + future.completeExceptionally(ex); + return null; }); return future; } From 37aa482e13f388d5827858053703758924dd68a0 Mon Sep 17 00:00:00 2001 From: coderzc Date: Sat, 7 Feb 2026 19:00:51 +0800 Subject: [PATCH 2/3] [test][broker] Add test for topic property operations on non-existent topic Add test to verify that updateProperties and removeProperties operations on non-existent topics correctly return 404 Not Found instead of hanging. This test validates the fix for incomplete futures in topic property update/delete methods. --- .../pulsar/broker/admin/AdminApi2Test.java | 27 +++++++++++++++++++ 1 file changed, 27 insertions(+) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApi2Test.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApi2Test.java index eeac3e1c5e24d..a83a700916420 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApi2Test.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApi2Test.java @@ -1412,6 +1412,33 @@ public void testUpdateNonPartitionedTopicProperties() throws Exception { Assert.assertEquals(properties.get("key2"), "value2"); } + @Test + public void testUpdatePropertiesOnNonExistentTopic() throws Exception { + final String namespace = newUniqueName(defaultTenant + "/ns2"); + final String topicName = "persistent://" + namespace + "/testUpdatePropertiesOnNonExistentTopic"; + admin.namespaces().createNamespace(namespace, 20); + + // Test updateProperties on non-existent topic should return 404 Not Found + Map topicProperties = new HashMap<>(); + topicProperties.put("key1", "value1"); + try { + admin.topics().updateProperties(topicName, topicProperties); + Assert.fail("Should have thrown an exception for non-existent topic"); + } catch (PulsarAdminException.NotFoundException e) { + // Expected + assertTrue(e.getMessage().contains("Topic not found") || e.getMessage().contains("not found")); + } + + // Test removeProperties on non-existent topic should return 404 Not Found + try { + admin.topics().removeProperties(topicName, "key1"); + Assert.fail("Should have thrown an exception for non-existent topic"); + } catch (PulsarAdminException.NotFoundException e) { + // Expected + assertTrue(e.getMessage().contains("Topic not found") || e.getMessage().contains("not found")); + } + } + @Test public void testNonPersistentTopics() throws Exception { final String namespace = newUniqueName(defaultTenant + "/ns2"); From d18091e489db3423ec6f8f326b4cd0fe246df109 Mon Sep 17 00:00:00 2001 From: coderzc Date: Mon, 9 Feb 2026 01:51:31 +0800 Subject: [PATCH 3/3] fix test --- .../pulsar/broker/admin/AdminApi2Test.java | 16 +++++++++------- 1 file changed, 9 insertions(+), 7 deletions(-) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApi2Test.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApi2Test.java index a83a700916420..b1c11e208d477 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApi2Test.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApi2Test.java @@ -1423,19 +1423,21 @@ public void testUpdatePropertiesOnNonExistentTopic() throws Exception { topicProperties.put("key1", "value1"); try { admin.topics().updateProperties(topicName, topicProperties); - Assert.fail("Should have thrown an exception for non-existent topic"); - } catch (PulsarAdminException.NotFoundException e) { - // Expected - assertTrue(e.getMessage().contains("Topic not found") || e.getMessage().contains("not found")); + fail("Should have thrown an exception for non-existent topic"); + } catch (Exception e) { + Assert.expectThrows(PulsarAdminException.NotFoundException.class, () -> { + throw e; + }); } // Test removeProperties on non-existent topic should return 404 Not Found try { admin.topics().removeProperties(topicName, "key1"); - Assert.fail("Should have thrown an exception for non-existent topic"); + fail("Should have thrown an exception for non-existent topic"); } catch (PulsarAdminException.NotFoundException e) { - // Expected - assertTrue(e.getMessage().contains("Topic not found") || e.getMessage().contains("not found")); + Assert.expectThrows(PulsarAdminException.NotFoundException.class, () -> { + throw e; + }); } }