diff --git a/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/KafkaIndexTask.java b/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/KafkaIndexTask.java index d7bffedea17b..62aca366e485 100644 --- a/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/KafkaIndexTask.java +++ b/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/KafkaIndexTask.java @@ -44,7 +44,6 @@ import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.ListeningExecutorService; import com.google.common.util.concurrent.MoreExecutors; -import io.druid.java.util.emitter.EmittingLogger; import io.druid.data.input.Committer; import io.druid.data.input.InputRow; import io.druid.data.input.impl.InputRowParser; @@ -72,6 +71,7 @@ import io.druid.java.util.common.concurrent.Execs; import io.druid.java.util.common.guava.Sequence; import io.druid.java.util.common.parsers.ParseException; +import io.druid.java.util.emitter.EmittingLogger; import io.druid.query.DruidMetrics; import io.druid.query.NoopQueryRunner; import io.druid.query.Query; @@ -85,9 +85,9 @@ import io.druid.segment.realtime.appenderator.Appenderator; import io.druid.segment.realtime.appenderator.AppenderatorDriverAddResult; import io.druid.segment.realtime.appenderator.Appenderators; -import io.druid.segment.realtime.appenderator.StreamAppenderatorDriver; import io.druid.segment.realtime.appenderator.SegmentIdentifier; import io.druid.segment.realtime.appenderator.SegmentsAndMetadata; +import io.druid.segment.realtime.appenderator.StreamAppenderatorDriver; import io.druid.segment.realtime.appenderator.TransactionalSegmentPublisher; import io.druid.segment.realtime.firehose.ChatHandler; import io.druid.segment.realtime.firehose.ChatHandlerProvider; @@ -2059,13 +2059,19 @@ private boolean withinMinMaxRecordTime(final InputRow row) private static class SequenceMetadata { + /** + * Lock for accessing 
{@link #endOffsets} and {@link #checkpointed}. This lock is required because + * {@link #setEndOffsets} can be called by both the main thread and the HTTP thread. + */ + private final ReentrantLock lock = new ReentrantLock(); + private final int sequenceId; private final String sequenceName; private final Map startOffsets; private final Map endOffsets; private final Set assignments; private final boolean sentinel; - private volatile boolean checkpointed; + private boolean checkpointed; @JsonCreator public SequenceMetadata( @@ -2082,8 +2088,8 @@ public SequenceMetadata( this.sequenceId = sequenceId; this.sequenceName = sequenceName; this.startOffsets = ImmutableMap.copyOf(startOffsets); - this.endOffsets = Maps.newHashMap(endOffsets); - this.assignments = Sets.newHashSet(startOffsets.keySet()); + this.endOffsets = new HashMap<>(endOffsets); + this.assignments = new HashSet<>(startOffsets.keySet()); this.checkpointed = checkpointed; this.sentinel = false; } @@ -2097,7 +2103,13 @@ public int getSequenceId() @JsonProperty public boolean isCheckpointed() { - return checkpointed; + lock.lock(); + try { + return checkpointed; + } + finally { + lock.unlock(); + } } @JsonProperty @@ -2115,7 +2127,13 @@ public Map getStartOffsets() @JsonProperty public Map getEndOffsets() { - return endOffsets; + lock.lock(); + try { + return endOffsets; + } + finally { + lock.unlock(); + } } @JsonProperty @@ -2126,19 +2144,31 @@ public boolean isSentinel() public void setEndOffsets(Map newEndOffsets) { - endOffsets.putAll(newEndOffsets); - checkpointed = true; + lock.lock(); + try { + endOffsets.putAll(newEndOffsets); + checkpointed = true; + } + finally { + lock.unlock(); + } } public void updateAssignments(Map nextPartitionOffset) { - assignments.clear(); - nextPartitionOffset.entrySet().forEach(partitionOffset -> { - if (Longs.compare(endOffsets.get(partitionOffset.getKey()), nextPartitionOffset.get(partitionOffset.getKey())) - > 0) { - assignments.add(partitionOffset.getKey()); - } - 
}); + lock.lock(); + try { + assignments.clear(); + nextPartitionOffset.entrySet().forEach(partitionOffset -> { + if (Longs.compare(endOffsets.get(partitionOffset.getKey()), nextPartitionOffset.get(partitionOffset.getKey())) + > 0) { + assignments.add(partitionOffset.getKey()); + } + }); + } + finally { + lock.unlock(); + } } public boolean isOpen() @@ -2148,10 +2178,16 @@ public boolean isOpen() boolean canHandle(ConsumerRecord record) { - return isOpen() - && endOffsets.get(record.partition()) != null - && record.offset() >= startOffsets.get(record.partition()) - && record.offset() < endOffsets.get(record.partition()); + lock.lock(); + try { + return isOpen() + && endOffsets.get(record.partition()) != null + && record.offset() >= startOffsets.get(record.partition()) + && record.offset() < endOffsets.get(record.partition()); + } + finally { + lock.unlock(); + } } private SequenceMetadata() @@ -2173,15 +2209,21 @@ public static SequenceMetadata getSentinelSequenceMetadata() @Override public String toString() { - return "SequenceMetadata{" + - "sequenceName='" + sequenceName + '\'' + - ", sequenceId=" + sequenceId + - ", startOffsets=" + startOffsets + - ", endOffsets=" + endOffsets + - ", assignments=" + assignments + - ", sentinel=" + sentinel + - ", checkpointed=" + checkpointed + - '}'; + lock.lock(); + try { + return "SequenceMetadata{" + + "sequenceName='" + sequenceName + '\'' + + ", sequenceId=" + sequenceId + + ", startOffsets=" + startOffsets + + ", endOffsets=" + endOffsets + + ", assignments=" + assignments + + ", sentinel=" + sentinel + + ", checkpointed=" + checkpointed + + '}'; + } + finally { + lock.unlock(); + } } @@ -2194,28 +2236,40 @@ public Supplier getCommitterSupplier(String topic, Map @Override public Object getMetadata() { - Preconditions.checkState( - assignments.isEmpty(), - "This committer can be used only once all the records till offsets [%s] have been consumed, also make sure to call updateAssignments before using this committer", - 
endOffsets - ); + lock.lock(); - // merge endOffsets for this sequence with globally lastPersistedOffsets - // This is done because this committer would be persisting only sub set of segments - // corresponding to the current sequence. Generally, lastPersistedOffsets should already - // cover endOffsets but just to be sure take max of offsets and persist that - for (Map.Entry partitionOffset : endOffsets.entrySet()) { - lastPersistedOffsets.put(partitionOffset.getKey(), Math.max( - partitionOffset.getValue(), - lastPersistedOffsets.getOrDefault(partitionOffset.getKey(), 0L) - )); - } + try { + Preconditions.checkState( + assignments.isEmpty(), + "This committer can be used only once all the records till offsets [%s] have been consumed, also make" + + " sure to call updateAssignments before using this committer", + endOffsets + ); - // Publish metadata can be different from persist metadata as we are going to publish only - // subset of segments - return ImmutableMap.of(METADATA_NEXT_PARTITIONS, new KafkaPartitions(topic, lastPersistedOffsets), - METADATA_PUBLISH_PARTITIONS, new KafkaPartitions(topic, endOffsets) - ); + // merge endOffsets for this sequence with globally lastPersistedOffsets + // This is done because this committer would be persisting only sub set of segments + // corresponding to the current sequence. 
Generally, lastPersistedOffsets should already + // cover endOffsets but just to be sure take max of offsets and persist that + for (Map.Entry partitionOffset : endOffsets.entrySet()) { + lastPersistedOffsets.put( + partitionOffset.getKey(), + Math.max( + partitionOffset.getValue(), + lastPersistedOffsets.getOrDefault(partitionOffset.getKey(), 0L) + ) + ); + } + + // Publish metadata can be different from persist metadata as we are going to publish only + // subset of segments + return ImmutableMap.of( + METADATA_NEXT_PARTITIONS, new KafkaPartitions(topic, lastPersistedOffsets), + METADATA_PUBLISH_PARTITIONS, new KafkaPartitions(topic, endOffsets) + ); + } + finally { + lock.unlock(); + } } @Override