diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiOffloadTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiOffloadTest.java
index 480153db7c98a..c59685af23a4f 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiOffloadTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiOffloadTest.java
@@ -28,12 +28,13 @@
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.assertNull;
+import static org.testng.Assert.assertTrue;
+
import com.google.common.collect.Sets;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.TimeUnit;
import org.apache.bookkeeper.mledger.LedgerOffloader;
import org.apache.bookkeeper.mledger.ManagedLedgerInfo;
import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
@@ -104,12 +105,12 @@ private void testOffload(String topicName, String mlName) throws Exception {
ManagedLedgerInfo info = pulsar.getManagedLedgerFactory().getManagedLedgerInfo(mlName);
assertEquals(info.ledgers.size(), 2);
- assertEquals(admin.topics().offloadStatus(topicName).status,
+ assertEquals(admin.topics().offloadStatus(topicName).getStatus(),
LongRunningProcessStatus.Status.NOT_RUN);
admin.topics().triggerOffload(topicName, currentId);
- assertEquals(admin.topics().offloadStatus(topicName).status,
+ assertEquals(admin.topics().offloadStatus(topicName).getStatus(),
LongRunningProcessStatus.Status.RUNNING);
try {
@@ -122,9 +123,9 @@ private void testOffload(String topicName, String mlName) throws Exception {
// fail first time
promise.completeExceptionally(new Exception("Some random failure"));
- assertEquals(admin.topics().offloadStatus(topicName).status,
+ assertEquals(admin.topics().offloadStatus(topicName).getStatus(),
LongRunningProcessStatus.Status.ERROR);
- Assert.assertTrue(admin.topics().offloadStatus(topicName).lastError.contains("Some random failure"));
+ Assert.assertTrue(admin.topics().offloadStatus(topicName).getLastError().contains("Some random failure"));
// Try again
doReturn(CompletableFuture.completedFuture(null))
@@ -133,12 +134,14 @@ private void testOffload(String topicName, String mlName) throws Exception {
admin.topics().triggerOffload(topicName, currentId);
Awaitility.await().untilAsserted(() ->
- assertEquals(admin.topics().offloadStatus(topicName).status,
+ assertEquals(admin.topics().offloadStatus(topicName).getStatus(),
LongRunningProcessStatus.Status.SUCCESS));
- MessageIdImpl firstUnoffloaded = admin.topics().offloadStatus(topicName).firstUnoffloadedMessage;
+ MessageId firstUnoffloaded = admin.topics().offloadStatus(topicName).getFirstUnoffloadedMessage();
+ assertTrue(firstUnoffloaded instanceof MessageIdImpl);
+ MessageIdImpl firstUnoffloadedMessage = (MessageIdImpl) firstUnoffloaded;
// First unoffloaded is the first entry of current ledger
- assertEquals(firstUnoffloaded.getLedgerId(), info.ledgers.get(1).ledgerId);
- assertEquals(firstUnoffloaded.getEntryId(), 0);
+ assertEquals(firstUnoffloadedMessage.getLedgerId(), info.ledgers.get(1).ledgerId);
+ assertEquals(firstUnoffloadedMessage.getEntryId(), 0);
verify(offloader, times(2)).offload(any(), any(), any());
}
diff --git a/pulsar-client-admin-api/pom.xml b/pulsar-client-admin-api/pom.xml
index af2ed6f00232e..b875f2b06fb73 100644
--- a/pulsar-client-admin-api/pom.xml
+++ b/pulsar-client-admin-api/pom.xml
@@ -33,6 +33,7 @@
Pulsar Client Admin :: API
+
${project.groupId}
pulsar-common
@@ -41,10 +42,14 @@
${project.groupId}
- pulsar-client-original
- ${project.parent.version}
+ pulsar-package-core
+ ${project.version}
+
+ com.google.code.gson
+ gson
+
diff --git a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/OffloadProcessStatus.java b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/OffloadProcessStatus.java
index 99a2495bb3c32..9e070f8ba7cf1 100644
--- a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/OffloadProcessStatus.java
+++ b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/OffloadProcessStatus.java
@@ -18,38 +18,30 @@
*/
package org.apache.pulsar.client.admin;
+import org.apache.pulsar.client.admin.utils.DefaultImplementation;
import org.apache.pulsar.client.api.MessageId;
-import org.apache.pulsar.client.impl.MessageIdImpl;
/**
- * Status of offload process.
+ * interface class of Status of offload process.
*/
-public class OffloadProcessStatus extends LongRunningProcessStatus {
+public interface OffloadProcessStatus {
- public MessageIdImpl firstUnoffloadedMessage;
+ MessageId getFirstUnoffloadedMessage();
+ String getLastError();
+ LongRunningProcessStatus.Status getStatus();
- public OffloadProcessStatus() {
- super(Status.NOT_RUN, "");
- firstUnoffloadedMessage = (MessageIdImpl) MessageId.earliest;
+ static OffloadProcessStatus forStatus(LongRunningProcessStatus.Status status) {
+ return DefaultImplementation.newOffloadProcessStatus(status, "", MessageId.earliest);
}
- private OffloadProcessStatus(Status status, String lastError,
- MessageIdImpl firstUnoffloadedMessage) {
- this.status = status;
- this.lastError = lastError;
- this.firstUnoffloadedMessage = firstUnoffloadedMessage;
+ static OffloadProcessStatus forError(String lastError) {
+ return DefaultImplementation.newOffloadProcessStatus(LongRunningProcessStatus.Status.ERROR, lastError,
+ MessageId.earliest);
}
- public static OffloadProcessStatus forStatus(Status status) {
- return new OffloadProcessStatus(status, "", (MessageIdImpl) MessageId.earliest);
+ static OffloadProcessStatus forSuccess(MessageId messageId) {
+ return DefaultImplementation.newOffloadProcessStatus(LongRunningProcessStatus.Status.SUCCESS, "",
+ messageId);
}
- public static OffloadProcessStatus forError(String lastError) {
- return new OffloadProcessStatus(Status.ERROR, lastError,
- (MessageIdImpl) MessageId.earliest);
- }
-
- public static OffloadProcessStatus forSuccess(MessageIdImpl messageId) {
- return new OffloadProcessStatus(Status.SUCCESS, "", messageId);
- }
}
diff --git a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/PulsarAdmin.java b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/PulsarAdmin.java
index ed5e13d7a1ea4..b6a1f45634d30 100644
--- a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/PulsarAdmin.java
+++ b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/PulsarAdmin.java
@@ -20,7 +20,6 @@
import java.io.Closeable;
import org.apache.pulsar.client.admin.utils.DefaultImplementation;
-import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
import org.apache.pulsar.common.classification.InterfaceAudience;
import org.apache.pulsar.common.classification.InterfaceStability;
@@ -147,11 +146,6 @@ static PulsarAdminBuilder builder() {
*/
String getServiceUrl();
- /**
- * @return the client Configuration Data that is being used
- */
- ClientConfigurationData getClientConfigData();
-
/**
* @return the schemas
*/
diff --git a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/utils/DefaultImplementation.java b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/utils/DefaultImplementation.java
index 1bab85b4165dd..149257f737c00 100644
--- a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/utils/DefaultImplementation.java
+++ b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/utils/DefaultImplementation.java
@@ -18,8 +18,12 @@
*/
package org.apache.pulsar.client.admin.utils;
+import java.lang.reflect.Constructor;
import lombok.experimental.UtilityClass;
+import org.apache.pulsar.client.admin.LongRunningProcessStatus;
+import org.apache.pulsar.client.admin.OffloadProcessStatus;
import org.apache.pulsar.client.admin.PulsarAdminBuilder;
+import org.apache.pulsar.client.api.MessageId;
/**
* Helper class for class instantiations and it also contains methods to work with schemas.
@@ -30,7 +34,18 @@ public class DefaultImplementation {
private static final Class ADMIN_CLIENT_BUILDER_IMPL = ReflectionUtils.newClassInstance(
"org.apache.pulsar.client.admin.internal.PulsarAdminBuilderImpl");
+ private static final Constructor OFFLOAD_PROCESS_STATUS_IMPL_status_string_messageid =
+ ReflectionUtils.getConstructor(
+ "org.apache.pulsar.client.admin.internal.OffloadProcessStatusImpl",
+ LongRunningProcessStatus.Status.class, String.class, MessageId.class);
+
public static PulsarAdminBuilder newAdminClientBuilder() {
return ReflectionUtils.catchExceptions(() -> ADMIN_CLIENT_BUILDER_IMPL.newInstance());
}
+
+ public static OffloadProcessStatus newOffloadProcessStatus(LongRunningProcessStatus.Status status, String lastError
+ , MessageId messageId) {
+ return ReflectionUtils.catchExceptions(() -> OFFLOAD_PROCESS_STATUS_IMPL_status_string_messageid.newInstance(
+ status, lastError, messageId));
+ }
}
diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/JacksonConfigurator.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/JacksonConfigurator.java
index 979e5b03fd335..ad9a5e61cf4bd 100644
--- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/JacksonConfigurator.java
+++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/JacksonConfigurator.java
@@ -19,8 +19,11 @@
package org.apache.pulsar.client.admin.internal;
import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.module.SimpleAbstractTypeResolver;
+import com.fasterxml.jackson.databind.module.SimpleModule;
import javax.ws.rs.ext.ContextResolver;
import javax.ws.rs.ext.Provider;
+import org.apache.pulsar.client.admin.OffloadProcessStatus;
import org.apache.pulsar.common.util.ObjectMapperFactory;
/**
@@ -33,6 +36,20 @@ public class JacksonConfigurator implements ContextResolver {
public JacksonConfigurator() {
mapper = ObjectMapperFactory.create();
+ setInterfaceDefaultMapperModule();
+ }
+
+ private void setInterfaceDefaultMapperModule() {
+ SimpleModule module = new SimpleModule("InterfaceDefaultMapperModule");
+
+ // we not specific @JsonDeserialize annotation in OffloadProcessStatus
+ // because we do not want to have jackson dependency in pulsar-client-admin-api
+ // In this case we use SimpleAbstractTypeResolver to map interfaces to impls
+ SimpleAbstractTypeResolver resolver = new SimpleAbstractTypeResolver();
+ resolver.addMapping(OffloadProcessStatus.class, OffloadProcessStatusImpl.class);
+
+ module.setAbstractTypes(resolver);
+ mapper.registerModule(module);
}
@Override
diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/OffloadProcessStatusImpl.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/OffloadProcessStatusImpl.java
new file mode 100644
index 0000000000000..ea63a33564322
--- /dev/null
+++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/OffloadProcessStatusImpl.java
@@ -0,0 +1,59 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.admin.internal;
+
+import org.apache.pulsar.client.admin.LongRunningProcessStatus;
+import org.apache.pulsar.client.admin.OffloadProcessStatus;
+import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.impl.MessageIdImpl;
+
+/**
+ * Status of offload process.
+ */
+public class OffloadProcessStatusImpl extends LongRunningProcessStatus implements OffloadProcessStatus {
+ public MessageIdImpl firstUnoffloadedMessage;
+
+ public OffloadProcessStatusImpl() {
+ status = Status.NOT_RUN;
+ lastError = "";
+ firstUnoffloadedMessage = (MessageIdImpl) MessageId.earliest;
+ }
+
+ public OffloadProcessStatusImpl(Status status, String lastError,
+ MessageId firstUnoffloadedMessage) {
+ this.status = status;
+ this.lastError = lastError;
+ this.firstUnoffloadedMessage = (MessageIdImpl) firstUnoffloadedMessage;
+ }
+
+ @Override
+ public MessageId getFirstUnoffloadedMessage() {
+ return firstUnoffloadedMessage;
+ }
+
+ @Override
+ public String getLastError() {
+ return lastError;
+ }
+
+ @Override
+ public Status getStatus() {
+ return status;
+ }
+}
diff --git a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/CmdFunctionsTest.java b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/CmdFunctionsTest.java
index e83e63108dbc6..e2cab00966b45 100644
--- a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/CmdFunctionsTest.java
+++ b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/CmdFunctionsTest.java
@@ -116,7 +116,6 @@ public void setup() throws Exception {
this.functions = mock(Functions.class);
when(admin.functions()).thenReturn(functions);
when(admin.getServiceUrl()).thenReturn("http://localhost:1234");
- when(admin.getClientConfigData()).thenReturn(new ClientConfigurationData());
this.cmd = new CmdFunctions(() -> admin);
this.cmdSinks = new CmdSinks(() -> admin);
this.cmdSources = new CmdSources(() -> admin);
diff --git a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java
index a59609447de88..c571d9da4992b 100644
--- a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java
+++ b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java
@@ -54,6 +54,7 @@
import org.apache.pulsar.client.admin.Schemas;
import org.apache.pulsar.client.admin.Tenants;
import org.apache.pulsar.client.admin.Topics;
+import org.apache.pulsar.client.admin.internal.OffloadProcessStatusImpl;
import org.apache.pulsar.client.admin.internal.PulsarAdminBuilderImpl;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.SubscriptionType;
@@ -1113,7 +1114,7 @@ public boolean matches(Long timestamp) {
cmdTopics.run(split("offload persistent://myprop/clust/ns1/ds1 -s 1k"));
verify(mockTopics).triggerOffload("persistent://myprop/clust/ns1/ds1", new MessageIdImpl(2, 0, -1));
- when(mockTopics.offloadStatus("persistent://myprop/clust/ns1/ds1")).thenReturn(new OffloadProcessStatus());
+ when(mockTopics.offloadStatus("persistent://myprop/clust/ns1/ds1")).thenReturn(new OffloadProcessStatusImpl());
cmdTopics.run(split("offload-status persistent://myprop/clust/ns1/ds1"));
verify(mockTopics).offloadStatus("persistent://myprop/clust/ns1/ds1");
diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java
index 75a8ba6079c7b..9bea9d0380e74 100644
--- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java
+++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java
@@ -44,6 +44,7 @@
import java.util.stream.Collectors;
import org.apache.pulsar.client.admin.LongRunningProcessStatus;
+import org.apache.pulsar.client.admin.OffloadProcessStatus;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.admin.Topics;
@@ -1035,13 +1036,13 @@ void run() throws PulsarAdminException {
String persistentTopic = validatePersistentTopic(params);
try {
- LongRunningProcessStatus status = getTopics().offloadStatus(persistentTopic);
- while (wait && status.status == LongRunningProcessStatus.Status.RUNNING) {
+ OffloadProcessStatus status = getTopics().offloadStatus(persistentTopic);
+ while (wait && status.getStatus() == LongRunningProcessStatus.Status.RUNNING) {
Thread.sleep(1000);
status = getTopics().offloadStatus(persistentTopic);
}
- switch (status.status) {
+ switch (status.getStatus()) {
case NOT_RUN:
System.out.println("Offload has not been run for " + persistentTopic
+ " since broker startup");
@@ -1054,7 +1055,7 @@ void run() throws PulsarAdminException {
break;
case ERROR:
System.out.println("Error in offload");
- throw new PulsarAdminException("Error offloading: " + status.lastError);
+ throw new PulsarAdminException("Error offloading: " + status.getLastError());
}
} catch (InterruptedException e) {
throw new PulsarAdminException(e);
diff --git a/pulsar-functions/runtime-all/pom.xml b/pulsar-functions/runtime-all/pom.xml
index aaf0677ba4c10..5eb35dba0ddca 100644
--- a/pulsar-functions/runtime-all/pom.xml
+++ b/pulsar-functions/runtime-all/pom.xml
@@ -48,7 +48,7 @@
${project.groupId}
- pulsar-client-api
+ pulsar-client-original
${project.version}
diff --git a/pulsar-io/kafka/pom.xml b/pulsar-io/kafka/pom.xml
index 63f33874c00cf..62a5eb95f4cf3 100644
--- a/pulsar-io/kafka/pom.xml
+++ b/pulsar-io/kafka/pom.xml
@@ -39,6 +39,12 @@
provided
+
+ ${project.groupId}
+ pulsar-client-original
+ ${project.version}
+
+
com.fasterxml.jackson.core
jackson-databind
diff --git a/tests/docker-images/java-test-functions/pom.xml b/tests/docker-images/java-test-functions/pom.xml
index 58dddff46c3a0..26b6726cf738f 100644
--- a/tests/docker-images/java-test-functions/pom.xml
+++ b/tests/docker-images/java-test-functions/pom.xml
@@ -34,6 +34,11 @@
pulsar-io-core
${project.version}
+
+ org.apache.pulsar
+ pulsar-client-original
+ ${project.version}
+
jar