Skip to content
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ public class ListOffsetsResult {

private final Map<TopicPartition, KafkaFuture<ListOffsetsResultInfo>> futures;

ListOffsetsResult(Map<TopicPartition, KafkaFuture<ListOffsetsResultInfo>> futures) {
public ListOffsetsResult(Map<TopicPartition, KafkaFuture<ListOffsetsResultInfo>> futures) {
this.futures = futures;
}

Expand Down Expand Up @@ -80,7 +80,7 @@ public static class ListOffsetsResultInfo {
private final long timestamp;
private final Optional<Integer> leaderEpoch;

ListOffsetsResultInfo(long offset, long timestamp, Optional<Integer> leaderEpoch) {
public ListOffsetsResultInfo(long offset, long timestamp, Optional<Integer> leaderEpoch) {
this.offset = offset;
this.timestamp = timestamp;
this.leaderEpoch = leaderEpoch;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,9 @@
*/
public class OffsetSpec {

static class EarliestSpec extends OffsetSpec { }
static class LatestSpec extends OffsetSpec { }
static class TimestampSpec extends OffsetSpec {
/** Marker spec requesting the earliest (beginning) available offset of a partition. */
public static class EarliestSpec extends OffsetSpec { }
/** Marker spec requesting the latest (end) offset of a partition. */
public static class LatestSpec extends OffsetSpec { }
public static class TimestampSpec extends OffsetSpec {
private final long timestamp;

TimestampSpec(long timestamp) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,8 @@ public class MockAdminClient extends AdminClient {
new HashMap<>();
private final Map<TopicPartitionReplica, ReplicaLogDirInfo> replicaMoves =
new HashMap<>();
private final Map<TopicPartition, Long> beginningOffsets;
private final Map<TopicPartition, Long> endOffsets;
private final String clusterId;
private final List<List<String>> brokerLogDirs;
private final List<Map<String, String>> brokerConfigs;
Expand Down Expand Up @@ -145,8 +147,11 @@ public MockAdminClient build() {
}
}

public MockAdminClient(List<Node> brokers,
Node controller) {
/**
 * Creates a mock admin client for a single-node cluster, using
 * {@link Node#noNode()} as both the only broker and the controller.
 */
public MockAdminClient() {
    this(Collections.singletonList(Node.noNode()), Node.noNode());
}

/**
 * Creates a mock admin client for the given brokers, delegating to the full
 * constructor with the default cluster id and default log dirs for every broker.
 *
 * @param brokers    the brokers in the mock cluster
 * @param controller the broker acting as the controller
 */
public MockAdminClient(List<Node> brokers, Node controller) {
    this(brokers, controller, DEFAULT_CLUSTER_ID, 1, brokers.size(),
        Collections.nCopies(brokers.size(), DEFAULT_LOG_DIRS));
}
Expand All @@ -167,6 +172,8 @@ private MockAdminClient(List<Node> brokers,
for (int i = 0; i < brokers.size(); i++) {
this.brokerConfigs.add(new HashMap<>());
}
this.beginningOffsets = new HashMap<>();
this.endOffsets = new HashMap<>();
}

synchronized public void controller(Node controller) {
Expand Down Expand Up @@ -789,7 +796,24 @@ synchronized public AlterConsumerGroupOffsetsResult alterConsumerGroupOffsets(St

/**
 * Resolves each requested partition's offset against the offsets registered via
 * {@code updateBeginningOffsets} / {@code updateEndOffsets}. All returned futures
 * are completed synchronously.
 *
 * @param topicPartitionOffsets partitions to query, each mapped to the spec
 *                              (earliest or latest) to resolve
 * @param options ignored by this mock
 * @return a {@link ListOffsetsResult} with one completed future per partition
 * @throws UnsupportedOperationException if a timestamp-based spec is requested
 * @throws IllegalStateException if no offset was registered for a partition
 */
@Override
synchronized public ListOffsetsResult listOffsets(Map<TopicPartition, OffsetSpec> topicPartitionOffsets, ListOffsetsOptions options) {
    Map<TopicPartition, KafkaFuture<ListOffsetsResult.ListOffsetsResultInfo>> futures = new HashMap<>();

    for (Map.Entry<TopicPartition, OffsetSpec> entry : topicPartitionOffsets.entrySet()) {
        TopicPartition tp = entry.getKey();
        OffsetSpec spec = entry.getValue();
        KafkaFutureImpl<ListOffsetsResult.ListOffsetsResultInfo> future = new KafkaFutureImpl<>();

        if (spec instanceof OffsetSpec.TimestampSpec) {
            // Timestamp-based lookup is not supported by this mock.
            throw new UnsupportedOperationException("Not implemented yet");
        } else if (spec instanceof OffsetSpec.EarliestSpec) {
            future.complete(new ListOffsetsResult.ListOffsetsResultInfo(
                registeredOffsetOrThrow(beginningOffsets, tp, "beginning"), -1, Optional.empty()));
        } else {
            future.complete(new ListOffsetsResult.ListOffsetsResultInfo(
                registeredOffsetOrThrow(endOffsets, tp, "end"), -1, Optional.empty()));
        }

        futures.put(tp, future);
    }

    return new ListOffsetsResult(futures);
}

// Looks up the registered offset for a partition; fails fast with a descriptive
// exception instead of a NullPointerException on Long unboxing when absent.
private static long registeredOffsetOrThrow(Map<TopicPartition, Long> offsets, TopicPartition tp, String kind) {
    Long offset = offsets.get(tp);
    if (offset == null) {
        throw new IllegalStateException("No " + kind + " offset registered for partition " + tp);
    }
    return offset;
}

@Override
Expand All @@ -805,6 +829,14 @@ public AlterClientQuotasResult alterClientQuotas(Collection<ClientQuotaAlteratio
@Override
synchronized public void close(Duration timeout) {}

/**
 * Registers (or overwrites) the earliest available offset for the given
 * partitions; previously registered partitions not present here are kept.
 *
 * @param newOffsets beginning offsets keyed by partition
 */
public synchronized void updateBeginningOffsets(Map<TopicPartition, Long> newOffsets) {
    for (final Map.Entry<TopicPartition, Long> offsetEntry : newOffsets.entrySet()) {
        beginningOffsets.put(offsetEntry.getKey(), offsetEntry.getValue());
    }
}

public synchronized void updateEndOffsets(final Map<TopicPartition, Long> newOffsets) {
Comment thread
mjsax marked this conversation as resolved.
endOffsets.putAll(newOffsets);
}

private final static class TopicMetadata {
final boolean isInternalTopic;
final List<TopicPartitionInfo> partitions;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
/**
* See {@link StoreChangelogReader}.
*/
interface ChangelogRegister {
public interface ChangelogRegister {
/**
* Register a state store for restoration.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,15 @@
*/
package org.apache.kafka.streams.processor.internals;

import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.ListOffsetsOptions;
import org.apache.kafka.clients.admin.ListOffsetsResult;
import org.apache.kafka.clients.admin.OffsetSpec;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.InvalidOffsetException;
import org.apache.kafka.common.IsolationLevel;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.TimeoutException;
Expand All @@ -43,6 +48,8 @@
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.function.Function;
import java.util.stream.Collectors;

import static org.apache.kafka.streams.processor.internals.ClientUtils.fetchCommittedOffsets;
Expand Down Expand Up @@ -199,6 +206,9 @@ int bufferedLimitIndex() {
// to update offset limit for standby tasks;
private Consumer<byte[], byte[]> mainConsumer;

// the changelog reader needs the admin client to list end offsets
private final Admin adminClient;

private long lastUpdateOffsetTime;

void setMainConsumer(final Consumer<byte[], byte[]> consumer) {
Expand All @@ -208,11 +218,13 @@ void setMainConsumer(final Consumer<byte[], byte[]> consumer) {
public StoreChangelogReader(final Time time,
final StreamsConfig config,
final LogContext logContext,
final Admin adminClient,
final Consumer<byte[], byte[]> restoreConsumer,
final StateRestoreListener stateRestoreListener) {
this.time = time;
this.log = logContext.logger(StoreChangelogReader.class);
this.state = ChangelogReaderState.ACTIVE_RESTORING;
this.adminClient = adminClient;
this.restoreConsumer = restoreConsumer;
this.stateRestoreListener = stateRestoreListener;

Expand Down Expand Up @@ -564,8 +576,13 @@ private Map<TopicPartition, Long> endOffsetForChangelogs(final Set<TopicPartitio
return Collections.emptyMap();

try {
return restoreConsumer.endOffsets(partitions);
} catch (final TimeoutException e) {
final ListOffsetsResult result = adminClient.listOffsets(
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Did you check out the new methods in ClientUtils?

partitions.stream().collect(Collectors.toMap(Function.identity(), tp -> OffsetSpec.latest())),
new ListOffsetsOptions(IsolationLevel.READ_UNCOMMITTED)
);
return result.all().get().entrySet().stream().collect(
Collectors.toMap(Map.Entry::getKey, entry -> entry.getValue().offset()));
} catch (final TimeoutException | InterruptedException | ExecutionException e) {
// if timeout exception gets thrown we just give up this time and retry in the next run loop
log.debug("Could not fetch all end offsets for {}, will retry in the next run loop", partitions);
return Collections.emptyMap();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -273,14 +273,12 @@ public boolean isRunning() {
private volatile ThreadMetadata threadMetadata;
private StreamThread.StateListener stateListener;

private final Admin adminClient;
private final ChangelogReader changelogReader;

// package-private for testing
final ConsumerRebalanceListener rebalanceListener;
final Consumer<byte[], byte[]> mainConsumer;
final Consumer<byte[], byte[]> restoreConsumer;
final InternalTopologyBuilder builder;
private final ConsumerRebalanceListener rebalanceListener;
private final Consumer<byte[], byte[]> mainConsumer;
private final Consumer<byte[], byte[]> restoreConsumer;
private final Admin adminClient;
private final InternalTopologyBuilder builder;

public static StreamThread create(final InternalTopologyBuilder builder,
final StreamsConfig config,
Expand Down Expand Up @@ -309,6 +307,7 @@ public static StreamThread create(final InternalTopologyBuilder builder,
time,
config,
logContext,
adminClient,
restoreConsumer,
userStateRestoreListener
);
Expand Down Expand Up @@ -1020,4 +1019,23 @@ int currentNumIterations() {
return numIterations;
}

// Package-private accessor for tests: the thread's rebalance listener.
ConsumerRebalanceListener rebalanceListener() {
    return this.rebalanceListener;
}

// Package-private accessor for tests: the main record consumer.
Consumer<byte[], byte[]> mainConsumer() {
    return this.mainConsumer;
}

// Package-private accessor for tests: the restore consumer.
// Fix: dropped the stray ';' after the closing brace (extraneous empty declaration).
Consumer<byte[], byte[]> restoreConsumer() {
    return restoreConsumer;
}

// Package-private accessor for tests: the admin client.
Admin adminClient() {
    return this.adminClient;
}

// Package-private accessor for tests: the internal topology builder.
// Fix: dropped the stray ';' after the closing brace (extraneous empty declaration).
InternalTopologyBuilder internalTopologyBuilder() {
    return builder;
}
}
Loading