diff --git a/common/src/main/java/io/druid/metadata/MetadataStorageConnector.java b/common/src/main/java/io/druid/metadata/MetadataStorageConnector.java index d0e26b086acb..6a1e6c0cbef2 100644 --- a/common/src/main/java/io/druid/metadata/MetadataStorageConnector.java +++ b/common/src/main/java/io/druid/metadata/MetadataStorageConnector.java @@ -54,5 +54,7 @@ byte[] lookup( void createSupervisorsTable(); + void createTaskCheckPointsTable(); + void deleteAllRecords(String tableName); } diff --git a/common/src/main/java/io/druid/metadata/MetadataStorageTablesConfig.java b/common/src/main/java/io/druid/metadata/MetadataStorageTablesConfig.java index af8054ff67bf..6b6202811e02 100644 --- a/common/src/main/java/io/druid/metadata/MetadataStorageTablesConfig.java +++ b/common/src/main/java/io/druid/metadata/MetadataStorageTablesConfig.java @@ -31,7 +31,7 @@ public class MetadataStorageTablesConfig { public static MetadataStorageTablesConfig fromBase(String base) { - return new MetadataStorageTablesConfig(base, null, null, null, null, null, null, null, null, null, null); + return new MetadataStorageTablesConfig(base, null, null, null, null, null, null, null, null, null, null, null); } public static final String TASK_ENTRY_TYPE = "task"; @@ -75,6 +75,9 @@ public static MetadataStorageTablesConfig fromBase(String base) @JsonProperty("supervisors") private final String supervisorTable; + @JsonProperty("taskCheckPoints") + private final String taskCheckPointsTable; + @JsonCreator public MetadataStorageTablesConfig( @JsonProperty("base") String base, @@ -87,7 +90,8 @@ public MetadataStorageTablesConfig( @JsonProperty("taskLog") String taskLogTable, @JsonProperty("taskLock") String taskLockTable, @JsonProperty("audit") String auditTable, - @JsonProperty("supervisors") String supervisorTable + @JsonProperty("supervisors") String supervisorTable, + @JsonProperty("taskCheckPoints") String taskCheckPointsTable ) { this.base = (base == null) ? 
DEFAULT_BASE : base; @@ -105,6 +109,7 @@ public MetadataStorageTablesConfig( lockTables.put(TASK_ENTRY_TYPE, this.taskLockTable); this.auditTable = makeTableName(auditTable, "audit"); this.supervisorTable = makeTableName(supervisorTable, "supervisors"); + this.taskCheckPointsTable = makeTableName(taskCheckPointsTable, "taskCheckPoints"); } private String makeTableName(String explicitTableName, String defaultSuffix) @@ -193,4 +198,9 @@ public String getTaskLockTable() { return taskLockTable; } + + public String getTaskCheckPointsTable() + { + return taskCheckPointsTable; + } } diff --git a/extensions-contrib/sqlserver-metadata-storage/src/test/java/io/druid/metadata/storage/sqlserver/SQLServerConnectorTest.java b/extensions-contrib/sqlserver-metadata-storage/src/test/java/io/druid/metadata/storage/sqlserver/SQLServerConnectorTest.java index 7c90fd8440b4..1e97cf582108 100644 --- a/extensions-contrib/sqlserver-metadata-storage/src/test/java/io/druid/metadata/storage/sqlserver/SQLServerConnectorTest.java +++ b/extensions-contrib/sqlserver-metadata-storage/src/test/java/io/druid/metadata/storage/sqlserver/SQLServerConnectorTest.java @@ -18,15 +18,13 @@ */ package io.druid.metadata.storage.sqlserver; -import java.sql.SQLException; - -import org.junit.Assert; -import org.junit.Test; - import com.google.common.base.Suppliers; - import io.druid.metadata.MetadataStorageConnectorConfig; import io.druid.metadata.MetadataStorageTablesConfig; +import org.junit.Assert; +import org.junit.Test; + +import java.sql.SQLException; @SuppressWarnings("nls") public class SQLServerConnectorTest @@ -49,6 +47,7 @@ public void testIsTransientException() throws Exception null, null, null, + null, null ) ) diff --git a/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/DriverHolder.java b/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/DriverHolder.java new file mode 100644 index 000000000000..bd6233255ca8 --- /dev/null +++ 
b/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/DriverHolder.java @@ -0,0 +1,690 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package io.druid.indexing.kafka; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.base.Preconditions; +import com.google.common.base.Supplier; +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.Maps; +import com.google.common.collect.Sets; +import com.metamx.emitter.EmittingLogger; +import io.druid.data.input.Committer; +import io.druid.data.input.InputRow; +import io.druid.indexing.common.TaskToolbox; +import io.druid.indexing.common.actions.SegmentTransactionalInsertAction; +import io.druid.java.util.common.IAE; +import io.druid.java.util.common.ISE; +import io.druid.segment.realtime.FireDepartmentMetrics; +import io.druid.segment.realtime.appenderator.FiniteAppenderatorDriver; +import io.druid.segment.realtime.appenderator.SegmentIdentifier; +import io.druid.segment.realtime.appenderator.SegmentsAndMetadata; +import 
io.druid.segment.realtime.appenderator.TransactionalSegmentPublisher; +import io.druid.timeline.DataSegment; +import org.apache.kafka.clients.consumer.ConsumerRecord; + +import java.io.Closeable; +import java.io.File; +import java.io.IOException; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.RejectedExecutionException; + +public class DriverHolder implements Closeable +{ + public enum DriverStatus + { + NOT_OPEN, + OPEN, + COMPLETE, + PERSISTING, + PERSISTED, + PUBLISHING, + PUBLISHED, + CLOSED + } + + volatile DriverStatus driverStatus = DriverStatus.NOT_OPEN; + + private static final EmittingLogger log = new EmittingLogger(DriverHolder.class); + + private final FiniteAppenderatorDriver driver; + private final Set assignment; + private Supplier committerSupplier; + + private final Map nextPartitionOffsets = Maps.newHashMap(); + + /*********** Driver Metadata **********/ + private final String topic; + private final Map startOffsets; + private final Map endOffsets; + private final String sequenceName; + // this uniquely identifies the persist dir for this driver + private final int driverIndex; + // whether this is the last driver for the task + private volatile boolean last; + // whether the end offsets for this driver is set + private volatile boolean checkPointed; + private boolean maxRowsPerSegmentLimitReached; + + /***************************************/ + + private DriverHolder( + String topic, + FiniteAppenderatorDriver driver, + Map startOffsets, + Map endOffsets, + int driverIndex, + String sequenceName, + boolean last, + boolean checkPointed, + boolean maxRowsPerSegmentLimitReached + ) + { + this.topic = topic; + this.driver = driver; + this.startOffsets = startOffsets; + // endOffsets will change when a check point is set so make a local copy of it + this.endOffsets = Maps.newHashMap(endOffsets); + this.driverIndex = driverIndex; + this.sequenceName = sequenceName; + 
this.assignment = Sets.newHashSet(endOffsets.keySet()); + this.last = last; + this.checkPointed = checkPointed; + this.maxRowsPerSegmentLimitReached = maxRowsPerSegmentLimitReached; + } + + Map startJob(final ObjectMapper mapper) + { + final Object restored = driver.startJob(); + Preconditions.checkState( + driverStatus == DriverStatus.NOT_OPEN, + "WTH?! Cannot change driver status to [%s] from [%s]", + DriverStatus.OPEN, + driverStatus + ); + driverStatus = DriverStatus.OPEN; + if (restored == null) { + nextPartitionOffsets.putAll(startOffsets); + } else { + Map restoredMetadataMap = (Map) restored; + final KafkaPartitions restoredNextPartitions = mapper.convertValue( + restoredMetadataMap.get(KafkaIndexTask.METADATA_NEXT_PARTITIONS), + KafkaPartitions.class + ); + nextPartitionOffsets.putAll(restoredNextPartitions.getPartitionOffsetMap()); + + // Sanity checks. + if (!restoredNextPartitions.getTopic().equals(topic)) { + throw new ISE( + "WTF?! Restored topic[%s] but expected topic[%s]", + restoredNextPartitions.getTopic(), + topic + ); + } + + if (!nextPartitionOffsets.keySet().equals(startOffsets.keySet())) { + throw new ISE( + "WTF?! 
Restored partitions[%s] but expected partitions[%s]", + nextPartitionOffsets.keySet(), + startOffsets.keySet() + ); + } + } + committerSupplier = new Supplier() + { + @Override + public Committer get() + { + final Map snapshot = ImmutableMap.copyOf(nextPartitionOffsets); + + return new Committer() + { + @Override + public Object getMetadata() + { + return ImmutableMap.of( + KafkaIndexTask.METADATA_NEXT_PARTITIONS, new KafkaPartitions( + topic, + snapshot + ) + ); + } + + @Override + public void run() + { + // Do nothing + } + }; + } + }; + + // remove partition for which offsets have been consumed fully + for (Map.Entry partitionOffset : nextPartitionOffsets.entrySet()) { + if (endOffsets.get(partitionOffset.getKey()).equals(partitionOffset.getValue())) { + logInfo("[Start-Job] read partition [%d]", partitionOffset.getKey()); + assignment.remove(partitionOffset.getKey()); + } + } + if (assignment.isEmpty()) { + // finish the driver + logInfo("[Start-Job] read all partitions"); + Preconditions.checkState( + driverStatus == DriverStatus.OPEN, + "WTH?! Cannot change driver status to [%s] from [%s]", + DriverStatus.COMPLETE, + driverStatus + ); + driverStatus = DriverStatus.COMPLETE; + } + + return nextPartitionOffsets; + } + + boolean canHandle(ConsumerRecord record) + { + return driverStatus == DriverStatus.OPEN + && endOffsets.get(record.partition()) != null + && record.offset() >= startOffsets.get(record.partition()) + && record.offset() < endOffsets.get(record.partition()); + } + + public SegmentIdentifier add(InputRow row) throws IOException + { + Preconditions.checkState(driverStatus == DriverStatus.OPEN, "Cannot add to driver which is not open!"); + + SegmentIdentifier identifier = driver.add(row, sequenceName, committerSupplier, false); + + if (identifier == null) { + // Failure to allocate segment puts determinism at risk, bail out to be safe. + // May want configurable behavior here at some point. 
+ // If we allow continuing, then consider blacklisting the interval for a while to avoid constant checks. + throw new ISE("Could not allocate segment for row with timestamp[%s]", row.getTimestamp()); + } + + // remember to call incrementNextOffsets before using any other method + // which relies on updated value of nextPartitionOffsets + if (driver.numRowsInSegment(identifier) >= driver.getMaxRowPerSegment()) { + maxRowsPerSegmentLimitReached = true; + } + return identifier; + } + + void incrementNextOffsets(ConsumerRecord record) + { + // we do not automatically increment next offset with add call + // as in case record cannot be parsed add method will not be called + // however we still need to increment next offsets irrespective + + // update local nextOffset to be used by committer at some point + nextPartitionOffsets.put(record.partition(), nextPartitionOffsets.get(record.partition()) + 1); + + if (nextPartitionOffsets.get(record.partition()).equals(endOffsets.get(record.partition()))) { + logInfo("[Increment-Offsets] read partition [%d]", record.partition()); + // done with this partition, remove it from end offsets + assignment.remove(record.partition()); + if (assignment.isEmpty()) { + logInfo("[Increment-Offsets] read all partitions"); + Preconditions.checkState( + driverStatus == DriverStatus.OPEN, + "WTH?! 
Cannot change driver status to [%s] from [%s]", + DriverStatus.COMPLETE, + driverStatus + ); + driverStatus = DriverStatus.COMPLETE; + } + } + } + + boolean isCheckPointingRequired() + { + return maxRowsPerSegmentLimitReached && !checkPointed; + } + + boolean isComplete() + { + return driverStatus == DriverStatus.COMPLETE; + } + + void setEndOffsets(Map offsets) + { + // sanity check again with driver's local offsets + if (!endOffsets.keySet().containsAll(offsets.keySet())) { + throw new IAE( + "Got request to set some offsets not handled by driver [%d], handling [%s] partitions", + driverIndex, endOffsets.keySet() + ); + } + for (Map.Entry entry : offsets.entrySet()) { + if (entry.getValue().compareTo(nextPartitionOffsets.get(entry.getKey())) < 0) { + throw new IAE( + "End offset must be >= current offset for driver [%d] for partition [%s] (current: %s)", + driverIndex, + entry.getKey(), + nextPartitionOffsets.get(entry.getKey()) + ); + } + } + endOffsets.putAll(offsets); + // check if any or all partitions have been consumed + for (Map.Entry entry : endOffsets.entrySet()) { + if (entry.getValue().compareTo(nextPartitionOffsets.get(entry.getKey())) == 0) { + assignment.remove(entry.getKey()); + logInfo("[Set-End-Offsets] read partition [%d]", entry.getKey()); + } + } + logInfo("endOffsets changed to [%s]", endOffsets); + if (assignment.isEmpty()) { + logInfo("[Set-End-Offsets] read all partitions"); + Preconditions.checkState( + driverStatus == DriverStatus.OPEN, + "WTH?! 
Cannot change driver status to [%s] from [%s]", + DriverStatus.COMPLETE, + driverStatus + ); + driverStatus = DriverStatus.COMPLETE; + } + checkPointed = true; + logInfo("check-pointed"); + } + + SegmentsAndMetadata finish(final TaskToolbox toolbox, final boolean isUseTransaction) + throws InterruptedException + { + try { + final TransactionalSegmentPublisher publisher = new TransactionalSegmentPublisher() + { + @Override + public boolean publishSegments(Set segments, Object commitMetadata) throws IOException + { + final KafkaPartitions finalPartitions = toolbox.getObjectMapper().convertValue( + ((Map) commitMetadata).get(KafkaIndexTask.METADATA_NEXT_PARTITIONS), + KafkaPartitions.class + ); + + // Sanity check, we should only be publishing things that match our desired end state. + if (!endOffsets.equals(finalPartitions.getPartitionOffsetMap())) { + throw new ISE("WTF?! Driver attempted to publish invalid metadata[%s].", commitMetadata); + } + + final SegmentTransactionalInsertAction action; + + if (isUseTransaction) { + action = new SegmentTransactionalInsertAction( + segments, + new KafkaDataSourceMetadata(new KafkaPartitions(topic, startOffsets)), + new KafkaDataSourceMetadata(finalPartitions) + ); + } else { + action = new SegmentTransactionalInsertAction(segments, null, null); + } + + logInfo("Publishing with isTransaction[%s].", isUseTransaction); + + return toolbox.getTaskActionClient().submit(action).isSuccess(); + } + }; + + return driver.finish(publisher, committerSupplier.get()); + } + catch (InterruptedException | RejectedExecutionException e) { + logError("Interrupted in finish"); + throw e; + } + } + + void waitForHandOff() throws InterruptedException + { + driver.waitForHandOff(); + } + + Object persist() throws InterruptedException + { + return driver.persist(committerSupplier.get()); + } + + public DriverMetadata getMetadata() + { + return new DriverMetadata( + topic, + startOffsets, + endOffsets, + driverIndex, + sequenceName, + last, + 
checkPointed, + maxRowsPerSegmentLimitReached + ); + } + + @Override + public void close() throws IOException + { + if (driverStatus == DriverStatus.CLOSED) { + logWarn("already closed"); + } else { + synchronized (this) { + if (driverStatus == DriverStatus.CLOSED) { + logWarn("already closed"); + return; + } + driverStatus = DriverStatus.CLOSED; + } + driver.getAppenderator().close(); + driver.close(); + } + } + + void setLast() { + last = true; + } + + FiniteAppenderatorDriver getDriver() + { + return driver; + } + + Map getStartOffsets() + { + return startOffsets; + } + + Map getEndOffsets() + { + return endOffsets; + } + + int getDriverIndex() + { + return driverIndex; + } + + boolean isLast() + { + return last; + } + + boolean isCheckPointed() + { + return checkPointed; + } + + private void logInfo(String msg, Object... formatArgs) + { + log.info("Driver [%d], status [%s]: [%s]", driverIndex, driverStatus, String.format(msg, formatArgs)); + } + + private void logWarn(String msg, Object... formatArgs) + { + log.warn("Driver [%d], status [%s]: [%s]", driverIndex, driverStatus, String.format(msg, formatArgs)); + } + + private void logError(String msg, Object... 
formatArgs) + { + log.error("Driver [%d], status [%s]: [%s]", driverIndex, driverStatus, String.format(msg, formatArgs)); + } + + @Override + public String toString() + { + return "DriverHolder{" + + "driverStatus=" + driverStatus + + ", driver=" + driver + + ", assignment=" + assignment + + ", committerSupplier=" + committerSupplier + + ", nextPartitionOffsets=" + nextPartitionOffsets + + ", topic='" + topic + '\'' + + ", startOffsets=" + startOffsets + + ", endOffsets=" + endOffsets + + ", sequenceName='" + sequenceName + '\'' + + ", driverIndex=" + driverIndex + + ", last=" + last + + ", checkPointed=" + checkPointed + + ", maxRowsPerSegmentLimitReached=" + maxRowsPerSegmentLimitReached + + '}'; + } + + static DriverHolder getNextDriverHolder( + KafkaIndexTask taskContext, + Map startOffsets, + Map endOffsets, + String baseSequenceName, + FireDepartmentMetrics fireDepartmentMetrics, + TaskToolbox taskToolbox + ) + { + return createDriverHolder( + taskContext, + startOffsets, + endOffsets, + taskContext.nextDriverIndex++, + baseSequenceName, + fireDepartmentMetrics, + taskToolbox, + false, + false, + false + ); + } + + static DriverHolder getNextDriverHolder( + KafkaIndexTask taskContext, + Map startOffsets, + Map endOffsets, + String baseSequenceName, + FireDepartmentMetrics fireDepartmentMetrics, + TaskToolbox taskToolbox, + boolean checkPointed + ) + { + return createDriverHolder( + taskContext, + startOffsets, + endOffsets, + taskContext.nextDriverIndex++, + baseSequenceName, + fireDepartmentMetrics, + taskToolbox, + false, + checkPointed, + false + ); + } + + static DriverHolder createDriverHolder( + KafkaIndexTask taskContext, + Map startOffsets, + Map endOffsets, + int basePersistDirectoryIndex, + String baseSequenceName, + FireDepartmentMetrics fireDepartmentMetrics, + TaskToolbox taskToolbox, + boolean last, + boolean checkPointed, + boolean maxRowsPerSegmentLimitReached + ) + { + final File basePersistDir = new File( + taskToolbox.getTaskWorkDir(), + 
String.format("%s%s", "persist", basePersistDirectoryIndex > 0 ? basePersistDirectoryIndex : "") + ); + DriverHolder holder = new DriverHolder( + taskContext.getIOConfig().getStartPartitions().getTopic(), + taskContext.newDriver(taskContext.newAppenderator( + fireDepartmentMetrics, + taskToolbox, + basePersistDir + ), taskToolbox, fireDepartmentMetrics), + startOffsets, + endOffsets, + basePersistDirectoryIndex, + String.format("%s_%d", baseSequenceName, basePersistDirectoryIndex), + last, + checkPointed, + maxRowsPerSegmentLimitReached + ); + log.info("Created new Driver with metadata [%s]", holder.getMetadata()); + return holder; + } + + public static class DriverMetadata implements Comparable + { + private final String topic; + private final Map startOffsets; + private final Map endOffsets; + private final int driverIndex; + private final String sequenceName; + private final boolean last; + private final boolean checkPointed; + private final boolean maxRowsPerSegmentLimitReached; + + @JsonCreator + public DriverMetadata( + @JsonProperty("topic") String topic, + @JsonProperty("startOffsets") Map startOffsets, + @JsonProperty("endOffsets") Map endOffsets, + @JsonProperty("driverIndex") int driverIndex, + @JsonProperty("sequenceName") String sequenceName, + @JsonProperty("last") boolean last, + @JsonProperty("checkPointed") boolean checkPointed, + @JsonProperty("maxRowsPerSegmentLimitReached") boolean maxRowsPerSegmentLimitReached + ) + { + this.topic = topic; + this.startOffsets = startOffsets; + this.endOffsets = endOffsets; + this.driverIndex = driverIndex; + this.sequenceName = sequenceName; + this.last = last; + this.checkPointed = checkPointed; + this.maxRowsPerSegmentLimitReached = maxRowsPerSegmentLimitReached; + } + + @JsonProperty + public int getDriverIndex() + { + return driverIndex; + } + + @JsonProperty + public String getSequenceName() + { + return sequenceName; + } + + @JsonProperty + public Map getEndOffsets() + { + return endOffsets; + } + + 
@JsonProperty + public Map getStartOffsets() + { + return startOffsets; + } + + @JsonProperty + public String getTopic() + { + return topic; + } + + @JsonProperty + public boolean isLast() + { + return last; + } + + @JsonProperty + public boolean isCheckPointed() + { + return checkPointed; + } + + @JsonProperty + public boolean isMaxRowsPerSegmentLimitReached() + { + return maxRowsPerSegmentLimitReached; + } + + @Override + public String toString() + { + return "DriverMetadata{" + + "topic='" + topic + '\'' + + ", startOffsets=" + startOffsets + + ", endOffsets=" + endOffsets + + ", driverIndex=" + driverIndex + + ", sequenceName='" + sequenceName + '\'' + + ", last=" + last + + ", checkPointed=" + checkPointed + + ", maxRowsPerSegmentLimitReached=" + maxRowsPerSegmentLimitReached + + '}'; + } + + @Override + public int compareTo(DriverMetadata o) + { + return driverIndex - o.driverIndex; + } + } + + static class SentinelDriverHolder extends DriverHolder + { + final CountDownLatch persistLatch; + final CountDownLatch handOffLatch; + + SentinelDriverHolder( + CountDownLatch persistLatch, + CountDownLatch handOffLatch + ) + { + super(null, null, ImmutableMap.of(), ImmutableMap.of(), -1, null, true, true, true); + super.driverStatus = DriverStatus.COMPLETE; + this.persistLatch = persistLatch; + this.handOffLatch = handOffLatch; + } + + @Override + Object persist() throws InterruptedException + { + persistLatch.countDown(); + return null; + } + + @Override + SegmentsAndMetadata finish(TaskToolbox toolbox, boolean isUseTransaction) throws InterruptedException + { + // Do nothing + return null; + } + + @Override + public void close() throws IOException + { + handOffLatch.countDown(); + } + } +} diff --git a/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/KafkaIndexTask.java b/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/KafkaIndexTask.java index eef8aac5d714..001463b5312a 100644 --- 
a/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/KafkaIndexTask.java +++ b/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/KafkaIndexTask.java @@ -23,13 +23,12 @@ import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; import com.fasterxml.jackson.core.JsonProcessingException; -import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.core.type.TypeReference; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Function; import com.google.common.base.Joiner; import com.google.common.base.Optional; import com.google.common.base.Preconditions; -import com.google.common.base.Supplier; import com.google.common.base.Throwables; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; @@ -38,19 +37,23 @@ import com.google.common.collect.Maps; import com.google.common.collect.Sets; import com.google.common.primitives.Ints; +import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.ListeningExecutorService; +import com.google.common.util.concurrent.MoreExecutors; import com.metamx.emitter.EmittingLogger; -import io.druid.data.input.Committer; +import io.druid.concurrent.Execs; import io.druid.data.input.InputRow; import io.druid.data.input.impl.InputRowParser; import io.druid.indexing.appenderator.ActionBasedSegmentAllocator; import io.druid.indexing.appenderator.ActionBasedUsedSegmentChecker; import io.druid.indexing.common.TaskStatus; import io.druid.indexing.common.TaskToolbox; +import io.druid.indexing.common.actions.CheckPointDataSourceMetadataAction; import io.druid.indexing.common.actions.ResetDataSourceMetadataAction; -import io.druid.indexing.common.actions.SegmentTransactionalInsertAction; import io.druid.indexing.common.actions.TaskActionClient; import io.druid.indexing.common.task.AbstractTask; import 
io.druid.indexing.common.task.TaskResource; +import io.druid.indexing.overlord.DataSourceMetadata; import io.druid.java.util.common.ISE; import io.druid.java.util.common.guava.Sequence; import io.druid.java.util.common.parsers.ParseException; @@ -58,6 +61,7 @@ import io.druid.query.NoopQueryRunner; import io.druid.query.Query; import io.druid.query.QueryRunner; +import io.druid.query.QueryRunnerFactory; import io.druid.segment.indexing.DataSchema; import io.druid.segment.indexing.RealtimeIOConfig; import io.druid.segment.realtime.FireDepartment; @@ -66,9 +70,7 @@ import io.druid.segment.realtime.appenderator.Appenderator; import io.druid.segment.realtime.appenderator.Appenderators; import io.druid.segment.realtime.appenderator.FiniteAppenderatorDriver; -import io.druid.segment.realtime.appenderator.SegmentIdentifier; import io.druid.segment.realtime.appenderator.SegmentsAndMetadata; -import io.druid.segment.realtime.appenderator.TransactionalSegmentPublisher; import io.druid.segment.realtime.firehose.ChatHandler; import io.druid.segment.realtime.firehose.ChatHandlerProvider; import io.druid.timeline.DataSegment; @@ -92,14 +94,23 @@ import java.io.File; import java.io.IOException; import java.nio.ByteBuffer; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; import java.util.Map; import java.util.Properties; import java.util.Random; import java.util.Set; -import java.util.Collections; +import java.util.concurrent.BlockingDeque; +import java.util.concurrent.Callable; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.LinkedBlockingDeque; import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.Lock; 
import java.util.concurrent.locks.ReentrantLock; @@ -123,7 +134,7 @@ public enum Status private static final long POLL_TIMEOUT = 100; private static final long POLL_RETRY_MS = 30000; private static final long LOCK_ACQUIRE_TIMEOUT_SECONDS = 15; - private static final String METADATA_NEXT_PARTITIONS = "nextPartitions"; + static final String METADATA_NEXT_PARTITIONS = "nextPartitions"; private final DataSchema dataSchema; private final InputRowParser parser; @@ -133,16 +144,18 @@ public enum Status private final Map endOffsets = new ConcurrentHashMap<>(); private final Map nextOffsets = new ConcurrentHashMap<>(); + private final Map maxEndOffsets = new HashMap<>(); - private ObjectMapper mapper; + private TaskToolbox toolbox; - private volatile Appenderator appenderator = null; private volatile FireDepartmentMetrics fireDepartmentMetrics = null; private volatile DateTime startTime; private volatile Status status = Status.NOT_STARTED; // this is only ever set by the task runner thread (runThread) private volatile Thread runThread = null; - private volatile boolean stopRequested = false; - private volatile boolean publishOnStop = false; + + private final AtomicBoolean stopRequested = new AtomicBoolean(false); + private final AtomicBoolean publishOnStop = new AtomicBoolean(false); + private final AtomicReference throwableAtomicReference = new AtomicReference<>(null); // The pause lock and associated conditions are to support coordination between the Jetty threads and the main // ingestion loop. The goal is to provide callers of the API a guarantee that if pause() returns successfully @@ -182,6 +195,23 @@ public enum Status private volatile boolean pauseRequested = false; private volatile long pauseMillis = 0; + volatile int nextDriverIndex = 0; + // Reverse sorted list of DriverHolders i.e. 
the most recent driverHolder is at front and the oldest at last + private final List driverHolders = new CopyOnWriteArrayList<>(); + private final Lock driversListLock = new ReentrantLock(); + + private final BlockingDeque publishQueue = new LinkedBlockingDeque<>(); + private final BlockingDeque handOffQueue = new LinkedBlockingDeque<>(); + + private final ListeningExecutorService persistExecService; + private final ListeningExecutorService publishExecService; + private final ListeningExecutorService handOffExecService; + + private File driversRestoreFile; + + private final CountDownLatch persistLatch = new CountDownLatch(1); + private final CountDownLatch handOffLatch = new CountDownLatch(1); + @JsonCreator public KafkaIndexTask( @JsonProperty("id") String id, @@ -208,6 +238,15 @@ public KafkaIndexTask( this.chatHandlerProvider = Optional.fromNullable(chatHandlerProvider); this.endOffsets.putAll(ioConfig.getEndPartitions().getPartitionOffsetMap()); + for (Integer partition : endOffsets.keySet()) { + maxEndOffsets.put(partition, Long.MAX_VALUE); + } + this.persistExecService = MoreExecutors.listeningDecorator(Execs.newBlockingSingleThreaded("persist-%d", 0)); + this.publishExecService = MoreExecutors.listeningDecorator(Execs.newBlockingSingleThreaded("publish-driver-%d", 1)); + this.handOffExecService = MoreExecutors.listeningDecorator(Execs.newBlockingSingleThreaded( + "handoff-checker-%d", + 1 + )); } private static String makeTaskId(String dataSource, int randomBits) @@ -249,14 +288,181 @@ public KafkaIOConfig getIOConfig() return ioConfig; } + private void startExecutors() + { + // start publish executor service + publishExecService.submit( + new Runnable() + { + @Override + public void run() + { + while (true) { + DriverHolder driverHolder = null; + try { + driverHolder = publishQueue.take(); + + Preconditions.checkState( + driverHolder.driverStatus == DriverHolder.DriverStatus.PERSISTED, + String.format( + "WTH?! 
driver trying to publish but not yet persisted, driver status: [%s]", + driverHolder.driverStatus + ) + ); + + driverHolder.driverStatus = DriverHolder.DriverStatus.PUBLISHING; + + log.info("Publishing driver [%s]", driverHolder.getMetadata()); + + final SegmentsAndMetadata result = driverHolder.finish(toolbox, ioConfig.isUseTransaction()); + + if (result == null) { + if (driverHolder.getMetadata().getDriverIndex() == -1) { + // indicates all drivers are finished, ok to shutdown + log.info("All drivers have published segments to the metadata store"); + } else { + throw new ISE( + "Transaction failure publishing segments for driver [%s]", + driverHolder.getMetadata() + ); + } + } else { + log.info( + "Published segments[%s] with metadata[%s].", + Joiner.on(", ").join( + Iterables.transform( + result.getSegments(), + new Function() + { + @Override + public String apply(DataSegment input) + { + return input.getIdentifier(); + } + } + ) + ), + result.getCommitMetadata() + ); + } + + driverHolder.driverStatus = DriverHolder.DriverStatus.PUBLISHED; + + handOffQueue.addLast(driverHolder); + } + catch (Throwable t) { + if ((t instanceof InterruptedException || (t instanceof RejectedExecutionException + && t.getCause() instanceof InterruptedException))) { + if (stopRequested.get() || handOffLatch.getCount() == 0) { + // we are shutting down, ignore the interrupt + log.warn("Stopping publish thread as we are interrupted and shutting down"); + break; + } else { + // enqueue back + if (driverHolder != null) { + log.error( + t, + "Error in publish thread, enqueueing driver [%d] back to publish queue", + driverHolder.getDriverIndex() + ); + driverHolder.driverStatus = DriverHolder.DriverStatus.PERSISTED; + publishQueue.addFirst(driverHolder); + } + continue; + } + } + log.makeAlert(t, "Error in publish thread, dying").emit(); + throwableAtomicReference.set(t); + handOffLatch.countDown(); + Throwables.propagate(t); + } + } + } + } + ); + + handOffExecService.submit( + new 
Runnable() + { + @Override + public void run() + { + while (true) { + try { + final DriverHolder driverHolder = handOffQueue.take(); + + Preconditions.checkState( + driverHolder.driverStatus == DriverHolder.DriverStatus.PUBLISHED, + String.format( + "WTH?! cannot wait for hand off for not published driver [%d], status: [%s]", + driverHolder.getDriverIndex(), + driverHolder.driverStatus + ) + ); + + log.info("Waiting for driver [%s] to hand off", driverHolder.getMetadata()); + try { + if (driverHolder.getDriverIndex() != -1) { + driverHolder.waitForHandOff(); + } + log.info("Handoff complete for driver [%d]", driverHolder.getDriverIndex()); + } + catch (InterruptedException t) { + if (stopRequested.get()) { + log.warn("Stopping handoff thread as we are interrupted and shutting down"); + break; + } else { + // enqueue back + handOffQueue.addFirst(driverHolder); + } + } + finally { + driverHolder.close(); + if (!driverHolders.remove(driverHolder)) { + log.warn( + "Unable to remove driver [%d], it was not in the drivers list", + driverHolder.getDriverIndex() + ); + } else { + try { + lockDriversList(); + persistDriversList(); + log.info("Driver [%s] removed from drivers list", driverHolder.getMetadata()); + } + finally { + unlockDriversList(); + } + } + } + } + catch (Throwable t) { + if (t instanceof InterruptedException && (stopRequested.get() || handOffLatch.getCount() == 0)) { + log.warn("Stopping handoff thread as we are interrupted and shutting down"); + break; + } + log.makeAlert(t, "Error in handoff thread, dying").emit(); + throwableAtomicReference.set(t); + handOffLatch.countDown(); + Throwables.propagate(t); + } + } + } + } + ); + } + @Override public TaskStatus run(final TaskToolbox toolbox) throws Exception { log.info("Starting up!"); startTime = DateTime.now(); - mapper = toolbox.getObjectMapper(); + this.toolbox = toolbox; status = Status.STARTING; + startExecutors(); + + toolbox.getTaskWorkDir().mkdirs(); + if (chatHandlerProvider.isPresent()) { 
log.info("Found chat handler of class[%s]", chatHandlerProvider.get().getClass().getName()); chatHandlerProvider.get().register(getId(), this, false); @@ -280,80 +486,12 @@ public TaskStatus run(final TaskToolbox toolbox) throws Exception ) ); - try ( - final Appenderator appenderator0 = newAppenderator(fireDepartmentMetrics, toolbox); - final FiniteAppenderatorDriver driver = newDriver(appenderator0, toolbox, fireDepartmentMetrics); - final KafkaConsumer consumer = newConsumer() - ) { - appenderator = appenderator0; + final KafkaConsumer consumer = newConsumer(); + try { final String topic = ioConfig.getStartPartitions().getTopic(); - // Start up, set up initial offsets. - final Object restoredMetadata = driver.startJob(); - if (restoredMetadata == null) { - nextOffsets.putAll(ioConfig.getStartPartitions().getPartitionOffsetMap()); - } else { - final Map restoredMetadataMap = (Map) restoredMetadata; - final KafkaPartitions restoredNextPartitions = toolbox.getObjectMapper().convertValue( - restoredMetadataMap.get(METADATA_NEXT_PARTITIONS), - KafkaPartitions.class - ); - nextOffsets.putAll(restoredNextPartitions.getPartitionOffsetMap()); - - // Sanity checks. - if (!restoredNextPartitions.getTopic().equals(ioConfig.getStartPartitions().getTopic())) { - throw new ISE( - "WTF?! Restored topic[%s] but expected topic[%s]", - restoredNextPartitions.getTopic(), - ioConfig.getStartPartitions().getTopic() - ); - } - - if (!nextOffsets.keySet().equals(ioConfig.getStartPartitions().getPartitionOffsetMap().keySet())) { - throw new ISE( - "WTF?! Restored partitions[%s] but expected partitions[%s]", - nextOffsets.keySet(), - ioConfig.getStartPartitions().getPartitionOffsetMap().keySet() - ); - } - } - - // Set up sequenceNames. - final Map sequenceNames = Maps.newHashMap(); - for (Integer partitionNum : nextOffsets.keySet()) { - sequenceNames.put(partitionNum, String.format("%s_%s", ioConfig.getBaseSequenceName(), partitionNum)); - } - - // Set up committer. 
- final Supplier committerSupplier = new Supplier() - { - @Override - public Committer get() - { - final Map snapshot = ImmutableMap.copyOf(nextOffsets); - - return new Committer() - { - @Override - public Object getMetadata() - { - return ImmutableMap.of( - METADATA_NEXT_PARTITIONS, new KafkaPartitions( - ioConfig.getStartPartitions().getTopic(), - snapshot - ) - ); - } - - @Override - public void run() - { - // Do nothing. - } - }; - } - }; + restoreState(toolbox); Set assignment = assignPartitionsAndSeekToNext(consumer, topic); @@ -361,8 +499,10 @@ public void run() // Could eventually support leader/follower mode (for keeping replicas more in sync) boolean stillReading = !assignment.isEmpty(); status = Status.READING; + try { while (stillReading) { + checkAndMayBeThrowException(); if (possiblyPause(assignment)) { // The partition assignments may have changed while paused by a call to setEndOffsets() so reassign // partitions upon resuming. This is safe even if the end offsets have not been modified. @@ -370,12 +510,12 @@ public void run() if (assignment.isEmpty()) { log.info("All partitions have been fully read"); - publishOnStop = true; - stopRequested = true; + publishOnStop.set(true); + stopRequested.set(true); } } - if (stopRequested) { + if (stopRequested.get()) { break; } @@ -393,6 +533,7 @@ public void run() } for (ConsumerRecord record : records) { + if (log.isTraceEnabled()) { log.trace( "Got topic[%s] partition[%d] offset[%,d].", @@ -412,7 +553,24 @@ public void run() ); } + DriverHolder driverHolder = null; try { + // find a Driver to consume this record + for (DriverHolder holder : driverHolders) { + if (holder.canHandle(record)) { + driverHolder = holder; + break; + } + } + if (driverHolder == null) { + throw new ISE( + "WTH?! 
Could not find a driver to handle record: partition [%d] offset [%d], current drivers list: [%s]", + record.partition(), + record.offset(), + driverHolders + ); + } + final byte[] valueBytes = record.value(); if (valueBytes == null) { throw new ParseException("null value"); @@ -423,19 +581,7 @@ public void run() if (!ioConfig.getMinimumMessageTime().isPresent() || !ioConfig.getMinimumMessageTime().get().isAfter(row.getTimestamp())) { - final SegmentIdentifier identifier = driver.add( - row, - sequenceNames.get(record.partition()), - committerSupplier - ); - - if (identifier == null) { - // Failure to allocate segment puts determinism at risk, bail out to be safe. - // May want configurable behavior here at some point. - // If we allow continuing, then consider blacklisting the interval for a while to avoid constant checks. - throw new ISE("Could not allocate segment for row with timestamp[%s]", row.getTimestamp()); - } - + driverHolder.add(row); fireDepartmentMetrics.incrementProcessed(); } else { fireDepartmentMetrics.incrementThrownAway(); @@ -456,7 +602,34 @@ public void run() } } + driverHolder.incrementNextOffsets(record); nextOffsets.put(record.partition(), record.offset() + 1); + if (driverHolder.isComplete()) { + if (driverHolders.get(0) == driverHolder && ioConfig.isPauseAfterRead()) { + // this is the latest driver and isPauseAfterRead is set + // means that setEndOffset will be called, so create + // a new driver whose end offset will be set by that call + try { + log.info("Creating new driver as pauseAfterRead is set and the latest driver is not the last driver"); + lockDriversList(); + final DriverHolder nextDriverHolder = DriverHolder.getNextDriverHolder( + this, + driverHolders.get(0).getEndOffsets(), + maxEndOffsets, + ioConfig.getBaseSequenceName(), + fireDepartmentMetrics, + toolbox + ); + driverHolders.add(0, nextDriverHolder); + nextDriverHolder.startJob(toolbox.getObjectMapper()); + persistDriversList(); + } + finally { + unlockDriversList(); + 
} + } + persistAndPossiblyPublish(driverHolder); + } } if (nextOffsets.get(record.partition()).equals(endOffsets.get(record.partition())) @@ -466,98 +639,277 @@ public void run() stillReading = ioConfig.isPauseAfterRead() || !assignment.isEmpty(); } } + + // check if we hit the maxRowsInSegment limit for the latest driver + DriverHolder latestDriver; + try { + lockDriversList(); + latestDriver = driverHolders.size() > 0 ? driverHolders.get(0) : null; + } + finally { + unlockDriversList(); + } + if (latestDriver != null && latestDriver.isCheckPointingRequired()) { + // time to finish this driver + // send a call to Supervisor to check point the current highest offsets for all replicas + // supervisor will resume the tasks with a check point which will be set as end offset of the latest driver + pause(-1L); + + KafkaDataSourceMetadata previousCheckPoint = null; + if (!latestDriver.getStartOffsets().equals(ioConfig.getStartPartitions().getPartitionOffsetMap())) { + previousCheckPoint = new KafkaDataSourceMetadata(new KafkaPartitions( + topic, + latestDriver.getStartOffsets() + )); + } + + if (!toolbox.getTaskActionClient().submit(new CheckPointDataSourceMetadataAction( + getDataSource(), + ioConfig.getBaseSequenceName(), + previousCheckPoint, + new KafkaDataSourceMetadata(new KafkaPartitions(topic, nextOffsets)) + ))) { + throw new ISE("Checkpoint request with offsets [%s] failed, dying", nextOffsets); + } + } } } - finally { - driver.persist(committerSupplier.get()); // persist pending data + catch (Throwable t) { + // if any exception is thrown in the ingestion loop, task should persist pending drivers and die (skip publish) + stopRequested.set(true); + publishOnStop.set(false); + throw t; } - - synchronized (statusLock) { - if (stopRequested && !publishOnStop) { - throw new InterruptedException("Stopping without publishing"); + finally { + synchronized (statusLock) { + // If either publish on stop is set or if stop is not requested (this happens when end offsets 
were already set + // to some meaningful value when the task started and the task has consumed till the end offsets) + if (publishOnStop.get() || !stopRequested.get()) { + status = Status.PUBLISHING; + } } - - status = Status.PUBLISHING; + log.info("Ingestion loop finished, Persisting all pending drivers..."); + for (DriverHolder driverHolder : driverHolders) { + if (driverHolder.driverStatus == DriverHolder.DriverStatus.OPEN || driverHolder.isComplete()) { + driverHolder.driverStatus = DriverHolder.DriverStatus.COMPLETE; + persistAndPossiblyPublish(driverHolder); + } else { + log.warn( + "Not adding driver [%s] to persist and publish queue as it should already be there", + driverHolder.getMetadata() + ); + } + } + // add Sentinel Driver at the end so that task can wait till the persistLatch and handOffLatch is countdown + persistAndPossiblyPublish(new DriverHolder.SentinelDriverHolder(persistLatch, handOffLatch)); } - final TransactionalSegmentPublisher publisher = new TransactionalSegmentPublisher() - { - @Override - public boolean publishSegments(Set segments, Object commitMetadata) throws IOException - { - final KafkaPartitions finalPartitions = toolbox.getObjectMapper().convertValue( - ((Map) commitMetadata).get(METADATA_NEXT_PARTITIONS), - KafkaPartitions.class - ); + persistLatch.await(); + log.info("[Shutting Down]: All drivers persisted"); + checkAndMayBeThrowException(); + handOffLatch.await(); + log.info("[Shutting Down]: All drivers handed-off"); + checkAndMayBeThrowException(); + } + catch (Throwable t) { + // so that when executors are interrupted by shutdownNow call, they know it is expected + stopRequested.set(true); + throw t; + } + finally { + persistExecService.shutdownNow(); + // interrupts the publish thread so that no drivers are closed or enqueued again + // as we will be closing all of them so that segments will be unannounced + + // all the executors should be shutdown before closing the driver to prevent deadlocks + // persistExecutor and 
pushExecutor (in AppenderatorImpl) depend on each other and if there are + // two threads trying to use them deadlock is possible + publishExecService.shutdownNow(); + handOffExecService.shutdownNow(); + consumer.close(); + for (DriverHolder driverHolder : driverHolders) { + driverHolder.close(); + } + if (chatHandlerProvider.isPresent()) { + chatHandlerProvider.get().unregister(getId()); + } + } - // Sanity check, we should only be publishing things that match our desired end state. - if (!endOffsets.equals(finalPartitions.getPartitionOffsetMap())) { - throw new ISE("WTF?! Driver attempted to publish invalid metadata[%s].", commitMetadata); - } + return success(); + } - final SegmentTransactionalInsertAction action; + private void checkAndMayBeThrowException() + { + // check for any exception set by other executor threads than the task runner thread because of which the task should fail + if (throwableAtomicReference.get() != null) { + Throwables.propagate(throwableAtomicReference.get()); + } + } - if (ioConfig.isUseTransaction()) { - action = new SegmentTransactionalInsertAction( - segments, - new KafkaDataSourceMetadata(ioConfig.getStartPartitions()), - new KafkaDataSourceMetadata(finalPartitions) - ); - } else { - action = new SegmentTransactionalInsertAction(segments, null, null); + private ListenableFuture persistAndPossiblyPublish(final DriverHolder driverHolder) + { + log.info("Persisting and possibly publishing driver [%s]", driverHolder.getMetadata()); + Preconditions.checkState( + driverHolder.driverStatus == DriverHolder.DriverStatus.COMPLETE, + String.format( + "WTH?! 
Cannot persist driver which is not complete, driver status: [%s]", + driverHolder.driverStatus + ) + ); + driverHolder.driverStatus = DriverHolder.DriverStatus.PERSISTING; + return persistExecService.submit( + new Callable() + { + @Override + public Object call() throws Exception + { + Object result = null; + try { + result = driverHolder.persist(); + driverHolder.driverStatus = DriverHolder.DriverStatus.PERSISTED; + log.info("Driver [%d] persisted with result [%s]", driverHolder.getDriverIndex(), result); + + if (stopRequested.get() && !publishOnStop.get()) { + log.warn("Skipping publish of driver [%d] as we are asked to stop", driverHolder.getDriverIndex()); + } else { + log.info("Adding driver to publish queue, [%s]", driverHolder.getMetadata()); + publishQueue.addLast(driverHolder); + } + } + catch (Exception e) { + if (e instanceof InterruptedException && (stopRequested.get() || handOffLatch.getCount() == 0)) { + log.warn("Interrupted while persisting driver [%d], aborting persist", driverHolder.getDriverIndex()); + return null; + } + log.error("Error [%s] while persisting driver [%s]", e.getMessage(), driverHolder.getMetadata()); + throwableAtomicReference.set(e); + handOffLatch.countDown(); + } + return result; } + } + ); + } + + private void lockDriversList() throws InterruptedException + { + log.debug("Thread [%s] locking drivers list", Thread.currentThread()); + driversListLock.lockInterruptibly(); + } - log.info("Publishing with isTransaction[%s].", ioConfig.isUseTransaction()); + private void unlockDriversList() + { + log.debug("Thread [%s] unlocking drivers list", Thread.currentThread()); + driversListLock.unlock(); + } - return toolbox.getTaskActionClient().submit(action).isSuccess(); + private void restoreState(TaskToolbox toolbox) throws IOException, InterruptedException + { + driversRestoreFile = new File(toolbox.getTaskWorkDir(), "drivers.json"); + List persistedDrivers = ImmutableList.of(); + // check for persisted Drivers information + if 
(driversRestoreFile.exists()) { + persistedDrivers = toolbox.getObjectMapper().readValue( + driversRestoreFile, + new TypeReference>() + { + } + ); + } + final Object checkPointsObject = getContextValue("check_points"); + List checkPoints = ImmutableList.of(); + if (checkPointsObject != null) { + checkPoints = toolbox.getObjectMapper().readValue( + (String) checkPointsObject, + new TypeReference>() + { + } + ); + } + log.info("Got check points: [%s]", checkPoints); + + if (persistedDrivers.size() > 0) { + Collections.sort(persistedDrivers); + log.info("Trying to restore drivers list [%s]", persistedDrivers); + + for (DriverHolder.DriverMetadata driverMetadata : persistedDrivers) { + Preconditions.checkState( + driverMetadata.getSequenceName().startsWith(ioConfig.getBaseSequenceName()), + String.format( + "Sequence Name validation failed while restoring driver with metadata [%s]", + driverMetadata + ) + ); + final DriverHolder driverHolder = DriverHolder.createDriverHolder( + this, + driverMetadata.getStartOffsets(), + driverMetadata.getEndOffsets(), + driverMetadata.getDriverIndex(), + driverMetadata.getSequenceName().substring(0, driverMetadata.getSequenceName().lastIndexOf("_")), + fireDepartmentMetrics, + toolbox, + driverMetadata.isLast(), + driverMetadata.isCheckPointed(), + driverMetadata.isMaxRowsPerSegmentLimitReached() + ); + driverHolders.add(0, driverHolder); + final Map restoredNextPartitionsOffset = driverHolder.startJob(toolbox.getObjectMapper()); + // Set nextOffset to be the highest offset for each partition among all persisted drivers + for (Map.Entry partitionOffset : restoredNextPartitionsOffset.entrySet()) { + if (!nextOffsets.containsKey(partitionOffset.getKey()) || partitionOffset.getValue() > nextOffsets.get( + partitionOffset.getKey())) { + nextOffsets.put(partitionOffset.getKey(), partitionOffset.getValue()); + } } - }; + nextDriverIndex = driverMetadata.getDriverIndex() + 1; - final SegmentsAndMetadata published = driver.finish(publisher, 
committerSupplier.get()); - if (published == null) { - throw new ISE("Transaction failure publishing segments, aborting"); + if (driverHolder.isComplete()) { + persistAndPossiblyPublish(driverHolder); + } + } + } else { + if (checkPoints.size() == 0) { + // create a new driver + driverHolders.add( + 0, + DriverHolder.getNextDriverHolder( + this, + ioConfig.getStartPartitions().getPartitionOffsetMap(), + endOffsets, + ioConfig.getBaseSequenceName(), + fireDepartmentMetrics, + toolbox + ) + ); + // Start up, set up initial offsets. + nextOffsets.putAll(driverHolders.get(0).startJob(toolbox.getObjectMapper())); } else { - log.info( - "Published segments[%s] with metadata[%s].", - Joiner.on(", ").join( - Iterables.transform( - published.getSegments(), - new Function() - { - @Override - public String apply(DataSegment input) - { - return input.getIdentifier(); - } - } - ) - ), - published.getCommitMetadata() + // create a driver corresponding to the latest checkpoint, assume previous checkpointed driver successfully + // published their segments + driverHolders.add( + 0, + DriverHolder.getNextDriverHolder( + this, + (((KafkaDataSourceMetadata) checkPoints.get(checkPoints.size()-1)).getKafkaPartitions() + .getPartitionOffsetMap()), + endOffsets, + ioConfig.getBaseSequenceName(), + fireDepartmentMetrics, + toolbox, + true + ) ); + nextOffsets.putAll(driverHolders.get(0).startJob(toolbox.getObjectMapper())); } } - catch (InterruptedException | RejectedExecutionException e) { - // handle the InterruptedException that gets wrapped in a RejectedExecutionException - if (e instanceof RejectedExecutionException - && (e.getCause() == null || !(e.getCause() instanceof InterruptedException))) { - throw e; - } - - // if we were interrupted because we were asked to stop, handle the exception and return success, else rethrow - if (!stopRequested) { - Thread.currentThread().interrupt(); - throw e; - } + // save latest state on disk + persistDriversList(); - log.info("The task was asked 
to stop before completing"); - } - finally { - if (chatHandlerProvider.isPresent()) { - chatHandlerProvider.get().unregister(getId()); - } + if (driverHolders.get(0).isLast()) { + // recovered a driver which happens to be the last driver for this task + // set the end offsets for this task so that the task shutdowns eventually + endOffsets.putAll(driverHolders.get(0).getEndOffsets()); } - - return success(); } @Override @@ -572,11 +924,13 @@ public boolean canRestore() public void stopGracefully() { log.info("Stopping gracefully (status: [%s])", status); - stopRequested = true; + stopRequested.set(true); + // don't wait for publishes/handoff to complete + handOffLatch.countDown(); synchronized (statusLock) { if (status == Status.PUBLISHING) { - runThread.interrupt(); + // no need to try to resume, return immediately return; } } @@ -618,19 +972,35 @@ public void stopGracefully() @Override public QueryRunner getQueryRunner(Query query) { - if (appenderator == null) { + if (driverHolders == null || driverHolders.size() == 0 || toolbox == null) { // Not yet initialized, no data yet, just return a noop runner. 
return new NoopQueryRunner<>(); } - - return new QueryRunner() - { - @Override - public Sequence run(final Query query, final Map responseContext) - { - return query.run(appenderator, responseContext); - } - }; + final QueryRunnerFactory> queryRunnerFactory = toolbox.getQueryRunnerFactoryConglomerate() + .findFactory(query); + return queryRunnerFactory.getToolchest().mergeResults( + queryRunnerFactory.mergeRunners( + toolbox.getQueryExecutorService(), + Iterables.transform( + driverHolders, + new Function>() + { + @Override + public QueryRunner apply(final DriverHolder input) + { + return new QueryRunner() + { + @Override + public Sequence run(Query query, Map responseContext) + { + return query.run(input.getDriver().getAppenderator(), responseContext); + } + }; + } + } + ) + ) + ); } @GET @@ -663,7 +1033,9 @@ public Map getEndOffsets() @Produces(MediaType.APPLICATION_JSON) public Response setEndOffsets( Map offsets, - @QueryParam("resume") @DefaultValue("false") final boolean resume + @QueryParam("resume") @DefaultValue("false") final boolean resume, + @QueryParam("finish") @DefaultValue("true") final boolean finish + // this field is only for internal purposes, should never be set by users ) throws InterruptedException { if (offsets == null) { @@ -681,6 +1053,10 @@ public Response setEndOffsets( .build(); } + if (status == Status.NOT_STARTED || status == Status.STARTING) { + return Response.status(Response.Status.SERVICE_UNAVAILABLE).entity("Task has not started running yet!").build(); + } + pauseLock.lockInterruptibly(); try { if (!isPaused()) { @@ -703,8 +1079,67 @@ public Response setEndOffsets( } } - endOffsets.putAll(offsets); - log.info("endOffsets changed to %s", endOffsets); + lockDriversList(); + Preconditions.checkState(driverHolders.size() > 0, "WTH?! 
No drivers found to set end offsets"); + final DriverHolder driverHolder = driverHolders.get(0); + + try { + if (driverHolder.isCheckPointed()) { + // this should only happen when we got another setEndOffsets call after the final setEndOffsets call + // check for consistency and duplicate request + Preconditions.checkState( + endOffsets.equals(driverHolder.getEndOffsets()), + "WTH?! End offsets for task [%s] and latest driver [%s] do not match", + endOffsets, + driverHolder.getMetadata() + ); + // ignore duplicate requests + if (offsets.equals(driverHolder.getEndOffsets())) { + log.warn( + "end offsets already set to [%s], ignoring duplicate request to set to [%s]", + driverHolder.getEndOffsets(), + offsets + ); + return Response.ok(endOffsets).build(); + } else { + throw new ISE( + "WTH?! end offsets set to [%s], ignoring request to set to [%s]", + driverHolder.getEndOffsets(), + offsets + ); + } + } + driverHolder.setEndOffsets(offsets); + + if (finish) { + // set the last flag, useful while restoring state from disk to set endOffsets + driverHolder.setLast(); + endOffsets.putAll(offsets); + } else { + // create next driver + final DriverHolder nextDriverHolder = DriverHolder.getNextDriverHolder( + this, + driverHolder.getEndOffsets(), //previous driver endOffsets + endOffsets, // task endOffsets + ioConfig.getBaseSequenceName(), + fireDepartmentMetrics, + toolbox + ); + driverHolders.add(0, nextDriverHolder); + nextDriverHolder.startJob(toolbox.getObjectMapper()); + } + persistDriversList(); + if (driverHolder.isComplete()) { + persistAndPossiblyPublish(driverHolder); + } + } + catch (Exception e) { + log.error(e, "Exception while setting end offsets [%s]", offsets); + return Response.status(Response.Status.INTERNAL_SERVER_ERROR).entity(e.getMessage()).build(); + } + finally { + unlockDriversList(); + } } finally { pauseLock.unlock(); @@ -717,6 +1152,29 @@ public Response setEndOffsets( return Response.ok(endOffsets).build(); } + private void 
persistDriversList() throws IOException, InterruptedException + { + try { + lockDriversList(); + log.info("Persisting drivers list [%s]", driverHolders); + toolbox.getObjectMapper().writerWithType(new TypeReference>() + { + }).writeValue(driversRestoreFile, Lists.newArrayList(Iterables.transform( + driverHolders, new Function() + { + @Override + public DriverHolder.DriverMetadata apply(DriverHolder input) + { + return input.getMetadata(); + } + } + ))); + } + finally { + unlockDriversList(); + } + } + /** * Signals the ingestion loop to pause. * @@ -770,7 +1228,7 @@ public Response pause(@QueryParam("timeout") @DefaultValue("0") final long timeo } try { - return Response.ok().entity(mapper.writeValueAsString(getCurrentOffsets())).build(); + return Response.ok().entity(toolbox.getObjectMapper().writeValueAsString(getCurrentOffsets())).build(); } catch (JsonProcessingException e) { throw Throwables.propagate(e); @@ -818,14 +1276,11 @@ private boolean isPaused() return status == Status.PAUSED; } - private Appenderator newAppenderator(FireDepartmentMetrics metrics, TaskToolbox toolbox) + Appenderator newAppenderator(FireDepartmentMetrics metrics, TaskToolbox toolbox, File basePersistDirectory) { - final int maxRowsInMemoryPerPartition = (tuningConfig.getMaxRowsInMemory() / - ioConfig.getStartPartitions().getPartitionOffsetMap().size()); return Appenderators.createRealtime( dataSchema, - tuningConfig.withBasePersistDirectory(new File(toolbox.getTaskWorkDir(), "persist")) - .withMaxRowsInMemory(maxRowsInMemoryPerPartition), + tuningConfig.withBasePersistDirectory(basePersistDirectory), metrics, toolbox.getSegmentPusher(), toolbox.getObjectMapper(), @@ -840,7 +1295,7 @@ private Appenderator newAppenderator(FireDepartmentMetrics metrics, TaskToolbox ); } - private FiniteAppenderatorDriver newDriver( + FiniteAppenderatorDriver newDriver( final Appenderator appenderator, final TaskToolbox toolbox, final FireDepartmentMetrics metrics @@ -1030,7 +1485,7 @@ private void 
possiblyResetOffsetsOrWait( pollRetryLock.lockInterruptibly(); try { long nanos = TimeUnit.MILLISECONDS.toNanos(POLL_RETRY_MS); - while (nanos > 0L && !pauseRequested && !stopRequested) { + while (nanos > 0L && !pauseRequested && !stopRequested.get()) { nanos = isAwaitingRetry.awaitNanos(nanos); } } diff --git a/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/KafkaIndexTaskClient.java b/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/KafkaIndexTaskClient.java index 71f5ea60f182..446ed68837cb 100644 --- a/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/KafkaIndexTaskClient.java +++ b/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/KafkaIndexTaskClient.java @@ -186,6 +186,7 @@ public Map pause(final String id, final long timeout) final RetryPolicy retryPolicy = retryPolicyFactory.makeRetryPolicy(); while (true) { if (getStatus(id) == KafkaIndexTask.Status.PAUSED) { + log.info("Task [%s] paused successfully", id); return getCurrentOffsets(id, true); } @@ -257,9 +258,11 @@ public Map getCurrentOffsets(final String id, final boolean retry return jsonMapper.readValue(response.getContent(), new TypeReference>() {}); } catch (NoTaskLocationException e) { + log.error("No location available for task [%s]", id); return ImmutableMap.of(); } catch (IOException e) { + log.error("Exception while getting current offsets [%s]", e.getMessage()); throw Throwables.propagate(e); } } @@ -280,21 +283,16 @@ public Map getEndOffsets(final String id) } } - public boolean setEndOffsets(final String id, final Map endOffsets) + public boolean setEndOffsets(final String id, final Map endOffsets, final boolean resume, final boolean finalize) { - return setEndOffsets(id, endOffsets, false); - } - - public boolean setEndOffsets(final String id, final Map endOffsets, final boolean resume) - { - log.debug("SetEndOffsets task[%s] endOffsets[%s] resume[%s]", id, endOffsets, resume); + 
log.debug("SetEndOffsets task[%s] endOffsets[%s] resume[%s] finalize[%s]", id, endOffsets, resume, finalize); try { final FullResponseHolder response = submitRequest( id, HttpMethod.POST, "offsets/end", - resume ? "resume=true" : null, + String.format("resume=%s&finish=%s", resume, finalize), jsonMapper.writeValueAsBytes(endOffsets), true ); @@ -411,13 +409,8 @@ public Map call() throws Exception ); } - public ListenableFuture setEndOffsetsAsync(final String id, final Map endOffsets) - { - return setEndOffsetsAsync(id, endOffsets, false); - } - public ListenableFuture setEndOffsetsAsync( - final String id, final Map endOffsets, final boolean resume + final String id, final Map endOffsets, final boolean resume, final boolean finalize ) { return executorService.submit( @@ -426,7 +419,7 @@ public ListenableFuture setEndOffsetsAsync( @Override public Boolean call() throws Exception { - return setEndOffsets(id, endOffsets, resume); + return setEndOffsets(id, endOffsets, resume, finalize); } } ); diff --git a/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/supervisor/KafkaSupervisor.java b/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/supervisor/KafkaSupervisor.java index 91deb18a161e..59212f653ae5 100644 --- a/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/supervisor/KafkaSupervisor.java +++ b/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/supervisor/KafkaSupervisor.java @@ -20,6 +20,7 @@ package io.druid.indexing.kafka.supervisor; import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.core.type.TypeReference; import com.fasterxml.jackson.databind.MapperFeature; import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.annotations.VisibleForTesting; @@ -31,6 +32,7 @@ import com.google.common.base.Throwables; import com.google.common.collect.ImmutableList; import 
com.google.common.collect.ImmutableMap; +import com.google.common.collect.ImmutableSet; import com.google.common.collect.Iterables; import com.google.common.collect.Lists; import com.google.common.collect.Maps; @@ -452,6 +454,18 @@ public void reset(DataSourceMetadata dataSourceMetadata) notices.add(new ResetNotice(dataSourceMetadata)); } + @Override + public void checkPoint( + String sequenceName, + DataSourceMetadata previousCheckPoint, + DataSourceMetadata currentCheckPoint + ) + { + Preconditions.checkNotNull(sequenceName, "Cannot check point without a sequence name"); + log.info("Check-pointing [%s]", currentCheckPoint); + notices.add(new CheckpointNotice(sequenceName, previousCheckPoint, currentCheckPoint)); + } + public void possiblyRegisterListener() { // getTaskRunner() sometimes fails if the task queue is still being initialized so retry later until we succeed @@ -491,7 +505,7 @@ public void statusChanged(String taskId, TaskStatus status) private interface Notice { - void handle() throws ExecutionException, InterruptedException, TimeoutException; + void handle() throws ExecutionException, InterruptedException, TimeoutException, IOException; } private class RunNotice implements Notice @@ -551,6 +565,65 @@ public void handle() } } + private class CheckpointNotice implements Notice + { + final String sequenceName; + final DataSourceMetadata previousCheckPoint; + final DataSourceMetadata currentCheckPoint; + + CheckpointNotice(String sequenceName, DataSourceMetadata previousCheckPoint, DataSourceMetadata currentCheckPoint) + { + this.sequenceName = sequenceName; + this.previousCheckPoint = previousCheckPoint; + this.currentCheckPoint = currentCheckPoint; + } + + @Override + public void handle() throws ExecutionException, InterruptedException, TimeoutException, IOException + { + // go to database and check for consistency + // if already received request for this sequence name and dataSourceMetadata combination + // then return without doing anything + + // 
checkPoints will never be null, it can be an empty list + List checkPoints = indexerMetadataStorageCoordinator.getCheckPointsForSequence(sequenceName); + + // check validity of previousCheckPoint if it is not null + if (previousCheckPoint != null) { + int index = checkPoints.size() - 1; + while (index >= 0) { + DataSourceMetadata dataSourceMetadata = checkPoints.get(index); + if (dataSourceMetadata.equals(previousCheckPoint)) { + break; + } + index--; + } + if (index < 0) { + throw new ISE("No previous checkpoint [%s] found in metadata store", previousCheckPoint); + } else if (index < checkPoints.size() - 1) { + // if the found check point is not the latest one + // already check pointed by replica + Preconditions.checkState(index == checkPoints.size() - 2, "checkpoint consistency failure"); + log.info("Already check pointed with dataSourceMetadata [%s]", checkPoints.get(checkPoints.size() - 2)); + return; + } + } else { + // There cannot be more than one check point in metadata store when previous check point is null + // as when the task starts they are sent existing check points + Preconditions.checkState( + checkPoints.size() <= 1, + "Got check point request with null as previous check point, however found more than one checkpoints in metadata store" + ); + if (checkPoints.size() == 1) { + log.info("Already check pointed with dataSourceMetadata [%s]", checkPoints.get(0)); + return; + } + } + Map checkPoint = checkPointInternal(sequenceName, currentCheckPoint); + log.info("Handled Check point notice, new check point is [%s]", checkPoint); + } + } + @VisibleForTesting void resetInternal(DataSourceMetadata dataSourceMetadata) { @@ -645,6 +718,21 @@ private void killTaskGroupForPartitions(Set partitions) } } + private Map checkPointInternal(String sequenceName, DataSourceMetadata dataSourceMetadata) + throws ExecutionException, InterruptedException + { + if (!(dataSourceMetadata instanceof KafkaDataSourceMetadata)) { + throw new IAE("Expected 
KafkaDataSourceMetadata but found instance of [%s]", dataSourceMetadata.getClass()); + } + // Find task group corresponding to the dataSourceMetadata + final int taskGroupId = ((KafkaDataSourceMetadata) dataSourceMetadata).getKafkaPartitions() + .getPartitionOffsetMap() + .keySet() + .iterator() + .next(); + return checkPointTasks(sequenceName, taskGroupId, false).get(); + } + @VisibleForTesting void gracefulShutdownInternal() throws ExecutionException, InterruptedException, TimeoutException { @@ -1048,7 +1136,7 @@ private void checkTaskDuration() throws InterruptedException, ExecutionException if (earliestTaskStart.plus(ioConfig.getTaskDuration()).isBeforeNow()) { log.info("Task group [%d] has run for [%s]", groupId, ioConfig.getTaskDuration()); futureGroupIds.add(groupId); - futures.add(signalTasksToFinish(groupId)); + futures.add(checkPointTasks(generateSequenceName(groupId), groupId, true)); } } @@ -1084,40 +1172,46 @@ private void checkTaskDuration() throws InterruptedException, ExecutionException } } - private ListenableFuture> signalTasksToFinish(final int groupId) + private ListenableFuture> checkPointTasks( + final String sequenceName, + final int groupId, + final boolean finalize + ) { final TaskGroup taskGroup = taskGroups.get(groupId); - // 1) Check if any task completed (in which case we're done) and kill unassigned tasks - Iterator> i = taskGroup.tasks.entrySet().iterator(); - while (i.hasNext()) { - Map.Entry taskEntry = i.next(); - String taskId = taskEntry.getKey(); - TaskData task = taskEntry.getValue(); - - if (task.status.isSuccess()) { - // If any task in this group has already completed, stop the rest of the tasks in the group and return. 
- // This will cause us to create a new set of tasks next cycle that will start from the offsets in - // metadata store (which will have advanced if we succeeded in publishing and will remain the same if publishing - // failed and we need to re-ingest) - return Futures.transform( - stopTasksInGroup(taskGroup), new Function>() - { - @Nullable - @Override - public Map apply(@Nullable Object input) + if (finalize) { + // 1) Check if any task completed (in which case we're done) and kill unassigned tasks + Iterator> i = taskGroup.tasks.entrySet().iterator(); + while (i.hasNext()) { + Map.Entry taskEntry = i.next(); + String taskId = taskEntry.getKey(); + TaskData task = taskEntry.getValue(); + + if (task.status.isSuccess()) { + // If any task in this group has already completed, stop the rest of the tasks in the group and return. + // This will cause us to create a new set of tasks next cycle that will start from the offsets in + // metadata store (which will have advanced if we succeeded in publishing and will remain the same if publishing + // failed and we need to re-ingest) + return Futures.transform( + stopTasksInGroup(taskGroup), new Function>() { - return null; + @Nullable + @Override + public Map apply(@Nullable Object input) + { + return null; + } } - } - ); - } + ); + } - if (task.status.isRunnable()) { - if (taskInfoProvider.getTaskLocation(taskId).equals(TaskLocation.unknown())) { - log.info("Killing task [%s] which hasn't been assigned to a worker", taskId); - killTask(taskId); - i.remove(); + if (task.status.isRunnable()) { + if (taskInfoProvider.getTaskLocation(taskId).equals(TaskLocation.unknown())) { + log.info("Killing task [%s] which hasn't been assigned to a worker", taskId); + killTask(taskId); + i.remove(); + } } } } @@ -1167,12 +1261,36 @@ public Map apply(List> input) return null; } - log.info("Setting endOffsets for tasks in taskGroup [%d] to %s and resuming", groupId, endOffsets); - for (final String taskId : setEndOffsetTaskIds) { - 
setEndOffsetFutures.add(taskClient.setEndOffsetsAsync(taskId, endOffsets, true)); - } - try { + final KafkaDataSourceMetadata newCheckPoint = new KafkaDataSourceMetadata(new KafkaPartitions( + ioConfig.getTopic(), + endOffsets + )); + if (endOffsets.equals(taskGroup.partitionOffsets)) { + log.warn( + "Not adding check point [%s] for sequence [%s] as its offsets are same as the start offsets [%s] for the task group [%d]", + newCheckPoint, + sequenceName, + taskGroup.partitionOffsets, + groupId + ); + } else { + if (!indexerMetadataStorageCoordinator.addNewCheckPointForSequence(sequenceName, newCheckPoint)) { + String errorMessage = String.format( + "Failed to add new check point [%s] for sequence [%s]", + newCheckPoint, + sequenceName + ); + log.error(errorMessage); + throw new ISE(errorMessage); + } + } + + log.info("Setting endOffsets for tasks in taskGroup [%d] to %s and resuming", groupId, endOffsets); + for (final String taskId : setEndOffsetTaskIds) { + setEndOffsetFutures.add(taskClient.setEndOffsetsAsync(taskId, endOffsets, true, finalize)); + } + List results = Futures.successfulAsList(setEndOffsetFutures) .get(futureTimeoutInSeconds, TimeUnit.SECONDS); for (int i = 0; i < results.size(); i++) { @@ -1185,6 +1303,7 @@ public Map apply(List> input) } } catch (Exception e) { + log.error("Something bad happened [%s]", e.getMessage()); Throwables.propagate(e); } @@ -1258,8 +1377,9 @@ private void checkPendingCompletionTasks() throws ExecutionException, Interrupte log.warn("All tasks in group [%d] failed to publish, killing all tasks for these partitions", groupId); } else { log.makeAlert( - "No task in [%s] succeeded before the completion timeout elapsed [%s]!", + "No task [%s] in group id [%s] succeeded before the completion timeout elapsed [%s]!", group.taskIds(), + groupId, ioConfig.getCompletionTimeout() ).emit(); } @@ -1267,14 +1387,16 @@ private void checkPendingCompletionTasks() throws ExecutionException, Interrupte // reset partitions offsets for this 
task group so that they will be re-read from metadata storage partitionGroups.remove(groupId); + log.warn("Killing all tasks in pending group [%s]", group.taskIds()); // stop all the tasks in this pending completion group - futures.add(stopTasksInGroup(group)); + killTasksInGroup(group); // set a flag so the other pending completion groups for this set of partitions will also stop stopTasksInTaskGroup = true; + log.warn("Killing all tasks in current group [%s]", taskGroups.get(groupId) == null ? ImmutableSet.of() : taskGroups.get(groupId).taskIds()); // stop all the tasks in the currently reading task group and remove the bad task group - futures.add(stopTasksInGroup(taskGroups.remove(groupId))); + killTasksInGroup(taskGroups.remove(groupId)); toRemove.add(group); } @@ -1404,6 +1526,36 @@ private void createKafkaTasksForGroup(int groupId, int replicas) minimumMessageTime ); + // get check points for this sequence_name from metadata store and send them in context + List checkPoints; + try { + checkPoints = indexerMetadataStorageCoordinator.getCheckPointsForSequence(sequenceName); + log.info("Retrieved check points [%s] for sequence [%s]", checkPoints, sequenceName); + } + catch (Exception e) { + log.makeAlert("Unable to get checkpoints from MetadataStore, not creating new tasks") + .addData("sequence_name", sequenceName) + .addData("exception", e.getMessage()) + .emit(); + return; + } + final Map context = new HashMap<>(); + if(spec.getContext() != null) { + context.putAll(spec.getContext()); + } + try { + context.put("check_points", sortingMapper.writerWithType(new TypeReference>() + { + }).writeValueAsString(checkPoints)); + } + catch (JsonProcessingException e) { + log.makeAlert("Unable to serialize check points, not creating new tasks") + .addData("sequence_name", sequenceName) + .addData("exception", e.getMessage()) + .emit(); + return; + } + for (int i = 0; i < replicas; i++) { String taskId = Joiner.on("_").join(sequenceName, getRandomId()); KafkaIndexTask 
indexTask = new KafkaIndexTask( @@ -1412,7 +1564,7 @@ private void createKafkaTasksForGroup(int groupId, int replicas) spec.getDataSchema(), taskTuningConfig, kafkaIOConfig, - spec.getContext(), + context, null ); @@ -1582,6 +1734,14 @@ private void killTask(final String id) } } + private void killTasksInGroup(TaskGroup taskGroup) { + if (taskGroup != null) { + for (Map.Entry entry : taskGroup.tasks.entrySet()) { + killTask(entry.getKey()); + } + } + } + private int getTaskGroupIdForPartition(int partition) { return partition % ioConfig.getTaskCount(); diff --git a/extensions-core/kafka-indexing-service/src/test/java/io/druid/indexing/kafka/KafkaIndexTaskClientTest.java b/extensions-core/kafka-indexing-service/src/test/java/io/druid/indexing/kafka/KafkaIndexTaskClientTest.java index 396524005b4a..f1a74ec25ae8 100644 --- a/extensions-core/kafka-indexing-service/src/test/java/io/druid/indexing/kafka/KafkaIndexTaskClientTest.java +++ b/extensions-core/kafka-indexing-service/src/test/java/io/druid/indexing/kafka/KafkaIndexTaskClientTest.java @@ -138,8 +138,8 @@ public void testNoTaskLocation() throws Exception Assert.assertEquals(null, client.getStartTime(TEST_ID)); Assert.assertEquals(ImmutableMap.of(), client.getCurrentOffsets(TEST_ID, true)); Assert.assertEquals(ImmutableMap.of(), client.getEndOffsets(TEST_ID)); - Assert.assertEquals(false, client.setEndOffsets(TEST_ID, ImmutableMap.of())); - Assert.assertEquals(false, client.setEndOffsets(TEST_ID, ImmutableMap.of(), true)); + Assert.assertEquals(false, client.setEndOffsets(TEST_ID, ImmutableMap.of(), false, true)); + Assert.assertEquals(false, client.setEndOffsets(TEST_ID, ImmutableMap.of(), true, true)); verifyAll(); } @@ -537,13 +537,13 @@ public void testSetEndOffsets() throws Exception ); replayAll(); - client.setEndOffsets(TEST_ID, endOffsets); + client.setEndOffsets(TEST_ID, endOffsets, false, true); verifyAll(); Request request = captured.getValue(); Assert.assertEquals(HttpMethod.POST, request.getMethod()); 
Assert.assertEquals( - new URL("http://test-host:1234/druid/worker/v1/chat/test-id/offsets/end"), + new URL("http://test-host:1234/druid/worker/v1/chat/test-id/offsets/end?resume=false&finish=true"), request.getUrl() ); Assert.assertTrue(request.getHeaders().get("X-Druid-Task-Id").contains("test-id")); @@ -562,13 +562,13 @@ public void testSetEndOffsetsAndResume() throws Exception ); replayAll(); - client.setEndOffsets(TEST_ID, endOffsets, true); + client.setEndOffsets(TEST_ID, endOffsets, true, true); verifyAll(); Request request = captured.getValue(); Assert.assertEquals(HttpMethod.POST, request.getMethod()); Assert.assertEquals( - new URL("http://test-host:1234/druid/worker/v1/chat/test-id/offsets/end?resume=true"), + new URL("http://test-host:1234/druid/worker/v1/chat/test-id/offsets/end?resume=true&finish=true"), request.getUrl() ); Assert.assertTrue(request.getHeaders().get("X-Druid-Task-Id").contains("test-id")); @@ -897,8 +897,8 @@ public void testSetEndOffsetsAsync() throws Exception List expectedUrls = Lists.newArrayList(); List> futures = Lists.newArrayList(); for (int i = 0; i < numRequests; i++) { - expectedUrls.add(new URL(String.format(URL_FORMATTER, TEST_HOST, TEST_PORT, TEST_IDS.get(i), "offsets/end"))); - futures.add(client.setEndOffsetsAsync(TEST_IDS.get(i), endOffsets)); + expectedUrls.add(new URL(String.format(URL_FORMATTER, TEST_HOST, TEST_PORT, TEST_IDS.get(i), "offsets/end?resume=false&finish=true"))); + futures.add(client.setEndOffsetsAsync(TEST_IDS.get(i), endOffsets, false, true)); } List responses = Futures.allAsList(futures).get(); @@ -937,11 +937,11 @@ public void testSetEndOffsetsAsyncWithResume() throws Exception TEST_HOST, TEST_PORT, TEST_IDS.get(i), - "offsets/end?resume=true" + "offsets/end?resume=true&finish=true" ) ) ); - futures.add(client.setEndOffsetsAsync(TEST_IDS.get(i), endOffsets, true)); + futures.add(client.setEndOffsetsAsync(TEST_IDS.get(i), endOffsets, true, true)); } List responses = Futures.allAsList(futures).get(); 
diff --git a/extensions-core/kafka-indexing-service/src/test/java/io/druid/indexing/kafka/KafkaIndexTaskTest.java b/extensions-core/kafka-indexing-service/src/test/java/io/druid/indexing/kafka/KafkaIndexTaskTest.java index 2132d0623bd4..5eeef037be4e 100644 --- a/extensions-core/kafka-indexing-service/src/test/java/io/druid/indexing/kafka/KafkaIndexTaskTest.java +++ b/extensions-core/kafka-indexing-service/src/test/java/io/druid/indexing/kafka/KafkaIndexTaskTest.java @@ -245,7 +245,7 @@ public void setUp() throws Exception zkServer.getConnectString(), tempFolder.newFolder(), 1, - ImmutableMap.of("num.partitions", "2") + ImmutableMap.of("num.partitions", "2", "advertised.host.name", "localhost") ); kafkaServer.start(); @@ -875,7 +875,7 @@ public void testRunOneTaskTwoPartitions() throws Exception // Check published segments & metadata SegmentDescriptor desc1 = SD(task, "2010/P1D", 0); SegmentDescriptor desc2 = SD(task, "2011/P1D", 0); - SegmentDescriptor desc3 = SD(task, "2011/P1D", 1); + SegmentDescriptor desc3 = SD(task, "2011/P1D", 0); SegmentDescriptor desc4 = SD(task, "2012/P1D", 0); Assert.assertEquals(ImmutableSet.of(desc1, desc2, desc3, desc4), publishedDescriptors()); Assert.assertEquals( @@ -889,7 +889,7 @@ public void testRunOneTaskTwoPartitions() throws Exception // Check desc2/desc3 without strong ordering because two partitions are interleaved nondeterministically Assert.assertEquals( - ImmutableSet.of(ImmutableList.of("d", "e"), ImmutableList.of("h")), + ImmutableSet.of(ImmutableList.of("d", "e", "h")), ImmutableSet.of(readSegmentDim1(desc2), readSegmentDim1(desc3)) ); } @@ -1170,7 +1170,7 @@ public void testRunAndPauseAfterReadWithModifiedEndOffsets() throws Exception Assert.assertEquals(ImmutableMap.of(0, 3L), task.getEndOffsets()); Map newEndOffsets = ImmutableMap.of(0, 4L); - task.setEndOffsets(newEndOffsets, false); + task.setEndOffsets(newEndOffsets, false, true); Assert.assertEquals(newEndOffsets, task.getEndOffsets()); 
Assert.assertEquals(KafkaIndexTask.Status.PAUSED, task.getStatus()); task.resume(); @@ -1185,7 +1185,7 @@ public void testRunAndPauseAfterReadWithModifiedEndOffsets() throws Exception // try again but with resume flag == true newEndOffsets = ImmutableMap.of(0, 6L); - task.setEndOffsets(newEndOffsets, true); + task.setEndOffsets(newEndOffsets, true, true); Assert.assertEquals(newEndOffsets, task.getEndOffsets()); Assert.assertNotEquals(KafkaIndexTask.Status.PAUSED, task.getStatus()); @@ -1209,7 +1209,8 @@ public void testRunAndPauseAfterReadWithModifiedEndOffsets() throws Exception SegmentDescriptor desc1 = SD(task, "2009/P1D", 0); SegmentDescriptor desc2 = SD(task, "2010/P1D", 0); SegmentDescriptor desc3 = SD(task, "2011/P1D", 0); - Assert.assertEquals(ImmutableSet.of(desc1, desc2, desc3), publishedDescriptors()); + SegmentDescriptor desc4 = SD(task, "2011/P1D", 1); + Assert.assertEquals(ImmutableSet.of(desc1, desc2, desc3, desc4), publishedDescriptors()); Assert.assertEquals( new KafkaDataSourceMetadata(new KafkaPartitions("topic0", ImmutableMap.of(0, 6L))), metadataStorageCoordinator.getDataSourceMetadata(DATA_SCHEMA.getDataSource()) @@ -1218,7 +1219,8 @@ public void testRunAndPauseAfterReadWithModifiedEndOffsets() throws Exception // Check segments in deep storage Assert.assertEquals(ImmutableList.of("b"), readSegmentDim1(desc1)); Assert.assertEquals(ImmutableList.of("c"), readSegmentDim1(desc2)); - Assert.assertEquals(ImmutableList.of("d", "e"), readSegmentDim1(desc3)); + Assert.assertEquals(ImmutableList.of("d"), readSegmentDim1(desc3)); + Assert.assertEquals(ImmutableList.of("e"), readSegmentDim1(desc4)); } @Test(timeout = 30_000L) diff --git a/extensions-core/kafka-indexing-service/src/test/java/io/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java b/extensions-core/kafka-indexing-service/src/test/java/io/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java index 4df4e38ec4a0..77bf0ef2c66b 100644 --- 
a/extensions-core/kafka-indexing-service/src/test/java/io/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java +++ b/extensions-core/kafka-indexing-service/src/test/java/io/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java @@ -161,7 +161,7 @@ public void setUp() throws Exception zkServer.getConnectString(), tempFolder.newFolder(), 1, - ImmutableMap.of("num.partitions", String.valueOf(NUM_PARTITIONS)) + ImmutableMap.of("num.partitions", String.valueOf(NUM_PARTITIONS), "advertised.host.name", "localhost") ); kafkaServer.start(); kafkaHost = String.format("localhost:%d", kafkaServer.getPort()); @@ -213,6 +213,10 @@ public void testNoInitialState() throws Exception null ) ).anyTimes(); + expect(indexerMetadataStorageCoordinator.getCheckPointsForSequence(EasyMock.anyString())).andReturn( + ImmutableList.of() + ).anyTimes(); + expect(taskQueue.add(capture(captured))).andReturn(true); taskRunner.registerListener(anyObject(TaskRunnerListener.class), anyObject(Executor.class)); replayAll(); @@ -259,6 +263,10 @@ public void testMultiTask() throws Exception null ) ).anyTimes(); + expect(indexerMetadataStorageCoordinator.getCheckPointsForSequence(EasyMock.anyString())).andReturn( + ImmutableList.of() + ).anyTimes(); + expect(taskQueue.add(capture(captured))).andReturn(true).times(2); replayAll(); @@ -296,6 +304,10 @@ public void testReplicas() throws Exception null ) ).anyTimes(); + expect(indexerMetadataStorageCoordinator.getCheckPointsForSequence(EasyMock.anyString())).andReturn( + ImmutableList.of() + ).anyTimes(); + expect(taskQueue.add(capture(captured))).andReturn(true).times(2); replayAll(); @@ -333,6 +345,10 @@ public void testLateMessageRejectionPeriod() throws Exception null ) ).anyTimes(); + expect(indexerMetadataStorageCoordinator.getCheckPointsForSequence(EasyMock.anyString())).andReturn( + ImmutableList.of() + ).anyTimes(); + expect(taskQueue.add(capture(captured))).andReturn(true).times(2); replayAll(); @@ -375,6 +391,10 @@ public void 
testLatestOffset() throws Exception null ) ).anyTimes(); + expect(indexerMetadataStorageCoordinator.getCheckPointsForSequence(EasyMock.anyString())).andReturn( + ImmutableList.of() + ).anyTimes(); + expect(taskQueue.add(capture(captured))).andReturn(true); replayAll(); @@ -407,6 +427,10 @@ public void testDatasourceMetadata() throws Exception new KafkaPartitions(KAFKA_TOPIC, ImmutableMap.of(0, 10L, 1, 20L, 2, 30L)) ) ).anyTimes(); + expect(indexerMetadataStorageCoordinator.getCheckPointsForSequence(EasyMock.anyString())).andReturn( + ImmutableList.of() + ).anyTimes(); + expect(taskQueue.add(capture(captured))).andReturn(true); replayAll(); @@ -435,6 +459,10 @@ public void testBadMetadataOffsets() throws Exception new KafkaPartitions(KAFKA_TOPIC, ImmutableMap.of(0, 10L, 1, 20L, 2, 30L)) ) ).anyTimes(); + expect(indexerMetadataStorageCoordinator.getCheckPointsForSequence(EasyMock.anyString())).andReturn( + ImmutableList.of() + ).anyTimes(); + replayAll(); supervisor.start(); @@ -514,6 +542,10 @@ public void testKillIncompatibleTasks() throws Exception null ) ).anyTimes(); + expect(indexerMetadataStorageCoordinator.getCheckPointsForSequence(EasyMock.anyString())).andReturn( + ImmutableList.of() + ).anyTimes(); + expect(taskClient.stopAsync("id1", false)).andReturn(Futures.immediateFuture(true)); expect(taskClient.stopAsync("id3", false)).andReturn(Futures.immediateFuture(false)); taskRunner.registerListener(anyObject(TaskRunnerListener.class), anyObject(Executor.class)); @@ -630,6 +662,10 @@ public void testRequeueTaskWhenFailed() throws Exception null ) ).anyTimes(); + expect(indexerMetadataStorageCoordinator.getCheckPointsForSequence(EasyMock.anyString())).andReturn( + ImmutableList.of() + ).anyTimes(); + expect(taskQueue.add(capture(captured))).andReturn(true).times(4); taskRunner.registerListener(anyObject(TaskRunnerListener.class), anyObject(Executor.class)); replayAll(); @@ -713,6 +749,10 @@ public void testRequeueAdoptedTaskWhenFailed() throws Exception null ) 
).anyTimes(); + expect(indexerMetadataStorageCoordinator.getCheckPointsForSequence(EasyMock.anyString())).andReturn( + ImmutableList.of() + ).anyTimes(); + taskRunner.registerListener(anyObject(TaskRunnerListener.class), anyObject(Executor.class)); replayAll(); @@ -775,6 +815,10 @@ public void testQueueNextTasksOnSuccess() throws Exception null ) ).anyTimes(); + expect(indexerMetadataStorageCoordinator.getCheckPointsForSequence(EasyMock.anyString())).andReturn( + ImmutableList.of() + ).anyTimes(); + expect(taskQueue.add(capture(captured))).andReturn(true).times(4); taskRunner.registerListener(anyObject(TaskRunnerListener.class), anyObject(Executor.class)); replayAll(); @@ -843,6 +887,14 @@ public void testBeginPublishAndQueueNextTasks() throws Exception null ) ).anyTimes(); + expect(indexerMetadataStorageCoordinator.getCheckPointsForSequence(EasyMock.anyString())).andReturn( + ImmutableList.of() + ).anyTimes(); + expect(indexerMetadataStorageCoordinator.addNewCheckPointForSequence( + EasyMock.anyString(), + EasyMock.anyObject(DataSourceMetadata.class) + )).andReturn(true).anyTimes(); + expect(taskQueue.add(capture(captured))).andReturn(true).times(4); taskRunner.registerListener(anyObject(TaskRunnerListener.class), anyObject(Executor.class)); replayAll(); @@ -881,6 +933,7 @@ public void testBeginPublishAndQueueNextTasks() throws Exception taskClient.setEndOffsetsAsync( EasyMock.contains("sequenceName-0"), EasyMock.eq(ImmutableMap.of(0, 10L, 1, 20L, 2, 35L)), + EasyMock.eq(true), EasyMock.eq(true) ) ).andReturn(Futures.immediateFuture(true)).times(2); @@ -940,6 +993,10 @@ public void testDiscoverExistingPublishingTask() throws Exception null ) ).anyTimes(); + expect(indexerMetadataStorageCoordinator.getCheckPointsForSequence(EasyMock.anyString())).andReturn( + ImmutableList.of() + ).anyTimes(); + expect(taskClient.getStatusAsync("id1")).andReturn(Futures.immediateFuture(KafkaIndexTask.Status.PUBLISHING)); expect(taskClient.getCurrentOffsetsAsync("id1", false)) 
.andReturn(Futures.immediateFuture((Map) ImmutableMap.of(0, 10L, 1, 20L, 2, 30L))); @@ -1029,6 +1086,10 @@ public void testDiscoverExistingPublishingTaskWithDifferentPartitionAllocation() null ) ).anyTimes(); + expect(indexerMetadataStorageCoordinator.getCheckPointsForSequence(EasyMock.anyString())).andReturn( + ImmutableList.of() + ).anyTimes(); + expect(taskClient.getStatusAsync("id1")).andReturn(Futures.immediateFuture(KafkaIndexTask.Status.PUBLISHING)); expect(taskClient.getCurrentOffsetsAsync("id1", false)) .andReturn(Futures.immediateFuture((Map) ImmutableMap.of(0, 10L, 2, 30L))); @@ -1131,6 +1192,10 @@ public void testDiscoverExistingPublishingAndReadingTask() throws Exception null ) ).anyTimes(); + expect(indexerMetadataStorageCoordinator.getCheckPointsForSequence(EasyMock.anyString())).andReturn( + ImmutableList.of() + ).anyTimes(); + expect(taskClient.getStatusAsync("id1")).andReturn(Futures.immediateFuture(KafkaIndexTask.Status.PUBLISHING)); expect(taskClient.getStatusAsync("id2")).andReturn(Futures.immediateFuture(KafkaIndexTask.Status.READING)); expect(taskClient.getStartTimeAsync("id2")).andReturn(Futures.immediateFuture(startTime)); @@ -1191,6 +1256,10 @@ public void testKillUnresponsiveTasksWhileGettingStartTime() throws Exception null ) ).anyTimes(); + expect(indexerMetadataStorageCoordinator.getCheckPointsForSequence(EasyMock.anyString())).andReturn( + ImmutableList.of() + ).anyTimes(); + expect(taskQueue.add(capture(captured))).andReturn(true).times(4); taskRunner.registerListener(anyObject(TaskRunnerListener.class), anyObject(Executor.class)); replayAll(); @@ -1236,6 +1305,10 @@ public void testKillUnresponsiveTasksWhilePausing() throws Exception null ) ).anyTimes(); + expect(indexerMetadataStorageCoordinator.getCheckPointsForSequence(EasyMock.anyString())).andReturn( + ImmutableList.of() + ).anyTimes(); + expect(taskQueue.add(capture(captured))).andReturn(true).times(4); taskRunner.registerListener(anyObject(TaskRunnerListener.class), 
anyObject(Executor.class)); replayAll(); @@ -1303,6 +1376,14 @@ public void testKillUnresponsiveTasksWhileSettingEndOffsets() throws Exception null ) ).anyTimes(); + expect(indexerMetadataStorageCoordinator.getCheckPointsForSequence(EasyMock.anyString())).andReturn( + ImmutableList.of() + ).anyTimes(); + expect(indexerMetadataStorageCoordinator.addNewCheckPointForSequence( + EasyMock.anyString(), + EasyMock.anyObject(DataSourceMetadata.class) + )).andReturn(true).anyTimes(); + expect(taskQueue.add(capture(captured))).andReturn(true).times(4); taskRunner.registerListener(anyObject(TaskRunnerListener.class), anyObject(Executor.class)); replayAll(); @@ -1341,6 +1422,7 @@ public void testKillUnresponsiveTasksWhileSettingEndOffsets() throws Exception taskClient.setEndOffsetsAsync( EasyMock.contains("sequenceName-0"), EasyMock.eq(ImmutableMap.of(0, 10L, 1, 20L, 2, 35L)), + EasyMock.eq(true), EasyMock.eq(true) ) ).andReturn(Futures.immediateFailedFuture(new RuntimeException())).times(2); @@ -1438,6 +1520,14 @@ public void testStopGracefully() throws Exception null ) ).anyTimes(); + expect(indexerMetadataStorageCoordinator.getCheckPointsForSequence(EasyMock.anyString())).andReturn( + ImmutableList.of() + ).anyTimes(); + expect(indexerMetadataStorageCoordinator.addNewCheckPointForSequence( + EasyMock.anyString(), + EasyMock.anyObject(DataSourceMetadata.class) + )).andReturn(true).anyTimes(); + expect(taskClient.getStatusAsync("id1")).andReturn(Futures.immediateFuture(KafkaIndexTask.Status.PUBLISHING)); expect(taskClient.getStatusAsync("id2")).andReturn(Futures.immediateFuture(KafkaIndexTask.Status.READING)); expect(taskClient.getStatusAsync("id3")).andReturn(Futures.immediateFuture(KafkaIndexTask.Status.READING)); @@ -1456,7 +1546,7 @@ public void testStopGracefully() throws Exception expect(taskRunner.getRunningTasks()).andReturn(workItems).anyTimes(); expect(taskClient.pauseAsync("id2")) .andReturn(Futures.immediateFuture((Map) ImmutableMap.of(0, 15L, 1, 25L, 2, 30L))); - 
expect(taskClient.setEndOffsetsAsync("id2", ImmutableMap.of(0, 15L, 1, 25L, 2, 30L), true)) + expect(taskClient.setEndOffsetsAsync("id2", ImmutableMap.of(0, 15L, 1, 25L, 2, 30L), true, true)) .andReturn(Futures.immediateFuture(true)); taskQueue.shutdown("id3"); expectLastCall().times(2); diff --git a/extensions-core/postgresql-metadata-storage/src/test/java/io/druid/metadata/storage/postgresql/PostgreSQLConnectorTest.java b/extensions-core/postgresql-metadata-storage/src/test/java/io/druid/metadata/storage/postgresql/PostgreSQLConnectorTest.java index d1f9b8cbdacd..c8c494010b8a 100644 --- a/extensions-core/postgresql-metadata-storage/src/test/java/io/druid/metadata/storage/postgresql/PostgreSQLConnectorTest.java +++ b/extensions-core/postgresql-metadata-storage/src/test/java/io/druid/metadata/storage/postgresql/PostgreSQLConnectorTest.java @@ -47,6 +47,7 @@ public void testIsTransientException() throws Exception null, null, null, + null, null ) ) diff --git a/indexing-hadoop/src/main/java/io/druid/indexer/updater/MetadataStorageUpdaterJobSpec.java b/indexing-hadoop/src/main/java/io/druid/indexer/updater/MetadataStorageUpdaterJobSpec.java index 813f9c7948f8..6d0b8bcb9cf5 100644 --- a/indexing-hadoop/src/main/java/io/druid/indexer/updater/MetadataStorageUpdaterJobSpec.java +++ b/indexing-hadoop/src/main/java/io/druid/indexer/updater/MetadataStorageUpdaterJobSpec.java @@ -97,6 +97,7 @@ public MetadataStorageTablesConfig getMetadataStorageTablesConfig() null, null, null, + null, null ); } diff --git a/indexing-service/src/main/java/io/druid/indexing/common/actions/CheckPointDataSourceMetadataAction.java b/indexing-service/src/main/java/io/druid/indexing/common/actions/CheckPointDataSourceMetadataAction.java new file mode 100644 index 000000000000..aca4a1099aef --- /dev/null +++ b/indexing-service/src/main/java/io/druid/indexing/common/actions/CheckPointDataSourceMetadataAction.java @@ -0,0 +1,106 @@ +/* + * Licensed to Metamarkets Group Inc. 
/*
 * Licensed to Metamarkets Group Inc. (Metamarkets) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  Metamarkets licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */

package io.druid.indexing.common.actions;

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.core.type.TypeReference;
import io.druid.indexing.common.task.Task;
import io.druid.indexing.overlord.DataSourceMetadata;

import java.io.IOException;

/**
 * Task action sent by an indexing task to ask its supervisor to record a checkpoint for a
 * sequence. The action is routed to
 * {@code SupervisorManager#checkPointDataSourceMetadata} via the toolbox and returns true
 * when the checkpoint request was accepted.
 */
public class CheckPointDataSourceMetadataAction implements TaskAction<Boolean>
{
  // Id of the supervisor that owns the sequence being checkpointed.
  private final String supervisorId;
  // Unique name identifying the sequence (task group) to checkpoint.
  private final String sequenceName;
  // Metadata recorded by the previous checkpoint call; may be null for the first checkpoint.
  private final DataSourceMetadata previousCheckPoint;
  // Metadata to record for the current checkpoint.
  private final DataSourceMetadata currentCheckPoint;

  // @JsonCreator is required so Jackson can pick this multi-argument constructor when
  // deserializing the action on the overlord side; @JsonProperty alone is not enough.
  @JsonCreator
  public CheckPointDataSourceMetadataAction(
      @JsonProperty("supervisorId") String supervisorId,
      @JsonProperty("sequenceName") String sequenceName,
      @JsonProperty("previousCheckPoint") DataSourceMetadata previousCheckPoint,
      @JsonProperty("currentCheckPoint") DataSourceMetadata currentCheckPoint
  )
  {
    this.supervisorId = supervisorId;
    this.sequenceName = sequenceName;
    this.previousCheckPoint = previousCheckPoint;
    this.currentCheckPoint = currentCheckPoint;
  }

  @JsonProperty
  public String getSupervisorId()
  {
    return supervisorId;
  }

  @JsonProperty
  public String getSequenceName()
  {
    return sequenceName;
  }

  @JsonProperty
  public DataSourceMetadata getPreviousCheckPoint()
  {
    return previousCheckPoint;
  }

  @JsonProperty
  public DataSourceMetadata getCurrentCheckPoint()
  {
    return currentCheckPoint;
  }

  @Override
  public TypeReference<Boolean> getReturnTypeReference()
  {
    return new TypeReference<Boolean>()
    {
    };
  }

  @Override
  public Boolean perform(Task task, TaskActionToolbox toolbox) throws IOException
  {
    // Delegate to the supervisor manager; returns whether the checkpoint was accepted.
    return toolbox.getSupervisorManager()
                  .checkPointDataSourceMetadata(supervisorId, sequenceName, previousCheckPoint, currentCheckPoint);
  }

  @Override
  public boolean isAudited()
  {
    return true;
  }

  @Override
  public String toString()
  {
    return "CheckPointDataSourceMetadataAction{" +
           "supervisorId='" + supervisorId + '\'' +
           ", sequenceName='" + sequenceName + '\'' +
           ", previousCheckPoint=" + previousCheckPoint +
           ", currentCheckPoint=" + currentCheckPoint +
           '}';
  }
}
a/indexing-service/src/main/java/io/druid/indexing/common/config/TaskConfig.java b/indexing-service/src/main/java/io/druid/indexing/common/config/TaskConfig.java index e3fb7be6f9d7..9fb3ec1503b8 100644 --- a/indexing-service/src/main/java/io/druid/indexing/common/config/TaskConfig.java +++ b/indexing-service/src/main/java/io/druid/indexing/common/config/TaskConfig.java @@ -162,4 +162,19 @@ private String defaultDir(String configParameter, final String defaultVal) return configParameter; } + + @Override + public String toString() + { + return "TaskConfig{" + + "baseDir='" + baseDir + '\'' + + ", baseTaskDir=" + baseTaskDir + + ", hadoopWorkingPath='" + hadoopWorkingPath + '\'' + + ", defaultRowFlushBoundary=" + defaultRowFlushBoundary + + ", defaultHadoopCoordinates=" + defaultHadoopCoordinates + + ", restoreTasksOnRestart=" + restoreTasksOnRestart + + ", gracefulShutdownTimeout=" + gracefulShutdownTimeout + + ", directoryLockTimeout=" + directoryLockTimeout + + '}'; + } } diff --git a/indexing-service/src/main/java/io/druid/indexing/common/task/IndexTask.java b/indexing-service/src/main/java/io/druid/indexing/common/task/IndexTask.java index 75a86d8b84bb..4f0a26f2a116 100644 --- a/indexing-service/src/main/java/io/druid/indexing/common/task/IndexTask.java +++ b/indexing-service/src/main/java/io/druid/indexing/common/task/IndexTask.java @@ -483,6 +483,7 @@ public boolean publishSegments(Set segments, Object commitMetadata) log.error("Failed to publish segments, aborting!"); return false; } else { + driver.waitForHandOff(); log.info( "Published segments[%s]", Joiner.on(", ").join( Iterables.transform( diff --git a/indexing-service/src/main/java/io/druid/indexing/overlord/supervisor/SupervisorManager.java b/indexing-service/src/main/java/io/druid/indexing/overlord/supervisor/SupervisorManager.java index 7f4670792153..86528cb47e57 100644 --- a/indexing-service/src/main/java/io/druid/indexing/overlord/supervisor/SupervisorManager.java +++ 
b/indexing-service/src/main/java/io/druid/indexing/overlord/supervisor/SupervisorManager.java @@ -24,7 +24,6 @@ import com.google.common.base.Throwables; import com.google.inject.Inject; import com.metamx.emitter.EmittingLogger; - import io.druid.indexing.overlord.DataSourceMetadata; import io.druid.java.util.common.Pair; import io.druid.java.util.common.collect.JavaCompatUtils; @@ -158,6 +157,24 @@ public boolean resetSupervisor(String id, @Nullable DataSourceMetadata dataSourc return true; } + public boolean checkPointDataSourceMetadata( + String supervisorId, + String sequenceName, + @Nullable DataSourceMetadata previousDataSourceMetadata, + @Nullable DataSourceMetadata currentDataSourceMetadata + ) + { + Preconditions.checkState(started, "SupervisorManager not started"); + Preconditions.checkNotNull(supervisorId, "supervisorId cannot be null"); + + Pair supervisor = supervisors.get(supervisorId); + + Preconditions.checkNotNull(supervisor, "supervisor could not be found"); + + supervisor.lhs.checkPoint(sequenceName, previousDataSourceMetadata, currentDataSourceMetadata); + return true; + } + /** * Stops a supervisor with a given id and then removes it from the list. *

diff --git a/indexing-service/src/test/java/io/druid/indexing/test/TestIndexerMetadataStorageCoordinator.java b/indexing-service/src/test/java/io/druid/indexing/test/TestIndexerMetadataStorageCoordinator.java index 03e08728490b..a1cc8f2aa4aa 100644 --- a/indexing-service/src/test/java/io/druid/indexing/test/TestIndexerMetadataStorageCoordinator.java +++ b/indexing-service/src/test/java/io/druid/indexing/test/TestIndexerMetadataStorageCoordinator.java @@ -87,6 +87,20 @@ public List getUnusedSegmentsForInterval(String dataSource, Interva } } + @Override + public List getCheckPointsForSequence(String sequenceName) throws IOException + { + return ImmutableList.of(); + } + + @Override + public boolean addNewCheckPointForSequence( + String sequenceName, DataSourceMetadata dataSourceMetadata + ) throws IOException + { + return false; + } + @Override public Set announceHistoricalSegments(Set segments) { diff --git a/server/src/main/java/io/druid/indexing/overlord/IndexerMetadataStorageCoordinator.java b/server/src/main/java/io/druid/indexing/overlord/IndexerMetadataStorageCoordinator.java index 4872110420e7..86e174c5758c 100644 --- a/server/src/main/java/io/druid/indexing/overlord/IndexerMetadataStorageCoordinator.java +++ b/server/src/main/java/io/druid/indexing/overlord/IndexerMetadataStorageCoordinator.java @@ -150,4 +150,8 @@ SegmentPublishResult announceHistoricalSegments( * @return DataSegments which include ONLY data within the requested interval and are not flagged as used. 
Data segments NOT returned here may include data in the interval */ List getUnusedSegmentsForInterval(String dataSource, Interval interval); + + List getCheckPointsForSequence(String sequenceName) throws IOException; + + boolean addNewCheckPointForSequence(String sequenceName, DataSourceMetadata dataSourceMetadata) throws IOException; } diff --git a/server/src/main/java/io/druid/indexing/overlord/supervisor/NoopSupervisorSpec.java b/server/src/main/java/io/druid/indexing/overlord/supervisor/NoopSupervisorSpec.java index 508e5e9bffa2..a6a4f6ac8cc8 100644 --- a/server/src/main/java/io/druid/indexing/overlord/supervisor/NoopSupervisorSpec.java +++ b/server/src/main/java/io/druid/indexing/overlord/supervisor/NoopSupervisorSpec.java @@ -52,6 +52,12 @@ public SupervisorReport getStatus() @Override public void reset(DataSourceMetadata dataSourceMetadata) {} + + @Override + public void checkPoint(String sequenceName, DataSourceMetadata prev, DataSourceMetadata curr) + { + + } }; } diff --git a/server/src/main/java/io/druid/indexing/overlord/supervisor/Supervisor.java b/server/src/main/java/io/druid/indexing/overlord/supervisor/Supervisor.java index cec68c9674e2..507348469e9c 100644 --- a/server/src/main/java/io/druid/indexing/overlord/supervisor/Supervisor.java +++ b/server/src/main/java/io/druid/indexing/overlord/supervisor/Supervisor.java @@ -21,6 +21,8 @@ import io.druid.indexing.overlord.DataSourceMetadata; +import javax.annotation.Nullable; + public interface Supervisor { void start(); @@ -36,4 +38,20 @@ public interface Supervisor SupervisorReport getStatus(); void reset(DataSourceMetadata dataSourceMetadata); + + /** + * The definition of checkpoint is not very strict as currently it does not affect data or control path + * On this call Supervisor can potentially checkpoint data processed so far to some durable storage + * for example - Kafka Supervisor uses this to merge and handoff segments containing at least the data + * represented by dataSourceMetadata + * + * 
@param sequenceName unique Identifier to figure out for which sequence to do check pointing + * @param previousCheckPoint DataSourceMetadata check pointed in previous call + * @param currentCheckPoint current DataSourceMetadata to be check pointed + */ + void checkPoint( + @Nullable String sequenceName, + @Nullable DataSourceMetadata previousCheckPoint, + @Nullable DataSourceMetadata currentCheckPoint + ); } diff --git a/server/src/main/java/io/druid/metadata/IndexerSQLMetadataStorageCoordinator.java b/server/src/main/java/io/druid/metadata/IndexerSQLMetadataStorageCoordinator.java index 08431383e599..bdc4784168bf 100644 --- a/server/src/main/java/io/druid/metadata/IndexerSQLMetadataStorageCoordinator.java +++ b/server/src/main/java/io/druid/metadata/IndexerSQLMetadataStorageCoordinator.java @@ -19,6 +19,7 @@ package io.druid.metadata; +import com.fasterxml.jackson.core.type.TypeReference; import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.base.Function; import com.google.common.base.Preconditions; @@ -108,6 +109,7 @@ public void start() connector.createDataSourceTable(); connector.createPendingSegmentsTable(); connector.createSegmentTable(); + connector.createTaskCheckPointsTable(); } @Override @@ -964,4 +966,79 @@ public List fold( log.info("Found %,d segments for %s for interval %s.", matchingSegments.size(), dataSource, interval); return matchingSegments; } + + @Override + public List getCheckPointsForSequence(String sequenceName) throws IOException + { + final byte[] payload = connector.lookup( + dbTables.getTaskCheckPointsTable(), + "sequence_name", + "payload", + sequenceName + ); + if (payload == null) { + return ImmutableList.of(); + } else { + return jsonMapper.readValue(payload, new TypeReference>() + { + }); + } + } + + @Override + public boolean addNewCheckPointForSequence( + final String sequenceName, DataSourceMetadata dataSourceMetadata + ) throws IOException + { + List existingCheckPoints = 
getCheckPointsForSequence(sequenceName); + final boolean createNew; + if (existingCheckPoints.size() == 0) { + createNew = true; + existingCheckPoints = new ArrayList<>(); + } else { + createNew = false; + if(existingCheckPoints.get(existingCheckPoints.size() - 1).equals(dataSourceMetadata)) { + // this can happen when the last check point for a sequence is equal to the end offsets for the task group + return true; + } + } + + final List checkPoints = new ArrayList<>(existingCheckPoints); + checkPoints.add(dataSourceMetadata); + + return connector.retryWithHandle(new HandleCallback() + { + @Override + public Boolean withHandle(Handle handle) throws Exception + { + if (createNew) { + return handle.createStatement( + String.format( + "INSERT INTO %s (sequence_name, payload) " + + "VALUES (:sequence_name, :payload)", + dbTables.getTaskCheckPointsTable() + ) + ) + .bind("sequence_name", sequenceName) + .bind("payload", jsonMapper.writerWithType(new TypeReference>() + { + }).writeValueAsBytes(checkPoints)) + .execute() == 1; + } else { + return handle.createStatement( + String.format( + "UPDATE %s SET payload = :payload " + + "WHERE sequence_name= :sequence_name", + dbTables.getTaskCheckPointsTable() + ) + ) + .bind("sequence_name", sequenceName) + .bind("payload", jsonMapper.writerWithType(new TypeReference>() + { + }).writeValueAsBytes(checkPoints)) + .execute() == 1; + } + } + }); + } } diff --git a/server/src/main/java/io/druid/metadata/SQLMetadataConnector.java b/server/src/main/java/io/druid/metadata/SQLMetadataConnector.java index a9beb514950f..92e96584a44e 100644 --- a/server/src/main/java/io/druid/metadata/SQLMetadataConnector.java +++ b/server/src/main/java/io/druid/metadata/SQLMetadataConnector.java @@ -393,6 +393,24 @@ tableName, getSerialType(), getPayloadType() ); } + public void createTaskCheckPointsTable(final String tableName) + { + createTable( + tableName, + ImmutableList.of( + String.format( + "CREATE TABLE %1$s (\n" + + " sequence_name VARCHAR(255) 
NOT NULL,\n" + + " payload %2$s NOT NULL,\n" + + " PRIMARY KEY (sequence_name)\n" + + ")", + tableName, getPayloadType() + ), + String.format("CREATE INDEX idx_%1$s_sequence_name ON %1$s(sequence_name)", tableName) + ) + ); + } + @Override public Void insertOrUpdate( final String tableName, @@ -503,6 +521,14 @@ public void createSupervisorsTable() } } + @Override + public void createTaskCheckPointsTable() + { + if (config.get().isCreateTables()) { + createTaskCheckPointsTable(tablesConfigSupplier.get().getTaskCheckPointsTable()); + } + } + @Override public byte[] lookup( final String tableName, diff --git a/server/src/main/java/io/druid/segment/loading/LocalDataSegmentPusher.java b/server/src/main/java/io/druid/segment/loading/LocalDataSegmentPusher.java index 096e7dd07987..27326a4014ab 100644 --- a/server/src/main/java/io/druid/segment/loading/LocalDataSegmentPusher.java +++ b/server/src/main/java/io/druid/segment/loading/LocalDataSegmentPusher.java @@ -24,7 +24,6 @@ import com.google.common.io.ByteStreams; import com.google.common.io.Files; import com.google.inject.Inject; - import io.druid.java.util.common.CompressionUtils; import io.druid.java.util.common.logger.Logger; import io.druid.segment.SegmentUtils; diff --git a/server/src/main/java/io/druid/segment/realtime/appenderator/AppenderatorImpl.java b/server/src/main/java/io/druid/segment/realtime/appenderator/AppenderatorImpl.java index 1c3e2794b94d..c7bb9d39051c 100644 --- a/server/src/main/java/io/druid/segment/realtime/appenderator/AppenderatorImpl.java +++ b/server/src/main/java/io/druid/segment/realtime/appenderator/AppenderatorImpl.java @@ -684,6 +684,7 @@ private void initializeExecutors() private void shutdownExecutors() { + log.info("Shutting down persist and push executor"); persistExecutor.shutdownNow(); pushExecutor.shutdownNow(); } diff --git a/server/src/main/java/io/druid/segment/realtime/appenderator/FiniteAppenderatorDriver.java 
b/server/src/main/java/io/druid/segment/realtime/appenderator/FiniteAppenderatorDriver.java index 8119f81f06ae..129287c67eca 100644 --- a/server/src/main/java/io/druid/segment/realtime/appenderator/FiniteAppenderatorDriver.java +++ b/server/src/main/java/io/druid/segment/realtime/appenderator/FiniteAppenderatorDriver.java @@ -35,7 +35,6 @@ import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.MoreExecutors; - import io.druid.data.input.Committer; import io.druid.data.input.InputRow; import io.druid.java.util.common.ISE; @@ -91,6 +90,11 @@ public class FiniteAppenderatorDriver implements Closeable // Notified when segments are dropped. private final Object handoffMonitor = new Object(); + // SegmentIdentifier -> Number of Summarized rows in the segment + // Used by Kafka Indexing Task to create new appenderator driver when limit is reached + private final Map rowsInSegment = Maps.newHashMap(); + + /** * Create a driver. 
* @@ -193,6 +197,16 @@ public SegmentIdentifier add( final String sequenceName, final Supplier committerSupplier ) throws IOException + { + return add(row, sequenceName, committerSupplier, true); + } + + public SegmentIdentifier add( + final InputRow row, + final String sequenceName, + final Supplier committerSupplier, + final boolean moveOutSegmentOnMaxRowsInSegmentLimit + ) throws IOException { Preconditions.checkNotNull(row, "row"); Preconditions.checkNotNull(sequenceName, "sequenceName"); @@ -204,8 +218,11 @@ public SegmentIdentifier add( try { final int numRows = appenderator.add(identifier, row, wrapCommitterSupplier(committerSupplier)); if (numRows >= maxRowsPerSegment) { - moveSegmentOut(sequenceName, ImmutableList.of(identifier)); + if(moveOutSegmentOnMaxRowsInSegmentLimit) { + moveSegmentOut(sequenceName, ImmutableList.of(identifier)); + } } + rowsInSegment.put(identifier.toString(), numRows); } catch (SegmentNotWritableException e) { throw new ISE(e, "WTF?! Segment[%s] not writable when it should have been.", identifier); @@ -215,6 +232,14 @@ public SegmentIdentifier add( return identifier; } + public Integer numRowsInSegment(SegmentIdentifier identifier) { + return rowsInSegment.get(identifier.toString()); + } + + public Integer getMaxRowPerSegment() { + return maxRowsPerSegment; + } + /** * Persist all data indexed through this driver so far. Blocks until complete. * @@ -259,43 +284,37 @@ public SegmentsAndMetadata finish( final Committer committer ) throws InterruptedException { - final SegmentsAndMetadata segmentsAndMetadata = publishAll(publisher, wrapCommitter(committer)); + return publishAll(publisher, wrapCommitter(committer)); + } - if (segmentsAndMetadata != null) { - final long giveUpAt = handoffConditionTimeout > 0 - ? System.currentTimeMillis() + handoffConditionTimeout - : 0; + public void waitForHandOff() throws InterruptedException + { + final long giveUpAt = handoffConditionTimeout > 0 + ? 
System.currentTimeMillis() + handoffConditionTimeout + : 0; - log.info("Awaiting handoff of segments: [%s]", Joiner.on(", ").join(appenderator.getSegments())); + log.info("Awaiting handoff of segments: [%s]", Joiner.on(", ").join(appenderator.getSegments())); - synchronized (handoffMonitor) { - while (!appenderator.getSegments().isEmpty()) { + synchronized (handoffMonitor) { + while (!appenderator.getSegments().isEmpty()) { - if (giveUpAt == 0) { - handoffMonitor.wait(); + if (giveUpAt == 0) { + handoffMonitor.wait(); + } else { + final long remaining = giveUpAt - System.currentTimeMillis(); + if (remaining > 0) { + handoffMonitor.wait(remaining); } else { - final long remaining = giveUpAt - System.currentTimeMillis(); - if (remaining > 0) { - handoffMonitor.wait(remaining); - } else { - throw new ISE( - "Segment handoff wait timeout. Segments not yet handed off: [%s]", - Joiner.on(", ").join(appenderator.getSegments()) - ); - } + throw new ISE( + "Segment handoff wait timeout. Segments not yet handed off: [%s]", + Joiner.on(", ").join(appenderator.getSegments()) + ); } } } - - log.info("All segments handed off."); - - return new SegmentsAndMetadata( - segmentsAndMetadata.getSegments(), - ((FiniteAppenderatorDriverMetadata) segmentsAndMetadata.getCommitMetadata()).getCallerMetadata() - ); - } else { - return null; } + + log.info("All segments handed off."); } /** @@ -307,6 +326,10 @@ public void close() handoffNotifier.close(); } + public Appenderator getAppenderator() { + return appenderator; + } + private SegmentIdentifier getActiveSegment(final DateTime timestamp, final String sequenceName) { synchronized (activeSegments) { diff --git a/server/src/test/java/io/druid/segment/realtime/appenderator/FiniteAppenderatorDriverTest.java b/server/src/test/java/io/druid/segment/realtime/appenderator/FiniteAppenderatorDriverTest.java index acf98ef38846..a0bb43647c22 100644 --- a/server/src/test/java/io/druid/segment/realtime/appenderator/FiniteAppenderatorDriverTest.java 
+++ b/server/src/test/java/io/druid/segment/realtime/appenderator/FiniteAppenderatorDriverTest.java @@ -140,7 +140,7 @@ public void testSimple() throws Exception asIdentifiers(segmentsAndMetadata.getSegments()) ); - Assert.assertEquals(3, segmentsAndMetadata.getCommitMetadata()); + Assert.assertEquals(3, ((FiniteAppenderatorDriverMetadata)segmentsAndMetadata.getCommitMetadata()).getCallerMetadata()); } @Test @@ -167,7 +167,7 @@ public void testMaxRowsPerSegment() throws Exception final SegmentsAndMetadata segmentsAndMetadata = driver.finish(makeOkPublisher(), committerSupplier.get()); Assert.assertEquals(numSegments, segmentsAndMetadata.getSegments().size()); - Assert.assertEquals(numSegments * MAX_ROWS_PER_SEGMENT, segmentsAndMetadata.getCommitMetadata()); + Assert.assertEquals(numSegments * MAX_ROWS_PER_SEGMENT, ((FiniteAppenderatorDriverMetadata)segmentsAndMetadata.getCommitMetadata()).getCallerMetadata()); } private Set asIdentifiers(Iterable segments)