From 425bc491071717c6a5680643de89bb131d6beb21 Mon Sep 17 00:00:00 2001 From: Heesung Sohn Date: Thu, 9 Jun 2022 18:06:31 -0700 Subject: [PATCH 1/2] [improve][tests] improved flaky test runs - improved PulsarFunctionTlsTests by reordering tearDown() logic - improved ManagedLedgerFactoryImpl.shutdown() by closing cacheEviction threads early - improved TestPulsarConnector memory consumption by removing unnecessary spy() - improved PulsarFunctionsTest run by using receive() instead of receive(30, TimeUnit.SECONDS); --- .../bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java | 6 +++--- .../pulsar/functions/worker/PulsarFunctionTlsTest.java | 7 ++++--- .../java/org/apache/pulsar/io/PulsarFunctionTlsTest.java | 4 ++-- .../org/apache/pulsar/sql/presto/TestPulsarConnector.java | 4 ++-- .../tests/integration/functions/PulsarFunctionsTest.java | 2 +- 5 files changed, 12 insertions(+), 11 deletions(-) diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java index 48e85423e8486..1a12f9da49606 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java @@ -262,7 +262,7 @@ private void cacheEvictionTask() { double evictionFrequency = Math.max(Math.min(config.getCacheEvictionFrequency(), 1000.0), 0.001); long waitTimeMillis = (long) (1000 / evictionFrequency); - while (true) { + while (!closed) { try { doCacheEviction(); @@ -514,6 +514,7 @@ public CompletableFuture shutdownAsync() throws ManagedLedgerException { statsTask.cancel(true); flushCursorsTask.cancel(true); + cacheEvictionExecutor.shutdownNow(); List ledgerNames = new ArrayList<>(this.ledgers.keySet()); List> futures = new ArrayList<>(ledgerNames.size()); @@ -594,7 +595,6 @@ public void closeFailed(ManagedLedgerException exception, Object ctx) { })); } })); - cacheEvictionExecutor.shutdownNow(); entryCacheManager.clear(); return FutureUtil.waitForAll(futures); } @@ -608,6 +608,7 @@ public void shutdown() throws InterruptedException, ManagedLedgerException { statsTask.cancel(true); flushCursorsTask.cancel(true); + cacheEvictionExecutor.shutdownNow(); // take a snapshot of ledgers currently in the map to prevent race conditions List> ledgers = new ArrayList<>(this.ledgers.values()); @@ -651,7 +652,6 @@ public void closeFailed(ManagedLedgerException exception, Object ctx) { } scheduledExecutor.shutdownNow(); - cacheEvictionExecutor.shutdownNow(); entryCacheManager.clear(); } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionTlsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionTlsTest.java index fcba675a4b7d4..227088019f029 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionTlsTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionTlsTest.java @@ -178,12 +178,13 @@ void setup() throws Exception { void tearDown() throws Exception { try { for (int i = 0; i < BROKER_COUNT; i++) { - if (pulsarServices[i] != null) { - pulsarServices[i].close(); - } if (pulsarAdmins[i] != null) { pulsarAdmins[i].close(); } + if (pulsarServices[i] != null) { + pulsarServices[i].close(); + } + } bkEnsemble.stop(); } finally { diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionTlsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionTlsTest.java index 61f68938f7427..2cb0e62f8e3c2 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionTlsTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionTlsTest.java @@ -174,9 +174,9 @@ void shutdown() throws Exception { log.info("--- Shutting down ---"); try { functionAdmin.close(); - bkEnsemble.stop(); - workerServer.stop(); functionsWorkerService.stop(); + workerServer.stop(); + bkEnsemble.stop(); } finally { if (tempDirectory != null) { tempDirectory.delete(); diff --git a/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/TestPulsarConnector.java b/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/TestPulsarConnector.java index fc13647e905a8..a03eb3f5a7231 100644 --- a/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/TestPulsarConnector.java +++ b/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/TestPulsarConnector.java @@ -707,10 +707,10 @@ public Long answer(InvocationOnMock invocationOnMock) throws Throwable { when(PulsarConnectorCache.instance.getManagedLedgerFactory()).thenReturn(managedLedgerFactory); for (Map.Entry split : splits.entrySet()) { - PulsarRecordCursor pulsarRecordCursor = spy(new PulsarRecordCursor( + PulsarRecordCursor pulsarRecordCursor = new PulsarRecordCursor( topicsToColumnHandles.get(split.getKey()), split.getValue(), pulsarConnectorConfig, managedLedgerFactory, new ManagedLedgerConfig(), - new PulsarConnectorMetricsTracker(new NullStatsProvider()),dispatchingRowDecoderFactory)); + new PulsarConnectorMetricsTracker(new NullStatsProvider()),dispatchingRowDecoderFactory); this.pulsarRecordCursors.put(split.getKey(), pulsarRecordCursor); } } diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTest.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTest.java index 9a6a26be3da4d..7a5c75b057ea1 100644 --- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTest.java +++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTest.java @@ -1584,7 +1584,7 @@ private void publishAndConsumeMessages(String inputTopic, } for (int i = 0; i < numMessages; i++) { - Message msg = consumer.receive(30, TimeUnit.SECONDS); + Message msg = consumer.receive(); String logMsg = new String(msg.getValue(), UTF_8); log.info("Received message: '{}'", logMsg); assertTrue(expectedMessages.contains(logMsg), "Message '" + logMsg + "' not expected"); From 6ebc0f6997425ab80330839ccaccc2eee2981d4f Mon Sep 17 00:00:00 2001 From: Heesung Sohn Date: Tue, 14 Jun 2022 10:06:39 -0700 Subject: [PATCH 2/2] Reverted PulsarFunctionsTest consumer.receive() change --- .../pulsar/tests/integration/functions/PulsarFunctionsTest.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTest.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTest.java index 7a5c75b057ea1..9a6a26be3da4d 100644 --- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTest.java +++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTest.java @@ -1584,7 +1584,7 @@ private void publishAndConsumeMessages(String inputTopic, } for (int i = 0; i < numMessages; i++) { - Message msg = consumer.receive(); + Message msg = consumer.receive(30, TimeUnit.SECONDS); String logMsg = new String(msg.getValue(), UTF_8); log.info("Received message: '{}'", logMsg); assertTrue(expectedMessages.contains(logMsg), "Message '" + logMsg + "' not expected");