From 0f9121415100d6fa16f6e2886d97fc882f9b35bb Mon Sep 17 00:00:00 2001 From: Rui Fu Date: Sat, 8 May 2021 15:25:53 +0800 Subject: [PATCH 01/10] remove org.apache.pulsar.client.impl.conf.ClientConfigurationData --- pulsar-client-admin-api/pom.xml | 7 +------ .../java/org/apache/pulsar/client/admin/PulsarAdmin.java | 6 ------ .../pulsar/client/admin/internal/PulsarAdminImpl.java | 6 ------ .../java/org/apache/pulsar/admin/cli/CmdFunctionsTest.java | 1 - 4 files changed, 1 insertion(+), 19 deletions(-) diff --git a/pulsar-client-admin-api/pom.xml b/pulsar-client-admin-api/pom.xml index 505c234850dd1..b50f02a930322 100644 --- a/pulsar-client-admin-api/pom.xml +++ b/pulsar-client-admin-api/pom.xml @@ -33,15 +33,10 @@ Pulsar Client Admin :: API - - ${project.groupId} - pulsar-common - ${project.parent.version} - ${project.groupId} - pulsar-client-original + pulsar-common ${project.parent.version} diff --git a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/PulsarAdmin.java b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/PulsarAdmin.java index ed5e13d7a1ea4..b6a1f45634d30 100644 --- a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/PulsarAdmin.java +++ b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/PulsarAdmin.java @@ -20,7 +20,6 @@ import java.io.Closeable; import org.apache.pulsar.client.admin.utils.DefaultImplementation; -import org.apache.pulsar.client.impl.conf.ClientConfigurationData; import org.apache.pulsar.common.classification.InterfaceAudience; import org.apache.pulsar.common.classification.InterfaceStability; @@ -147,11 +146,6 @@ static PulsarAdminBuilder builder() { */ String getServiceUrl(); - /** - * @return the client Configuration Data that is being used - */ - ClientConfigurationData getClientConfigData(); - /** * @return the schemas */ diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/PulsarAdminImpl.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/PulsarAdminImpl.java index 8874902038984..d1d3266f8ba14 100644 --- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/PulsarAdminImpl.java +++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/PulsarAdminImpl.java @@ -430,12 +430,6 @@ public String getServiceUrl() { return serviceUrl; } - /** - * @return the client Configuration Data that is being used - */ - public ClientConfigurationData getClientConfigData() { - return clientConfigData; - } /** * @return the schemas diff --git a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/CmdFunctionsTest.java b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/CmdFunctionsTest.java index e83e63108dbc6..e2cab00966b45 100644 --- a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/CmdFunctionsTest.java +++ b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/CmdFunctionsTest.java @@ -116,7 +116,6 @@ public void setup() throws Exception { this.functions = mock(Functions.class); when(admin.functions()).thenReturn(functions); when(admin.getServiceUrl()).thenReturn("http://localhost:1234"); - when(admin.getClientConfigData()).thenReturn(new ClientConfigurationData()); this.cmd = new CmdFunctions(() -> admin); this.cmdSinks = new CmdSinks(() -> admin); this.cmdSources = new CmdSources(() -> admin); From 01efe2e2b56a90a803ad4dc48b0376ce18988d5d Mon Sep 17 00:00:00 2001 From: Rui Fu Date: Sat, 8 May 2021 16:05:13 +0800 Subject: [PATCH 02/10] use MessageId instead of MessageIdImpl --- .../pulsar/broker/admin/AdminApiOffloadTest.java | 2 +- .../pulsar/client/admin/OffloadProcessStatus.java | 13 ++++++------- 2 files changed, 7 insertions(+), 8 deletions(-) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiOffloadTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiOffloadTest.java index 9bbb27c97f11d..735b4dc297d99 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiOffloadTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiOffloadTest.java @@ -135,7 +135,7 @@ private void testOffload(String topicName, String mlName) throws Exception { Awaitility.await().untilAsserted(() -> assertEquals(admin.topics().offloadStatus(topicName).status, LongRunningProcessStatus.Status.SUCCESS)); - MessageIdImpl firstUnoffloaded = admin.topics().offloadStatus(topicName).firstUnoffloadedMessage; + MessageIdImpl firstUnoffloaded = (MessageIdImpl) admin.topics().offloadStatus(topicName).firstUnoffloadedMessage; // First unoffloaded is the first entry of current ledger assertEquals(firstUnoffloaded.getLedgerId(), info.ledgers.get(1).ledgerId); assertEquals(firstUnoffloaded.getEntryId(), 0); diff --git a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/OffloadProcessStatus.java b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/OffloadProcessStatus.java index 99a2495bb3c32..f1dbafc9f7de5 100644 --- a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/OffloadProcessStatus.java +++ b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/OffloadProcessStatus.java @@ -19,37 +19,36 @@ package org.apache.pulsar.client.admin; import org.apache.pulsar.client.api.MessageId; -import org.apache.pulsar.client.impl.MessageIdImpl; /** * Status of offload process. */ public class OffloadProcessStatus extends LongRunningProcessStatus { - public MessageIdImpl firstUnoffloadedMessage; + public MessageId firstUnoffloadedMessage; public OffloadProcessStatus() { super(Status.NOT_RUN, ""); - firstUnoffloadedMessage = (MessageIdImpl) MessageId.earliest; + firstUnoffloadedMessage = MessageId.earliest; } private OffloadProcessStatus(Status status, String lastError, - MessageIdImpl firstUnoffloadedMessage) { + MessageId firstUnoffloadedMessage) { this.status = status; this.lastError = lastError; this.firstUnoffloadedMessage = firstUnoffloadedMessage; } public static OffloadProcessStatus forStatus(Status status) { - return new OffloadProcessStatus(status, "", (MessageIdImpl) MessageId.earliest); + return new OffloadProcessStatus(status, "", MessageId.earliest); } public static OffloadProcessStatus forError(String lastError) { return new OffloadProcessStatus(Status.ERROR, lastError, - (MessageIdImpl) MessageId.earliest); + MessageId.earliest); } - public static OffloadProcessStatus forSuccess(MessageIdImpl messageId) { + public static OffloadProcessStatus forSuccess(MessageId messageId) { return new OffloadProcessStatus(Status.SUCCESS, "", messageId); } } From 0acd021d5315a4e097b403bfd2ac78b599ae164e Mon Sep 17 00:00:00 2001 From: Rui Fu Date: Sat, 8 May 2021 16:25:23 +0800 Subject: [PATCH 03/10] get getClientConfigData back --- .../pulsar/client/admin/internal/PulsarAdminImpl.java | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/PulsarAdminImpl.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/PulsarAdminImpl.java index d1d3266f8ba14..8874902038984 100644 --- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/PulsarAdminImpl.java +++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/PulsarAdminImpl.java @@ -430,6 +430,12 @@ public String getServiceUrl() { return serviceUrl; } + /** + * @return the client Configuration Data that is being used + */ + public ClientConfigurationData getClientConfigData() { + return clientConfigData; + } /** * @return the schemas From 0d95b939dc31764429007b6457e9e588d5155c29 Mon Sep 17 00:00:00 2001 From: Rui Fu Date: Sat, 8 May 2021 16:55:23 +0800 Subject: [PATCH 04/10] fix io-kafka dependency --- pulsar-io/kafka/pom.xml | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/pulsar-io/kafka/pom.xml b/pulsar-io/kafka/pom.xml index 63f33874c00cf..62a5eb95f4cf3 100644 --- a/pulsar-io/kafka/pom.xml +++ b/pulsar-io/kafka/pom.xml @@ -39,6 +39,12 @@ provided + + ${project.groupId} + pulsar-client-original + ${project.version} + + com.fasterxml.jackson.core jackson-databind From 5080d06c07879b2ec0321a2c275372ab1fd522a2 Mon Sep 17 00:00:00 2001 From: Rui Fu Date: Sat, 8 May 2021 19:52:28 +0800 Subject: [PATCH 05/10] fix java-test-functions dependency --- tests/docker-images/java-test-functions/pom.xml | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/tests/docker-images/java-test-functions/pom.xml b/tests/docker-images/java-test-functions/pom.xml index 58dddff46c3a0..26b6726cf738f 100644 --- a/tests/docker-images/java-test-functions/pom.xml +++ b/tests/docker-images/java-test-functions/pom.xml @@ -34,6 +34,11 @@ pulsar-io-core ${project.version} + + org.apache.pulsar + pulsar-client-original + ${project.version} + jar From c4cae81f3b92de5bb7216d8b53496d023833055e Mon Sep 17 00:00:00 2001 From: Rui Fu Date: Mon, 10 May 2021 10:48:17 +0800 Subject: [PATCH 06/10] add pulsar-client-original to pulsar-functions-runtime-all --- pulsar-functions/runtime-all/pom.xml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pulsar-functions/runtime-all/pom.xml b/pulsar-functions/runtime-all/pom.xml index aaf0677ba4c10..5eb35dba0ddca 100644 --- a/pulsar-functions/runtime-all/pom.xml +++ b/pulsar-functions/runtime-all/pom.xml @@ -48,7 +48,7 @@ ${project.groupId} - pulsar-client-api + pulsar-client-original ${project.version} From ac0de198142da3b1e5d9fd0449a70006a5db57c4 Mon Sep 17 00:00:00 2001 From: Rui Fu Date: Mon, 10 May 2021 17:21:35 +0800 Subject: [PATCH 07/10] fix offload tests --- .../pulsar/broker/admin/AdminApiOffloadTest.java | 11 ++++++++--- .../pulsar/client/admin/internal/TopicsImpl.java | 15 +++++++++++++-- 2 files changed, 21 insertions(+), 5 deletions(-) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiOffloadTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiOffloadTest.java index 735b4dc297d99..2f1cea531273c 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiOffloadTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiOffloadTest.java @@ -28,6 +28,8 @@ import static org.testng.Assert.assertEquals; import static org.testng.Assert.assertNotNull; import static org.testng.Assert.assertNull; +import static org.testng.Assert.assertTrue; + import com.google.common.collect.Sets; import java.util.HashMap; import java.util.Map; @@ -43,6 +45,7 @@ import org.apache.pulsar.client.api.MessageId; import org.apache.pulsar.client.api.Producer; import org.apache.pulsar.client.impl.MessageIdImpl; +import org.apache.pulsar.client.impl.TopicMessageIdImpl; import org.apache.pulsar.common.naming.NamespaceName; import org.apache.pulsar.common.naming.TopicName; import org.apache.pulsar.common.policies.data.ClusterData; @@ -135,10 +138,12 @@ private void testOffload(String topicName, String mlName) throws Exception { Awaitility.await().untilAsserted(() -> assertEquals(admin.topics().offloadStatus(topicName).status, LongRunningProcessStatus.Status.SUCCESS)); - MessageIdImpl firstUnoffloaded = (MessageIdImpl) admin.topics().offloadStatus(topicName).firstUnoffloadedMessage; + MessageId firstUnoffloaded = admin.topics().offloadStatus(topicName).firstUnoffloadedMessage; + assertTrue(firstUnoffloaded instanceof MessageIdImpl); + MessageIdImpl firstUnoffloadedMessage = (MessageIdImpl) firstUnoffloaded; // First unoffloaded is the first entry of current ledger - assertEquals(firstUnoffloaded.getLedgerId(), info.ledgers.get(1).ledgerId); - assertEquals(firstUnoffloaded.getEntryId(), 0); + assertEquals(firstUnoffloadedMessage.getLedgerId(), info.ledgers.get(1).ledgerId); + assertEquals(firstUnoffloadedMessage.getEntryId(), 0); verify(offloader, times(2)).offload(any(), any(), any()); } diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java index 1a2f77754adde..9761146e81587 100644 --- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java +++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java @@ -19,9 +19,14 @@ package org.apache.pulsar.client.admin.internal; import static com.google.common.base.Preconditions.checkArgument; + +import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.collect.Lists; import com.google.common.collect.Maps; import com.google.gson.Gson; +import com.google.gson.GsonBuilder; +import com.google.gson.JsonArray; +import com.google.gson.JsonDeserializer; import com.google.gson.JsonObject; import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; @@ -85,6 +90,7 @@ import org.apache.pulsar.common.policies.data.TopicStats; import org.apache.pulsar.common.protocol.Commands; import org.apache.pulsar.common.util.Codec; +import org.apache.pulsar.common.util.ObjectMapperFactory; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -1397,9 +1403,14 @@ public CompletableFuture offloadStatusAsync(String topic) WebTarget path = topicPath(tn, "offload"); final CompletableFuture future = new CompletableFuture<>(); asyncGetRequest(path, - new InvocationCallback() { + new InvocationCallback() { @Override - public void completed(OffloadProcessStatus offloadProcessStatus) { + public void completed(String jsonString) { + Gson gson = new GsonBuilder().registerTypeAdapter(MessageId.class, + (JsonDeserializer) (json, typeOfT, context) + -> context.deserialize(json, MessageIdImpl.class)).create(); + OffloadProcessStatus offloadProcessStatus = gson.fromJson(jsonString, + OffloadProcessStatus.class); future.complete(offloadProcessStatus); } From ef5e2de5f7afab4d0748a0d652a3f8841ac74017 Mon Sep 17 00:00:00 2001 From: Rui Fu Date: Mon, 10 May 2021 17:28:48 +0800 Subject: [PATCH 08/10] fix import --- .../org/apache/pulsar/broker/admin/AdminApiOffloadTest.java | 2 -- .../org/apache/pulsar/client/admin/internal/TopicsImpl.java | 4 ---- 2 files changed, 6 deletions(-) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiOffloadTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiOffloadTest.java index 2f1cea531273c..016d1da7afeae 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiOffloadTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiOffloadTest.java @@ -35,7 +35,6 @@ import java.util.Map; import java.util.UUID; import java.util.concurrent.CompletableFuture; -import java.util.concurrent.TimeUnit; import org.apache.bookkeeper.mledger.LedgerOffloader; import org.apache.bookkeeper.mledger.ManagedLedgerInfo; import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest; @@ -45,7 +44,6 @@ import org.apache.pulsar.client.api.MessageId; import org.apache.pulsar.client.api.Producer; import org.apache.pulsar.client.impl.MessageIdImpl; -import org.apache.pulsar.client.impl.TopicMessageIdImpl; import org.apache.pulsar.common.naming.NamespaceName; import org.apache.pulsar.common.naming.TopicName; import org.apache.pulsar.common.policies.data.ClusterData; diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java index 9761146e81587..88c3cef1a8efd 100644 --- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java +++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java @@ -19,13 +19,10 @@ package org.apache.pulsar.client.admin.internal; import static com.google.common.base.Preconditions.checkArgument; - -import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.collect.Lists; import com.google.common.collect.Maps; import com.google.gson.Gson; import com.google.gson.GsonBuilder; -import com.google.gson.JsonArray; import com.google.gson.JsonDeserializer; import com.google.gson.JsonObject; import io.netty.buffer.ByteBuf; @@ -90,7 +87,6 @@ import org.apache.pulsar.common.policies.data.TopicStats; import org.apache.pulsar.common.protocol.Commands; import org.apache.pulsar.common.util.Codec; -import org.apache.pulsar.common.util.ObjectMapperFactory; import org.slf4j.Logger; import org.slf4j.LoggerFactory; From cabd2d09794b9a364c978266842d532a88707f0e Mon Sep 17 00:00:00 2001 From: Rui Fu Date: Tue, 11 May 2021 14:10:48 +0800 Subject: [PATCH 09/10] change Gson to Jackson for OffloadProcessStatus --- .../client/admin/internal/TopicsImpl.java | 36 ++++++++++++++----- 1 file changed, 28 insertions(+), 8 deletions(-) diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java index 88c3cef1a8efd..c9227e85bdf62 100644 --- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java +++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java @@ -19,11 +19,14 @@ package org.apache.pulsar.client.admin.internal; import static com.google.common.base.Preconditions.checkArgument; +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.core.Version; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.module.SimpleAbstractTypeResolver; +import com.fasterxml.jackson.databind.module.SimpleModule; import com.google.common.collect.Lists; import com.google.common.collect.Maps; import com.google.gson.Gson; -import com.google.gson.GsonBuilder; -import com.google.gson.JsonDeserializer; import com.google.gson.JsonObject; import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; @@ -87,6 +90,7 @@ import org.apache.pulsar.common.policies.data.TopicStats; import org.apache.pulsar.common.protocol.Commands; import org.apache.pulsar.common.util.Codec; +import org.apache.pulsar.common.util.ObjectMapperFactory; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -1402,12 +1406,28 @@ public CompletableFuture offloadStatusAsync(String topic) new InvocationCallback() { @Override public void completed(String jsonString) { - Gson gson = new GsonBuilder().registerTypeAdapter(MessageId.class, - (JsonDeserializer) (json, typeOfT, context) - -> context.deserialize(json, MessageIdImpl.class)).create(); - OffloadProcessStatus offloadProcessStatus = gson.fromJson(jsonString, - OffloadProcessStatus.class); - future.complete(offloadProcessStatus); + ObjectMapper mapper = ObjectMapperFactory.create(); + SimpleModule module = new SimpleModule("OffloadProcessStatusConvertModule", + Version.unknownVersion()); + + // we not specific @JsonDeserialize annotation in OffloadProcessStatus + // because we do not want to have jackson dependency in pulsar-client-admin-api + // In this case we use SimpleAbstractTypeResolver to map MessageId to MessageIdImpl + SimpleAbstractTypeResolver resolver = new SimpleAbstractTypeResolver(); + resolver.addMapping(MessageId.class, MessageIdImpl.class); + + module.setAbstractTypes(resolver); + mapper = mapper.registerModule(module); + OffloadProcessStatus offloadProcessStatus = null; + try { + offloadProcessStatus = mapper.readValue(jsonString, + OffloadProcessStatus.class); + } catch (JsonProcessingException e) { + future.completeExceptionally(getApiException(e)); + } + if (offloadProcessStatus != null) { + future.complete(offloadProcessStatus); + } } @Override From 4a7dc754f040af8136ae8ab4245a576f6b57c236 Mon Sep 17 00:00:00 2001 From: Rui Fu Date: Wed, 12 May 2021 12:47:50 +0800 Subject: [PATCH 10/10] move OffloadProcessStatus as interface, add InterfaceDefaultMapperModule for interface resolver # Conflicts: # pulsar-client-admin-api/pom.xml --- .../broker/admin/AdminApiOffloadTest.java | 12 ++-- pulsar-client-admin-api/pom.xml | 5 ++ .../client/admin/OffloadProcessStatus.java | 35 +++++------ .../admin/utils/DefaultImplementation.java | 15 +++++ .../admin/internal/JacksonConfigurator.java | 17 ++++++ .../internal/OffloadProcessStatusImpl.java | 59 +++++++++++++++++++ .../client/admin/internal/TopicsImpl.java | 33 +---------- .../pulsar/admin/cli/PulsarAdminToolTest.java | 3 +- .../apache/pulsar/admin/cli/CmdTopics.java | 9 +-- 9 files changed, 126 insertions(+), 62 deletions(-) create mode 100644 pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/OffloadProcessStatusImpl.java diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiOffloadTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiOffloadTest.java index 016d1da7afeae..a1099a9960590 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiOffloadTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiOffloadTest.java @@ -105,12 +105,12 @@ private void testOffload(String topicName, String mlName) throws Exception { ManagedLedgerInfo info = pulsar.getManagedLedgerFactory().getManagedLedgerInfo(mlName); assertEquals(info.ledgers.size(), 2); - assertEquals(admin.topics().offloadStatus(topicName).status, + assertEquals(admin.topics().offloadStatus(topicName).getStatus(), LongRunningProcessStatus.Status.NOT_RUN); admin.topics().triggerOffload(topicName, currentId); - assertEquals(admin.topics().offloadStatus(topicName).status, + assertEquals(admin.topics().offloadStatus(topicName).getStatus(), LongRunningProcessStatus.Status.RUNNING); try { @@ -123,9 +123,9 @@ private void testOffload(String topicName, String mlName) throws Exception { // fail first time promise.completeExceptionally(new Exception("Some random failure")); - assertEquals(admin.topics().offloadStatus(topicName).status, + assertEquals(admin.topics().offloadStatus(topicName).getStatus(), LongRunningProcessStatus.Status.ERROR); - Assert.assertTrue(admin.topics().offloadStatus(topicName).lastError.contains("Some random failure")); + Assert.assertTrue(admin.topics().offloadStatus(topicName).getLastError().contains("Some random failure")); // Try again doReturn(CompletableFuture.completedFuture(null)) @@ -134,9 +134,9 @@ private void testOffload(String topicName, String mlName) throws Exception { admin.topics().triggerOffload(topicName, currentId); Awaitility.await().untilAsserted(() -> - assertEquals(admin.topics().offloadStatus(topicName).status, + assertEquals(admin.topics().offloadStatus(topicName).getStatus(), LongRunningProcessStatus.Status.SUCCESS)); - MessageId firstUnoffloaded = admin.topics().offloadStatus(topicName).firstUnoffloadedMessage; + MessageId firstUnoffloaded = admin.topics().offloadStatus(topicName).getFirstUnoffloadedMessage(); assertTrue(firstUnoffloaded instanceof MessageIdImpl); MessageIdImpl firstUnoffloadedMessage = (MessageIdImpl) firstUnoffloaded; // First unoffloaded is the first entry of current ledger diff --git a/pulsar-client-admin-api/pom.xml b/pulsar-client-admin-api/pom.xml index b50f02a930322..b875f2b06fb73 100644 --- a/pulsar-client-admin-api/pom.xml +++ b/pulsar-client-admin-api/pom.xml @@ -45,6 +45,11 @@ pulsar-package-core ${project.version} + + + com.google.code.gson + gson + diff --git a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/OffloadProcessStatus.java b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/OffloadProcessStatus.java index f1dbafc9f7de5..9e070f8ba7cf1 100644 --- a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/OffloadProcessStatus.java +++ b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/OffloadProcessStatus.java @@ -18,37 +18,30 @@ */ package org.apache.pulsar.client.admin; +import org.apache.pulsar.client.admin.utils.DefaultImplementation; import org.apache.pulsar.client.api.MessageId; /** - * Status of offload process. + * interface class of Status of offload process. */ -public class OffloadProcessStatus extends LongRunningProcessStatus { +public interface OffloadProcessStatus { - public MessageId firstUnoffloadedMessage; + MessageId getFirstUnoffloadedMessage(); + String getLastError(); + LongRunningProcessStatus.Status getStatus(); - public OffloadProcessStatus() { - super(Status.NOT_RUN, ""); - firstUnoffloadedMessage = MessageId.earliest; + static OffloadProcessStatus forStatus(LongRunningProcessStatus.Status status) { + return DefaultImplementation.newOffloadProcessStatus(status, "", MessageId.earliest); } - private OffloadProcessStatus(Status status, String lastError, - MessageId firstUnoffloadedMessage) { - this.status = status; - this.lastError = lastError; - this.firstUnoffloadedMessage = firstUnoffloadedMessage; - } - - public static OffloadProcessStatus forStatus(Status status) { - return new OffloadProcessStatus(status, "", MessageId.earliest); - } - - public static OffloadProcessStatus forError(String lastError) { - return new OffloadProcessStatus(Status.ERROR, lastError, + static OffloadProcessStatus forError(String lastError) { + return DefaultImplementation.newOffloadProcessStatus(LongRunningProcessStatus.Status.ERROR, lastError, MessageId.earliest); } - public static OffloadProcessStatus forSuccess(MessageId messageId) { - return new OffloadProcessStatus(Status.SUCCESS, "", messageId); + static OffloadProcessStatus forSuccess(MessageId messageId) { + return DefaultImplementation.newOffloadProcessStatus(LongRunningProcessStatus.Status.SUCCESS, "", + messageId); } + } diff --git a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/utils/DefaultImplementation.java b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/utils/DefaultImplementation.java index 1bab85b4165dd..149257f737c00 100644 --- a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/utils/DefaultImplementation.java +++ b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/utils/DefaultImplementation.java @@ -18,8 +18,12 @@ */ package org.apache.pulsar.client.admin.utils; +import java.lang.reflect.Constructor; import lombok.experimental.UtilityClass; +import org.apache.pulsar.client.admin.LongRunningProcessStatus; +import org.apache.pulsar.client.admin.OffloadProcessStatus; import org.apache.pulsar.client.admin.PulsarAdminBuilder; +import org.apache.pulsar.client.api.MessageId; /** * Helper class for class instantiations and it also contains methods to work with schemas. @@ -30,7 +34,18 @@ public class DefaultImplementation { private static final Class ADMIN_CLIENT_BUILDER_IMPL = ReflectionUtils.newClassInstance( "org.apache.pulsar.client.admin.internal.PulsarAdminBuilderImpl"); + private static final Constructor OFFLOAD_PROCESS_STATUS_IMPL_status_string_messageid = + ReflectionUtils.getConstructor( + "org.apache.pulsar.client.admin.internal.OffloadProcessStatusImpl", + LongRunningProcessStatus.Status.class, String.class, MessageId.class); + public static PulsarAdminBuilder newAdminClientBuilder() { return ReflectionUtils.catchExceptions(() -> ADMIN_CLIENT_BUILDER_IMPL.newInstance()); } + + public static OffloadProcessStatus newOffloadProcessStatus(LongRunningProcessStatus.Status status, String lastError + , MessageId messageId) { + return ReflectionUtils.catchExceptions(() -> OFFLOAD_PROCESS_STATUS_IMPL_status_string_messageid.newInstance( + status, lastError, messageId)); + } } diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/JacksonConfigurator.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/JacksonConfigurator.java index 979e5b03fd335..ad9a5e61cf4bd 100644 --- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/JacksonConfigurator.java +++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/JacksonConfigurator.java @@ -19,8 +19,11 @@ package org.apache.pulsar.client.admin.internal; import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.module.SimpleAbstractTypeResolver; +import com.fasterxml.jackson.databind.module.SimpleModule; import javax.ws.rs.ext.ContextResolver; import javax.ws.rs.ext.Provider; +import org.apache.pulsar.client.admin.OffloadProcessStatus; import org.apache.pulsar.common.util.ObjectMapperFactory; /** @@ -33,6 +36,20 @@ public class JacksonConfigurator implements ContextResolver { public JacksonConfigurator() { mapper = ObjectMapperFactory.create(); + setInterfaceDefaultMapperModule(); + } + + private void setInterfaceDefaultMapperModule() { + SimpleModule module = new SimpleModule("InterfaceDefaultMapperModule"); + + // we not specific @JsonDeserialize annotation in OffloadProcessStatus + // because we do not want to have jackson dependency in pulsar-client-admin-api + // In this case we use SimpleAbstractTypeResolver to map interfaces to impls + SimpleAbstractTypeResolver resolver = new SimpleAbstractTypeResolver(); + resolver.addMapping(OffloadProcessStatus.class, OffloadProcessStatusImpl.class); + + module.setAbstractTypes(resolver); + mapper.registerModule(module); } @Override diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/OffloadProcessStatusImpl.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/OffloadProcessStatusImpl.java new file mode 100644 index 0000000000000..ea63a33564322 --- /dev/null +++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/OffloadProcessStatusImpl.java @@ -0,0 +1,59 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.client.admin.internal; + +import org.apache.pulsar.client.admin.LongRunningProcessStatus; +import org.apache.pulsar.client.admin.OffloadProcessStatus; +import org.apache.pulsar.client.api.MessageId; +import org.apache.pulsar.client.impl.MessageIdImpl; + +/** + * Status of offload process. + */ +public class OffloadProcessStatusImpl extends LongRunningProcessStatus implements OffloadProcessStatus { + public MessageIdImpl firstUnoffloadedMessage; + + public OffloadProcessStatusImpl() { + status = Status.NOT_RUN; + lastError = ""; + firstUnoffloadedMessage = (MessageIdImpl) MessageId.earliest; + } + + public OffloadProcessStatusImpl(Status status, String lastError, + MessageId firstUnoffloadedMessage) { + this.status = status; + this.lastError = lastError; + this.firstUnoffloadedMessage = (MessageIdImpl) firstUnoffloadedMessage; + } + + @Override + public MessageId getFirstUnoffloadedMessage() { + return firstUnoffloadedMessage; + } + + @Override + public String getLastError() { + return lastError; + } + + @Override + public Status getStatus() { + return status; + } +} diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java index c9227e85bdf62..1a2f77754adde 100644 --- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java +++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java @@ -19,11 +19,6 @@ package org.apache.pulsar.client.admin.internal; import static com.google.common.base.Preconditions.checkArgument; -import com.fasterxml.jackson.core.JsonProcessingException; -import com.fasterxml.jackson.core.Version; -import com.fasterxml.jackson.databind.ObjectMapper; -import com.fasterxml.jackson.databind.module.SimpleAbstractTypeResolver; -import com.fasterxml.jackson.databind.module.SimpleModule; import com.google.common.collect.Lists; import com.google.common.collect.Maps; import com.google.gson.Gson; @@ -90,7 +85,6 @@ import org.apache.pulsar.common.policies.data.TopicStats; import org.apache.pulsar.common.protocol.Commands; import org.apache.pulsar.common.util.Codec; -import org.apache.pulsar.common.util.ObjectMapperFactory; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -1403,31 +1397,10 @@ public CompletableFuture offloadStatusAsync(String topic) WebTarget path = topicPath(tn, "offload"); final CompletableFuture future = new CompletableFuture<>(); asyncGetRequest(path, - new InvocationCallback() { + new InvocationCallback() { @Override - public void completed(String jsonString) { - ObjectMapper mapper = ObjectMapperFactory.create(); - SimpleModule module = new SimpleModule("OffloadProcessStatusConvertModule", - Version.unknownVersion()); - - // we not specific @JsonDeserialize annotation in OffloadProcessStatus - // because we do not want to have jackson dependency in pulsar-client-admin-api - // In this case we use SimpleAbstractTypeResolver to map MessageId to MessageIdImpl - SimpleAbstractTypeResolver resolver = new SimpleAbstractTypeResolver(); - resolver.addMapping(MessageId.class, MessageIdImpl.class); - - module.setAbstractTypes(resolver); - mapper = mapper.registerModule(module); - OffloadProcessStatus offloadProcessStatus = null; - try { - offloadProcessStatus = mapper.readValue(jsonString, - OffloadProcessStatus.class); - } catch (JsonProcessingException e) { - future.completeExceptionally(getApiException(e)); - } - if (offloadProcessStatus != null) { - future.complete(offloadProcessStatus); - } + public void completed(OffloadProcessStatus offloadProcessStatus) { + future.complete(offloadProcessStatus); } @Override diff --git a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java index 2b1edd8d09596..cd816ebc3e278 100644 --- a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java +++ b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java @@ -54,6 +54,7 @@ import org.apache.pulsar.client.admin.Schemas; import org.apache.pulsar.client.admin.Tenants; import org.apache.pulsar.client.admin.Topics; +import org.apache.pulsar.client.admin.internal.OffloadProcessStatusImpl; import org.apache.pulsar.client.admin.internal.PulsarAdminBuilderImpl; import org.apache.pulsar.client.api.MessageId; import org.apache.pulsar.client.api.SubscriptionType; @@ -1102,7 +1103,7 @@ public boolean matches(Long timestamp) { cmdTopics.run(split("offload persistent://myprop/clust/ns1/ds1 -s 1k")); verify(mockTopics).triggerOffload("persistent://myprop/clust/ns1/ds1", new MessageIdImpl(2, 0, -1)); - when(mockTopics.offloadStatus("persistent://myprop/clust/ns1/ds1")).thenReturn(new OffloadProcessStatus()); + when(mockTopics.offloadStatus("persistent://myprop/clust/ns1/ds1")).thenReturn(new OffloadProcessStatusImpl()); cmdTopics.run(split("offload-status persistent://myprop/clust/ns1/ds1")); verify(mockTopics).offloadStatus("persistent://myprop/clust/ns1/ds1"); diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java index 92c39556f9ac0..54126cf48b09d 100644 --- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java +++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java @@ -45,6 +45,7 @@ import java.util.stream.Collectors; import org.apache.pulsar.client.admin.LongRunningProcessStatus; +import org.apache.pulsar.client.admin.OffloadProcessStatus; import org.apache.pulsar.client.admin.PulsarAdmin; import org.apache.pulsar.client.admin.PulsarAdminException; import org.apache.pulsar.client.admin.Topics; @@ -1022,13 +1023,13 @@ void run() throws PulsarAdminException { String persistentTopic = validatePersistentTopic(params); try { - LongRunningProcessStatus status = getTopics().offloadStatus(persistentTopic); - while (wait && status.status == LongRunningProcessStatus.Status.RUNNING) { + OffloadProcessStatus status = getTopics().offloadStatus(persistentTopic); + while (wait && status.getStatus() == LongRunningProcessStatus.Status.RUNNING) { Thread.sleep(1000); status = getTopics().offloadStatus(persistentTopic); } - switch (status.status) { + switch (status.getStatus()) { case NOT_RUN: System.out.println("Offload has not been run for " + persistentTopic + " since broker startup"); @@ -1041,7 +1042,7 @@ void run() throws PulsarAdminException { break; case ERROR: System.out.println("Error in offload"); - throw new PulsarAdminException("Error offloading: " + status.lastError); + throw new PulsarAdminException("Error offloading: " + status.getLastError()); } } catch (InterruptedException e) { throw new PulsarAdminException(e);