Skip to content

Commit 3294210

Browse files
committed
[fix][client] Fix ReaderBuilder doest not give illegalArgument on connection failure retry
1 parent bc44280 commit 3294210

File tree

3 files changed

+30
-2
lines changed

3 files changed

+30
-2
lines changed

pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ReaderTest.java

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,8 @@
3636
import java.util.concurrent.CountDownLatch;
3737
import java.util.concurrent.Future;
3838
import java.util.concurrent.TimeUnit;
39+
import java.util.concurrent.TimeoutException;
40+
3941
import lombok.Cleanup;
4042
import lombok.extern.slf4j.Slf4j;
4143
import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
@@ -48,6 +50,7 @@
4850
import org.apache.pulsar.client.api.MessageRoutingMode;
4951
import org.apache.pulsar.client.api.Producer;
5052
import org.apache.pulsar.client.api.ProducerBuilder;
53+
import org.apache.pulsar.client.api.PulsarClient;
5154
import org.apache.pulsar.client.api.PulsarClientException;
5255
import org.apache.pulsar.client.api.Range;
5356
import org.apache.pulsar.client.api.Reader;
@@ -902,4 +905,28 @@ public void testHasMessageAvailableAfterSeekTimestamp(boolean initializeLastMess
902905
assertTrue(reader.hasMessageAvailable());
903906
}
904907
}
908+
909+
@Test
910+
public void testReaderBuilderStateOnRetryFailure() throws Exception {
911+
String ns = "my-property/my-ns";
912+
String topic = "persistent://" + ns + "/testRetryReader";
913+
RetentionPolicies retention = new RetentionPolicies(-1, -1);
914+
admin.namespaces().setRetention(ns, retention);
915+
String badUrl = "pulsar://bad-host:8080";
916+
917+
PulsarClient client = PulsarClient.builder().serviceUrl(badUrl).build();
918+
919+
ReaderBuilder<byte[]> readerBuilder = client.newReader().topic(topic).startMessageFromRollbackDuration(100,
920+
TimeUnit.SECONDS);
921+
922+
for (int i = 0; i < 3; i++) {
923+
try {
924+
readerBuilder.createAsync().get(1, TimeUnit.SECONDS);
925+
} catch (TimeoutException e) {
926+
log.info("It should time out due to invalid url");
927+
} catch (IllegalStateException e) {
928+
fail("It should not fail with corrupted reader state");
929+
}
930+
}
931+
}
905932
}

pulsar-client/src/main/java/org/apache/pulsar/client/impl/ReaderBuilderImpl.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -86,7 +86,8 @@ public CompletableFuture<Reader<T>> createAsync() {
8686
.failedFuture(new IllegalArgumentException("Topic name must be set on the reader builder"));
8787
}
8888

89-
if (conf.getStartMessageId() != null && conf.getStartMessageFromRollbackDurationInSec() > 0
89+
boolean isStartMsgIdExist = conf.getStartMessageId() != null && conf.getStartMessageId() != MessageId.earliest;
90+
if (isStartMsgIdExist && conf.getStartMessageFromRollbackDurationInSec() > 0
9091
|| conf.getStartMessageId() == null && conf.getStartMessageFromRollbackDurationInSec() <= 0) {
9192
return FutureUtil
9293
.failedFuture(new IllegalArgumentException(

pulsar-client/src/test/java/org/apache/pulsar/client/impl/BuildersTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -106,7 +106,7 @@ public void readerBuilderLoadConfTest() throws Exception {
106106
@Test(expectedExceptions = {PulsarClientException.class}, expectedExceptionsMessageRegExp = ".* must be specified but they cannot be specified at the same time.*")
107107
public void shouldNotSetTwoOptAtTheSameTime() throws Exception {
108108
PulsarClient client = PulsarClient.builder().serviceUrl("pulsar://localhost:6650").build();
109-
try (Reader reader = client.newReader().topic("abc").startMessageId(MessageId.earliest)
109+
try (Reader reader = client.newReader().topic("abc").startMessageId(MessageId.latest)
110110
.startMessageFromRollbackDuration(10, TimeUnit.HOURS).create()) {
111111
// no-op
112112
} finally {

0 commit comments

Comments
 (0)