From e09ea7503ca6f8d7532c89ebe73943777652a6d2 Mon Sep 17 00:00:00 2001 From: Jihoon Son Date: Tue, 26 Jun 2018 17:55:42 -0700 Subject: [PATCH 1/2] Fix ConcurrentModificationException in IncrementalPublishingKafkaIndexTaskRunner --- ...ementalPublishingKafkaIndexTaskRunner.java | 33 +++++++++++-------- 1 file changed, 19 insertions(+), 14 deletions(-) diff --git a/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/IncrementalPublishingKafkaIndexTaskRunner.java b/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/IncrementalPublishingKafkaIndexTaskRunner.java index df2f768662d5..77b4676e3192 100644 --- a/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/IncrementalPublishingKafkaIndexTaskRunner.java +++ b/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/IncrementalPublishingKafkaIndexTaskRunner.java @@ -103,6 +103,7 @@ import java.io.IOException; import java.nio.ByteBuffer; import java.util.Collections; +import java.util.HashSet; import java.util.Iterator; import java.util.List; import java.util.Map; @@ -1504,7 +1505,7 @@ private static class SequenceMetadata private final int sequenceId; private final String sequenceName; private final Map startOffsets; - private final Map endOffsets; + private final ConcurrentHashMap endOffsets; private final Set assignments; private final boolean sentinel; private volatile boolean checkpointed; @@ -1524,8 +1525,8 @@ public SequenceMetadata( this.sequenceId = sequenceId; this.sequenceName = sequenceName; this.startOffsets = ImmutableMap.copyOf(startOffsets); - this.endOffsets = Maps.newHashMap(endOffsets); - this.assignments = Sets.newHashSet(startOffsets.keySet()); + this.endOffsets = new ConcurrentHashMap<>(endOffsets); + this.assignments = new HashSet<>(startOffsets.keySet()); this.checkpointed = checkpointed; this.sentinel = false; } @@ -1575,10 +1576,9 @@ void setEndOffsets(Map newEndOffsets) void updateAssignments(Map 
nextPartitionOffset) { assignments.clear(); - nextPartitionOffset.entrySet().forEach(partitionOffset -> { - if (Longs.compare(endOffsets.get(partitionOffset.getKey()), nextPartitionOffset.get(partitionOffset.getKey())) - > 0) { - assignments.add(partitionOffset.getKey()); + nextPartitionOffset.forEach((key, value) -> { + if (Longs.compare(endOffsets.get(key), nextPartitionOffset.get(key)) > 0) { + assignments.add(key); } }); } @@ -1637,7 +1637,8 @@ public Object getMetadata() { Preconditions.checkState( assignments.isEmpty(), - "This committer can be used only once all the records till offsets [%s] have been consumed, also make sure to call updateAssignments before using this committer", + "This committer can be used only once all the records till offsets [%s] have been consumed, also make" + + " sure to call updateAssignments before using this committer", endOffsets ); @@ -1646,16 +1647,20 @@ public Object getMetadata() // corresponding to the current sequence. Generally, lastPersistedOffsets should already // cover endOffsets but just to be sure take max of offsets and persist that for (Map.Entry partitionOffset : endOffsets.entrySet()) { - lastPersistedOffsets.put(partitionOffset.getKey(), Math.max( - partitionOffset.getValue(), - lastPersistedOffsets.getOrDefault(partitionOffset.getKey(), 0L) - )); + lastPersistedOffsets.put( + partitionOffset.getKey(), + Math.max( + partitionOffset.getValue(), + lastPersistedOffsets.getOrDefault(partitionOffset.getKey(), 0L) + ) + ); } // Publish metadata can be different from persist metadata as we are going to publish only // subset of segments - return ImmutableMap.of(METADATA_NEXT_PARTITIONS, new KafkaPartitions(topic, lastPersistedOffsets), - METADATA_PUBLISH_PARTITIONS, new KafkaPartitions(topic, endOffsets) + return ImmutableMap.of( + METADATA_NEXT_PARTITIONS, new KafkaPartitions(topic, lastPersistedOffsets), + METADATA_PUBLISH_PARTITIONS, new KafkaPartitions(topic, endOffsets) ); } From 
2437e263f1938dc0e2bc179ddbd56affb602a683 Mon Sep 17 00:00:00 2001 From: Jihoon Son Date: Thu, 28 Jun 2018 14:20:34 -0700 Subject: [PATCH 2/2] fix lock and add comments --- ...ementalPublishingKafkaIndexTaskRunner.java | 151 ++++++++++++------ 1 file changed, 101 insertions(+), 50 deletions(-) diff --git a/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/IncrementalPublishingKafkaIndexTaskRunner.java b/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/IncrementalPublishingKafkaIndexTaskRunner.java index 77b4676e3192..3caeedf905d9 100644 --- a/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/IncrementalPublishingKafkaIndexTaskRunner.java +++ b/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/IncrementalPublishingKafkaIndexTaskRunner.java @@ -103,6 +103,7 @@ import java.io.IOException; import java.nio.ByteBuffer; import java.util.Collections; +import java.util.HashMap; import java.util.HashSet; import java.util.Iterator; import java.util.List; import java.util.Map; @@ -1502,13 +1503,19 @@ public DateTime getStartTime(@Context final HttpServletRequest req) private static class SequenceMetadata { + /** + * Lock for accessing {@link #endOffsets} and {@link #checkpointed}. This lock is required because + * {@link #setEndOffsets} can be called by both the main thread and the HTTP thread. 
+ */ + private final ReentrantLock lock = new ReentrantLock(); + private final int sequenceId; private final String sequenceName; private final Map startOffsets; - private final ConcurrentHashMap endOffsets; + private final Map endOffsets; private final Set assignments; private final boolean sentinel; - private volatile boolean checkpointed; + private boolean checkpointed; @JsonCreator public SequenceMetadata( @@ -1525,7 +1532,7 @@ public SequenceMetadata( this.sequenceId = sequenceId; this.sequenceName = sequenceName; this.startOffsets = ImmutableMap.copyOf(startOffsets); - this.endOffsets = new ConcurrentHashMap<>(endOffsets); + this.endOffsets = new HashMap<>(endOffsets); this.assignments = new HashSet<>(startOffsets.keySet()); this.checkpointed = checkpointed; this.sentinel = false; @@ -1540,7 +1547,13 @@ public int getSequenceId() @JsonProperty public boolean isCheckpointed() { - return checkpointed; + lock.lock(); + try { + return checkpointed; + } + finally { + lock.unlock(); + } } @JsonProperty @@ -1558,7 +1571,13 @@ public Map getStartOffsets() @JsonProperty public Map getEndOffsets() { - return endOffsets; + lock.lock(); + try { + return endOffsets; + } + finally { + lock.unlock(); + } } @JsonProperty @@ -1569,18 +1588,30 @@ public boolean isSentinel() void setEndOffsets(Map newEndOffsets) { - endOffsets.putAll(newEndOffsets); - checkpointed = true; + lock.lock(); + try { + endOffsets.putAll(newEndOffsets); + checkpointed = true; + } + finally { + lock.unlock(); + } } void updateAssignments(Map nextPartitionOffset) { - assignments.clear(); - nextPartitionOffset.forEach((key, value) -> { - if (Longs.compare(endOffsets.get(key), nextPartitionOffset.get(key)) > 0) { - assignments.add(key); - } - }); + lock.lock(); + try { + assignments.clear(); + nextPartitionOffset.forEach((key, value) -> { + if (Longs.compare(endOffsets.get(key), nextPartitionOffset.get(key)) > 0) { + assignments.add(key); + } + }); + } + finally { + lock.unlock(); + } } boolean isOpen() 
@@ -1590,10 +1621,17 @@ boolean isOpen() boolean canHandle(ConsumerRecord record) { - return isOpen() - && endOffsets.get(record.partition()) != null - && record.offset() >= startOffsets.get(record.partition()) - && record.offset() < endOffsets.get(record.partition()); + lock.lock(); + try { + final Long partitionEndOffset = endOffsets.get(record.partition()); + return isOpen() + && partitionEndOffset != null + && record.offset() >= startOffsets.get(record.partition()) + && record.offset() < partitionEndOffset; + } + finally { + lock.unlock(); + } } private SequenceMetadata() @@ -1615,15 +1653,21 @@ static SequenceMetadata getSentinelSequenceMetadata() @Override public String toString() { - return "SequenceMetadata{" + - "sequenceName='" + sequenceName + '\'' + - ", sequenceId=" + sequenceId + - ", startOffsets=" + startOffsets + - ", endOffsets=" + endOffsets + - ", assignments=" + assignments + - ", sentinel=" + sentinel + - ", checkpointed=" + checkpointed + - '}'; + lock.lock(); + try { + return "SequenceMetadata{" + + "sequenceName='" + sequenceName + '\'' + + ", sequenceId=" + sequenceId + + ", startOffsets=" + startOffsets + + ", endOffsets=" + endOffsets + + ", assignments=" + assignments + + ", sentinel=" + sentinel + + ", checkpointed=" + checkpointed + + '}'; + } + finally { + lock.unlock(); + } } Supplier getCommitterSupplier(String topic, Map lastPersistedOffsets) @@ -1635,33 +1679,40 @@ Supplier getCommitterSupplier(String topic, Map lastPe @Override public Object getMetadata() { - Preconditions.checkState( - assignments.isEmpty(), - "This committer can be used only once all the records till offsets [%s] have been consumed, also make" - + " sure to call updateAssignments before using this committer", - endOffsets - ); + lock.lock(); - // merge endOffsets for this sequence with globally lastPersistedOffsets - // This is done because this committer would be persisting only sub set of segments - // corresponding to the current sequence. 
Generally, lastPersistedOffsets should already - // cover endOffsets but just to be sure take max of offsets and persist that - for (Map.Entry partitionOffset : endOffsets.entrySet()) { - lastPersistedOffsets.put( - partitionOffset.getKey(), - Math.max( - partitionOffset.getValue(), - lastPersistedOffsets.getOrDefault(partitionOffset.getKey(), 0L) - ) + try { + Preconditions.checkState( + assignments.isEmpty(), + "This committer can be used only once all the records till offsets [%s] have been consumed, also make" + + " sure to call updateAssignments before using this committer", + endOffsets ); - } - // Publish metadata can be different from persist metadata as we are going to publish only - // subset of segments - return ImmutableMap.of( - METADATA_NEXT_PARTITIONS, new KafkaPartitions(topic, lastPersistedOffsets), - METADATA_PUBLISH_PARTITIONS, new KafkaPartitions(topic, endOffsets) - ); + // merge endOffsets for this sequence with globally lastPersistedOffsets + // This is done because this committer would be persisting only sub set of segments + // corresponding to the current sequence. Generally, lastPersistedOffsets should already + // cover endOffsets but just to be sure take max of offsets and persist that + for (Map.Entry partitionOffset : endOffsets.entrySet()) { + lastPersistedOffsets.put( + partitionOffset.getKey(), + Math.max( + partitionOffset.getValue(), + lastPersistedOffsets.getOrDefault(partitionOffset.getKey(), 0L) + ) + ); + } + + // Publish metadata can be different from persist metadata as we are going to publish only + // subset of segments + return ImmutableMap.of( + METADATA_NEXT_PARTITIONS, new KafkaPartitions(topic, lastPersistedOffsets), + METADATA_PUBLISH_PARTITIONS, new KafkaPartitions(topic, endOffsets) + ); + } + finally { + lock.unlock(); + } } @Override