diff --git a/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/reader/split/PulsarPartitionSplitReaderTestBase.java b/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/reader/split/PulsarPartitionSplitReaderTestBase.java index e5d2bf9276e40..96055cff72e2d 100644 --- a/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/reader/split/PulsarPartitionSplitReaderTestBase.java +++ b/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/reader/split/PulsarPartitionSplitReaderTestBase.java @@ -30,10 +30,9 @@ import org.apache.flink.connector.pulsar.testutils.extension.TestOrderlinessExtension; import org.apache.flink.util.TestLoggerExtension; -import org.apache.pulsar.client.api.Consumer; +import org.apache.pulsar.client.admin.PulsarAdmin; import org.apache.pulsar.client.api.Message; import org.apache.pulsar.client.api.MessageId; -import org.apache.pulsar.client.api.PulsarClientException; import org.apache.pulsar.client.api.Schema; import org.apache.pulsar.client.api.SubscriptionType; import org.apache.pulsar.client.impl.MessageIdImpl; @@ -61,7 +60,6 @@ import static java.util.Collections.singletonList; import static org.apache.commons.lang3.RandomStringUtils.randomAlphabetic; import static org.apache.flink.connector.pulsar.common.utils.PulsarExceptionUtils.sneakyAdmin; -import static org.apache.flink.connector.pulsar.common.utils.PulsarExceptionUtils.sneakyThrow; import static org.apache.flink.connector.pulsar.source.PulsarSourceOptions.PULSAR_DEFAULT_FETCH_TIME; import static org.apache.flink.connector.pulsar.source.PulsarSourceOptions.PULSAR_ENABLE_AUTO_ACKNOWLEDGE_MESSAGE; import static org.apache.flink.connector.pulsar.source.PulsarSourceOptions.PULSAR_MAX_FETCH_RECORDS; @@ -135,14 +133,31 @@ private void seekStartPositionAndHandleSplit( new PulsarPartitionSplit(partition, StopCursor.never(), null, null); SplitsAddition addition = new SplitsAddition<>(singletonList(split)); - // create consumer and seek before split changes - try (Consumer consumer = reader.createPulsarConsumer(partition)) { - // inclusive messageId - consumer.seek(startPosition); - } catch (PulsarClientException e) { - sneakyThrow(e); + // Create the subscription and set the start position for this reader. + // Remember not to use Consumer.seek(startPosition) + SourceConfiguration sourceConfiguration = reader.sourceConfiguration; + PulsarAdmin pulsarAdmin = reader.pulsarAdmin; + String subscriptionName = sourceConfiguration.getSubscriptionName(); + List subscriptions = + sneakyAdmin(() -> pulsarAdmin.topics().getSubscriptions(topicName)); + if (!subscriptions.contains(subscriptionName)) { + // If this subscription is not available. Just create it. + sneakyAdmin( + () -> + pulsarAdmin + .topics() + .createSubscription( + topicName, subscriptionName, startPosition)); + } else { + // Reset the subscription if this is existed. + sneakyAdmin( + () -> + pulsarAdmin + .topics() + .resetCursor(topicName, subscriptionName, startPosition)); } + // Accept the split and start consuming. reader.handleSplitsChanges(addition); } @@ -199,7 +214,7 @@ void pollMessageAfterTimeout(PulsarPartitionSplitReaderBase splitReader) String topicName = randomAlphabetic(10); // Add a split - seekStartPositionAndHandleSplit(splitReader, topicName, 0); + handleSplit(splitReader, topicName, 0); // Poll once with a null message Message message1 = fetchedMessage(splitReader); @@ -223,7 +238,7 @@ void pollMessageAfterTimeout(PulsarPartitionSplitReaderBase splitReader) void consumeMessageCreatedAfterHandleSplitChangesAndFetch( PulsarPartitionSplitReaderBase splitReader) { String topicName = randomAlphabetic(10); - seekStartPositionAndHandleSplit(splitReader, topicName, 0); + handleSplit(splitReader, topicName, 0); operator().sendMessage(topicNameWithPartition(topicName, 0), STRING, randomAlphabetic(10)); fetchedMessages(splitReader, 1, true); }