From c4fa10c550b379e52a0a0d4d57b429c255d17715 Mon Sep 17 00:00:00 2001 From: Zixuan Liu Date: Mon, 7 Nov 2022 13:19:36 +0800 Subject: [PATCH] [fix][test] Fix websocket.proxy test Signed-off-by: Zixuan Liu --- .../proxy/ProxyAuthenticationTest.java | 19 +++--------- .../ProxyEncryptionPublishConsumeTest.java | 29 +++++-------------- .../proxy/ProxyPublishConsumeTest.java | 25 +++++----------- .../proxy/ProxyPublishConsumeTlsTest.java | 17 ++--------- .../ProxyPublishConsumeWithoutZKTest.java | 18 ++---------- .../proxy/v1/V1_ProxyAuthenticationTest.java | 20 +++---------- 6 files changed, 28 insertions(+), 100 deletions(-) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyAuthenticationTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyAuthenticationTest.java index 8e72688e458dc..b74f0d4ed6b8e 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyAuthenticationTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyAuthenticationTest.java @@ -18,7 +18,6 @@ */ package org.apache.pulsar.websocket.proxy; -import static java.util.concurrent.Executors.newFixedThreadPool; import static org.apache.pulsar.broker.BrokerTestUtil.spyWithClassAndConstructorArgs; import static org.mockito.ArgumentMatchers.anyInt; import static org.mockito.ArgumentMatchers.anyString; @@ -26,7 +25,6 @@ import com.google.common.collect.Sets; import java.net.URI; import java.util.Optional; -import java.util.concurrent.ExecutorService; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; import javax.ws.rs.client.Client; @@ -35,7 +33,6 @@ import javax.ws.rs.client.WebTarget; import javax.ws.rs.core.MediaType; import javax.ws.rs.core.Response; -import lombok.Cleanup; import org.apache.pulsar.client.api.ProducerConsumerBase; import org.apache.pulsar.metadata.impl.ZKMetadataStore; import org.apache.pulsar.websocket.WebSocketService; @@ -92,20 +89,12 @@ public void setup() throws Exception { @AfterMethod(alwaysRun = true) public void cleanup() throws Exception { - @Cleanup("shutdownNow") - ExecutorService executor = newFixedThreadPool(1); try { - executor.submit(() -> { - try { - consumeClient.stop(); - produceClient.stop(); - log.info("proxy clients are stopped successfully"); - } catch (Exception e) { - log.error(e.getMessage()); - } - }).get(2, TimeUnit.SECONDS); + consumeClient.stop(); + produceClient.stop(); + log.info("proxy clients are stopped successfully"); } catch (Exception e) { - log.error("failed to close clients ", e); + log.error(e.getMessage()); } super.internalCleanup(); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyEncryptionPublishConsumeTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyEncryptionPublishConsumeTest.java index fb5a04cb41277..e9d18f4d4014a 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyEncryptionPublishConsumeTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyEncryptionPublishConsumeTest.java @@ -18,24 +18,19 @@ */ package org.apache.pulsar.websocket.proxy; -import static java.util.concurrent.Executors.newFixedThreadPool; import static org.mockito.ArgumentMatchers.anyInt; import static org.mockito.ArgumentMatchers.anyString; import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.spy; import static org.testng.Assert.assertEquals; import static org.testng.Assert.assertTrue; - import java.io.IOException; import java.net.URI; import java.nio.file.Files; import java.nio.file.Paths; import java.util.Map; import java.util.Optional; -import java.util.concurrent.ExecutorService; import java.util.concurrent.Future; -import java.util.concurrent.TimeUnit; - import org.apache.pulsar.client.api.CryptoKeyReader; import org.apache.pulsar.client.api.EncryptionKeyInfo; import org.apache.pulsar.client.api.ProducerConsumerBase; @@ -56,8 +51,6 @@ import org.testng.annotations.BeforeMethod; import org.testng.annotations.Test; -import lombok.Cleanup; - @Test(groups = "websocket") public class ProxyEncryptionPublishConsumeTest extends ProducerConsumerBase { protected String methodName; @@ -220,22 +213,14 @@ public EncryptionKeyInfo getPrivateKey(String keyName, Map keyMe } private void stopWebSocketClient(WebSocketClient... clients) { - @Cleanup("shutdownNow") - ExecutorService executor = newFixedThreadPool(1); - try { - executor.submit(() -> { - for (WebSocketClient client : clients) { - try { - client.stop(); - } catch (Exception e) { - log.error(e.getMessage()); - } - } - log.info("proxy clients are stopped successfully"); - }).get(2, TimeUnit.SECONDS); - } catch (Exception e) { - log.error("failed to close proxy clients", e); + for (WebSocketClient client : clients) { + try { + client.stop(); + } catch (Exception e) { + log.error(e.getMessage()); + } } + log.info("proxy clients are stopped successfully"); } private static final Logger log = LoggerFactory.getLogger(ProxyEncryptionPublishConsumeTest.class); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyPublishConsumeTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyPublishConsumeTest.java index f8413244d0961..9a63f41e251bb 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyPublishConsumeTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyPublishConsumeTest.java @@ -18,7 +18,6 @@ */ package org.apache.pulsar.websocket.proxy; -import static java.util.concurrent.Executors.newFixedThreadPool; import static org.apache.pulsar.broker.BrokerTestUtil.spyWithClassAndConstructorArgs; import static org.mockito.ArgumentMatchers.anyInt; import static org.mockito.ArgumentMatchers.anyString; @@ -40,7 +39,6 @@ import java.util.Map.Entry; import java.util.Optional; import java.util.UUID; -import java.util.concurrent.ExecutorService; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; import javax.servlet.http.HttpServletResponse; @@ -50,7 +48,6 @@ import javax.ws.rs.client.WebTarget; import javax.ws.rs.core.MediaType; import javax.ws.rs.core.Response; -import lombok.Cleanup; import org.apache.pulsar.broker.BrokerTestUtil; import org.apache.pulsar.client.api.Producer; import org.apache.pulsar.client.api.ProducerAccessMode; @@ -1040,22 +1037,14 @@ private void verifyProxyStats(Client client, String baseUrl, String topic) { } private void stopWebSocketClient(WebSocketClient... clients) { - @Cleanup("shutdownNow") - ExecutorService executor = newFixedThreadPool(1); - try { - executor.submit(() -> { - for (WebSocketClient client : clients) { - try { - client.stop(); - } catch (Exception e) { - log.error(e.getMessage()); - } - } - log.info("proxy clients are stopped successfully"); - }).get(2, TimeUnit.SECONDS); - } catch (Exception e) { - log.error("failed to close proxy clients", e); + for (WebSocketClient client : clients) { + try { + client.stop(); + } catch (Exception e) { + log.error(e.getMessage()); + } } + log.info("proxy clients are stopped successfully"); } private static final Logger log = LoggerFactory.getLogger(ProxyPublishConsumeTest.class); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyPublishConsumeTlsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyPublishConsumeTlsTest.java index 835adeab2df42..468e97fcc0f10 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyPublishConsumeTlsTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyPublishConsumeTlsTest.java @@ -18,7 +18,6 @@ */ package org.apache.pulsar.websocket.proxy; -import static java.util.concurrent.Executors.newFixedThreadPool; import static org.apache.pulsar.broker.BrokerTestUtil.spyWithClassAndConstructorArgs; import static org.mockito.ArgumentMatchers.anyInt; import static org.mockito.ArgumentMatchers.anyString; @@ -26,10 +25,8 @@ import java.net.URI; import java.security.GeneralSecurityException; import java.util.Optional; -import java.util.concurrent.ExecutorService; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; -import lombok.Cleanup; import org.apache.pulsar.client.api.TlsProducerConsumerBase; import org.apache.pulsar.client.impl.auth.AuthenticationTls; import org.apache.pulsar.common.util.SecurityUtility; @@ -134,18 +131,10 @@ public void socketTest() throws GeneralSecurityException { log.error(t.getMessage()); Assert.fail(t.getMessage()); } finally { - @Cleanup("shutdownNow") - ExecutorService executor = newFixedThreadPool(1); try { - executor.submit(() -> { - try { - consumeClient.stop(); - produceClient.stop(); - log.info("proxy clients are stopped successfully"); - } catch (Exception e) { - log.error(e.getMessage()); - } - }).get(2, TimeUnit.SECONDS); + consumeClient.stop(); + produceClient.stop(); + log.info("proxy clients are stopped successfully"); } catch (Exception e) { log.error("failed to close clients ", e); } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyPublishConsumeWithoutZKTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyPublishConsumeWithoutZKTest.java index e9dcba82eef57..5d4668cf7cd2a 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyPublishConsumeWithoutZKTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyPublishConsumeWithoutZKTest.java @@ -18,17 +18,13 @@ */ package org.apache.pulsar.websocket.proxy; -import static java.util.concurrent.Executors.newFixedThreadPool; import static org.apache.pulsar.broker.BrokerTestUtil.spyWithClassAndConstructorArgs; import static org.mockito.ArgumentMatchers.anyInt; import static org.mockito.ArgumentMatchers.anyString; import static org.mockito.Mockito.doReturn; import java.net.URI; import java.util.Optional; -import java.util.concurrent.ExecutorService; import java.util.concurrent.Future; -import java.util.concurrent.TimeUnit; -import lombok.Cleanup; import org.apache.pulsar.client.api.ProducerConsumerBase; import org.apache.pulsar.metadata.impl.ZKMetadataStore; import org.apache.pulsar.websocket.WebSocketService; @@ -113,18 +109,10 @@ public void socketTest() throws Exception { Assert.assertTrue(produceSocket.getBuffer().size() > 0); Assert.assertEquals(produceSocket.getBuffer(), consumeSocket.getBuffer()); } finally { - @Cleanup("shutdownNow") - ExecutorService executor = newFixedThreadPool(1); try { - executor.submit(() -> { - try { - consumeClient.stop(); - produceClient.stop(); - log.info("proxy clients are stopped successfully"); - } catch (Exception e) { - log.error(e.getMessage()); - } - }).get(2, TimeUnit.SECONDS); + consumeClient.stop(); + produceClient.stop(); + log.info("proxy clients are stopped successfully"); } catch (Exception e) { log.error("failed to close clients ", e); } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/v1/V1_ProxyAuthenticationTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/v1/V1_ProxyAuthenticationTest.java index 5e38f2991c577..09a9719f141e5 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/v1/V1_ProxyAuthenticationTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/v1/V1_ProxyAuthenticationTest.java @@ -18,7 +18,6 @@ */ package org.apache.pulsar.websocket.proxy.v1; -import static java.util.concurrent.Executors.newFixedThreadPool; import static org.apache.pulsar.broker.BrokerTestUtil.spyWithClassAndConstructorArgs; import static org.mockito.ArgumentMatchers.anyInt; import static org.mockito.ArgumentMatchers.anyString; @@ -26,16 +25,13 @@ import com.google.common.collect.Sets; import java.net.URI; import java.util.Optional; -import java.util.concurrent.ExecutorService; import java.util.concurrent.Future; -import java.util.concurrent.TimeUnit; import javax.ws.rs.client.Client; import javax.ws.rs.client.ClientBuilder; import javax.ws.rs.client.Invocation; import javax.ws.rs.client.WebTarget; import javax.ws.rs.core.MediaType; import javax.ws.rs.core.Response; -import lombok.Cleanup; import org.apache.pulsar.client.api.v1.V1_ProducerConsumerBase; import org.apache.pulsar.metadata.impl.ZKMetadataStore; import org.apache.pulsar.websocket.WebSocketService; @@ -94,20 +90,12 @@ public void setup() throws Exception { @AfterMethod(alwaysRun = true) public void cleanup() throws Exception { - @Cleanup("shutdownNow") - ExecutorService executor = newFixedThreadPool(1); try { - executor.submit(() -> { - try { - consumeClient.stop(); - produceClient.stop(); - log.info("proxy clients are stopped successfully"); - } catch (Exception e) { - log.error(e.getMessage()); - } - }).get(2, TimeUnit.SECONDS); + consumeClient.stop(); + produceClient.stop(); + log.info("proxy clients are stopped successfully"); } catch (Exception e) { - log.error("failed to close clients ", e); + log.error(e.getMessage()); } super.internalCleanup();