@@ -44,7 +44,6 @@
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import io.druid.java.util.emitter.EmittingLogger;
import io.druid.data.input.Committer;
import io.druid.data.input.InputRow;
import io.druid.data.input.impl.InputRowParser;
@@ -72,6 +71,7 @@
import io.druid.java.util.common.concurrent.Execs;
import io.druid.java.util.common.guava.Sequence;
import io.druid.java.util.common.parsers.ParseException;
import io.druid.java.util.emitter.EmittingLogger;
import io.druid.query.DruidMetrics;
import io.druid.query.NoopQueryRunner;
import io.druid.query.Query;
@@ -85,9 +85,9 @@
import io.druid.segment.realtime.appenderator.Appenderator;
import io.druid.segment.realtime.appenderator.AppenderatorDriverAddResult;
import io.druid.segment.realtime.appenderator.Appenderators;
import io.druid.segment.realtime.appenderator.StreamAppenderatorDriver;
import io.druid.segment.realtime.appenderator.SegmentIdentifier;
import io.druid.segment.realtime.appenderator.SegmentsAndMetadata;
import io.druid.segment.realtime.appenderator.StreamAppenderatorDriver;
import io.druid.segment.realtime.appenderator.TransactionalSegmentPublisher;
import io.druid.segment.realtime.firehose.ChatHandler;
import io.druid.segment.realtime.firehose.ChatHandlerProvider;
@@ -2059,13 +2059,19 @@ private boolean withinMinMaxRecordTime(final InputRow row)

private static class SequenceMetadata
{
/**
* Lock for accessing {@link #endOffsets} and {@link #checkpointed}. This lock is required because
* {@link #setEndOffsets} can be called by both the main thread and the HTTP thread.
*/
private final ReentrantLock lock = new ReentrantLock();

private final int sequenceId;
private final String sequenceName;
private final Map<Integer, Long> startOffsets;
private final Map<Integer, Long> endOffsets;
private final Set<Integer> assignments;
private final boolean sentinel;
private volatile boolean checkpointed;
private boolean checkpointed;

@JsonCreator
public SequenceMetadata(
@@ -2082,8 +2088,8 @@ public SequenceMetadata(
this.sequenceId = sequenceId;
this.sequenceName = sequenceName;
this.startOffsets = ImmutableMap.copyOf(startOffsets);
this.endOffsets = Maps.newHashMap(endOffsets);
this.assignments = Sets.newHashSet(startOffsets.keySet());
this.endOffsets = new HashMap<>(endOffsets);
this.assignments = new HashSet<>(startOffsets.keySet());
this.checkpointed = checkpointed;
this.sentinel = false;
}
@@ -2097,7 +2103,13 @@ public int getSequenceId()
@JsonProperty
public boolean isCheckpointed()
{
return checkpointed;
lock.lock();
try {
return checkpointed;
}
finally {
lock.unlock();
}
}

@JsonProperty
@@ -2115,7 +2127,13 @@ public Map<Integer, Long> getStartOffsets()
@JsonProperty
public Map<Integer, Long> getEndOffsets()
{
return endOffsets;
lock.lock();
try {
return endOffsets;
}
finally {
lock.unlock();
}
}

@JsonProperty
@@ -2126,19 +2144,31 @@ public boolean isSentinel()

public void setEndOffsets(Map<Integer, Long> newEndOffsets)
{
endOffsets.putAll(newEndOffsets);
checkpointed = true;
lock.lock();
try {
endOffsets.putAll(newEndOffsets);
checkpointed = true;
}
finally {
lock.unlock();
}
}

public void updateAssignments(Map<Integer, Long> nextPartitionOffset)
{
assignments.clear();
nextPartitionOffset.entrySet().forEach(partitionOffset -> {
if (Longs.compare(endOffsets.get(partitionOffset.getKey()), nextPartitionOffset.get(partitionOffset.getKey()))
> 0) {
assignments.add(partitionOffset.getKey());
}
});
lock.lock();
try {
assignments.clear();
nextPartitionOffset.entrySet().forEach(partitionOffset -> {
if (Longs.compare(endOffsets.get(partitionOffset.getKey()), nextPartitionOffset.get(partitionOffset.getKey()))
> 0) {
assignments.add(partitionOffset.getKey());
}
});
}
finally {
lock.unlock();
}
}

public boolean isOpen()
@@ -2148,10 +2178,16 @@ public boolean isOpen()

boolean canHandle(ConsumerRecord<byte[], byte[]> record)
{
return isOpen()
&& endOffsets.get(record.partition()) != null
&& record.offset() >= startOffsets.get(record.partition())
&& record.offset() < endOffsets.get(record.partition());
lock.lock();
try {
return isOpen()
&& endOffsets.get(record.partition()) != null
&& record.offset() >= startOffsets.get(record.partition())
&& record.offset() < endOffsets.get(record.partition());
}
finally {
lock.unlock();
}
}

private SequenceMetadata()
@@ -2173,15 +2209,21 @@ public static SequenceMetadata getSentinelSequenceMetadata()
@Override
public String toString()
{
return "SequenceMetadata{" +
"sequenceName='" + sequenceName + '\'' +
", sequenceId=" + sequenceId +
", startOffsets=" + startOffsets +
", endOffsets=" + endOffsets +
", assignments=" + assignments +
", sentinel=" + sentinel +
", checkpointed=" + checkpointed +
'}';
lock.lock();
try {
return "SequenceMetadata{" +
"sequenceName='" + sequenceName + '\'' +
", sequenceId=" + sequenceId +
", startOffsets=" + startOffsets +
", endOffsets=" + endOffsets +
", assignments=" + assignments +
", sentinel=" + sentinel +
", checkpointed=" + checkpointed +
'}';
}
finally {
lock.unlock();
}
}


@@ -2194,28 +2236,40 @@ public Supplier<Committer> getCommitterSupplier(String topic, Map<Integer, Long>
@Override
public Object getMetadata()
{
Preconditions.checkState(
assignments.isEmpty(),
"This committer can be used only once all the records till offsets [%s] have been consumed, also make sure to call updateAssignments before using this committer",
endOffsets
);
lock.lock();

// merge endOffsets for this sequence with globally lastPersistedOffsets
// This is done because this committer would be persisting only sub set of segments
// corresponding to the current sequence. Generally, lastPersistedOffsets should already
// cover endOffsets but just to be sure take max of offsets and persist that
for (Map.Entry<Integer, Long> partitionOffset : endOffsets.entrySet()) {
lastPersistedOffsets.put(partitionOffset.getKey(), Math.max(
partitionOffset.getValue(),
lastPersistedOffsets.getOrDefault(partitionOffset.getKey(), 0L)
));
}
try {
Preconditions.checkState(
assignments.isEmpty(),
"This committer can be used only once all the records till offsets [%s] have been consumed, also make"
+ " sure to call updateAssignments before using this committer",
endOffsets
);

// Publish metadata can be different from persist metadata as we are going to publish only
// subset of segments
return ImmutableMap.of(METADATA_NEXT_PARTITIONS, new KafkaPartitions(topic, lastPersistedOffsets),
METADATA_PUBLISH_PARTITIONS, new KafkaPartitions(topic, endOffsets)
);
// merge endOffsets for this sequence with globally lastPersistedOffsets
// This is done because this committer would be persisting only sub set of segments
// corresponding to the current sequence. Generally, lastPersistedOffsets should already
// cover endOffsets but just to be sure take max of offsets and persist that
for (Map.Entry<Integer, Long> partitionOffset : endOffsets.entrySet()) {
lastPersistedOffsets.put(
partitionOffset.getKey(),
Math.max(
partitionOffset.getValue(),
lastPersistedOffsets.getOrDefault(partitionOffset.getKey(), 0L)
)
);
}

// Publish metadata can be different from persist metadata as we are going to publish only
// subset of segments
return ImmutableMap.of(
METADATA_NEXT_PARTITIONS, new KafkaPartitions(topic, lastPersistedOffsets),
METADATA_PUBLISH_PARTITIONS, new KafkaPartitions(topic, endOffsets)
);
}
finally {
lock.unlock();
}
}

@Override
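For readers skimming the diff, the core of this change is that SequenceMetadata's mutable state (endOffsets, checkpointed) is now only read and written while holding a single ReentrantLock, since setEndOffsets() can be invoked from the HTTP thread while the main ingestion thread is reading. Below is a minimal, self-contained sketch of that pattern; the class and method names (OffsetsState, snapshotEndOffsets) are illustrative only and are not the actual Druid types, and the copy-on-read in the getter is a choice made for the sketch, not something this patch does.

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.locks.ReentrantLock;

// Minimal sketch: every access to the shared mutable state goes through the same lock,
// so a writer on the HTTP thread and a reader on the main thread never interleave.
class OffsetsState
{
  private final ReentrantLock lock = new ReentrantLock();
  private final Map<Integer, Long> endOffsets = new HashMap<>();
  private boolean checkpointed;

  void setEndOffsets(Map<Integer, Long> newEndOffsets)
  {
    lock.lock();
    try {
      endOffsets.putAll(newEndOffsets);
      checkpointed = true;
    }
    finally {
      lock.unlock();
    }
  }

  Map<Integer, Long> snapshotEndOffsets()
  {
    lock.lock();
    try {
      // Copy under the lock so callers never observe a map that is being mutated concurrently.
      return new HashMap<>(endOffsets);
    }
    finally {
      lock.unlock();
    }
  }

  boolean isCheckpointed()
  {
    lock.lock();
    try {
      return checkpointed;
    }
    finally {
      lock.unlock();
    }
  }
}

The lock/try/finally-unlock shape mirrors what the patch adds to isCheckpointed(), getEndOffsets(), setEndOffsets(), updateAssignments(), canHandle(), toString(), and the committer's getMetadata(), replacing the previous volatile field, which only made single reads and writes visible but could not make the read-modify-write sequences atomic.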