diff --git a/common/src/main/java/io/druid/metadata/MetadataStorageConnector.java b/common/src/main/java/io/druid/metadata/MetadataStorageConnector.java
index c995c2ce8da1..e53553c1e9d6 100644
--- a/common/src/main/java/io/druid/metadata/MetadataStorageConnector.java
+++ b/common/src/main/java/io/druid/metadata/MetadataStorageConnector.java
@@ -21,7 +21,7 @@
*/
public interface MetadataStorageConnector
{
- public Void insertOrUpdate(
+ Void insertOrUpdate(
final String tableName,
final String keyColumn,
final String valueColumn,
@@ -30,20 +30,22 @@ public Void insertOrUpdate(
) throws Exception;
- public byte[] lookup(
+ byte[] lookup(
final String tableName,
final String keyColumn,
final String valueColumn,
final String key
);
- public void createSegmentTable();
+ void createPendingSegmentsTable();
- public void createRulesTable();
+ void createSegmentTable();
- public void createConfigTable();
+ void createRulesTable();
- public void createTaskTables();
+ void createConfigTable();
- public void createAuditTable();
+ void createTaskTables();
+
+ void createAuditTable();
}
diff --git a/common/src/main/java/io/druid/metadata/MetadataStorageTablesConfig.java b/common/src/main/java/io/druid/metadata/MetadataStorageTablesConfig.java
index abf32b1b5b02..383a5847fed6 100644
--- a/common/src/main/java/io/druid/metadata/MetadataStorageTablesConfig.java
+++ b/common/src/main/java/io/druid/metadata/MetadataStorageTablesConfig.java
@@ -29,7 +29,7 @@ public class MetadataStorageTablesConfig
{
public static MetadataStorageTablesConfig fromBase(String base)
{
- return new MetadataStorageTablesConfig(base, null, null, null, null, null, null, null);
+ return new MetadataStorageTablesConfig(base, null, null, null, null, null, null, null, null);
}
public static final String TASK_ENTRY_TYPE = "task";
@@ -43,6 +43,9 @@ public static MetadataStorageTablesConfig fromBase(String base)
@JsonProperty("base")
private final String base;
+ @JsonProperty("pendingSegments")
+ private final String pendingSegmentsTable;
+
@JsonProperty("segments")
private final String segmentsTable;
@@ -67,6 +70,7 @@ public static MetadataStorageTablesConfig fromBase(String base)
@JsonCreator
public MetadataStorageTablesConfig(
@JsonProperty("base") String base,
+ @JsonProperty("pendingSegments") String pendingSegmentsTable,
@JsonProperty("segments") String segmentsTable,
@JsonProperty("rules") String rulesTable,
@JsonProperty("config") String configTable,
@@ -77,6 +81,7 @@ public MetadataStorageTablesConfig(
)
{
this.base = (base == null) ? DEFAULT_BASE : base;
+ this.pendingSegmentsTable = makeTableName(pendingSegmentsTable, "pendingSegments");
this.segmentsTable = makeTableName(segmentsTable, "segments");
this.rulesTable = makeTableName(rulesTable, "rules");
this.configTable = makeTableName(configTable, "config");
@@ -108,6 +113,11 @@ public String getBase()
return base;
}
+ public String getPendingSegmentsTable()
+ {
+ return pendingSegmentsTable;
+ }
+
public String getSegmentsTable()
{
return segmentsTable;
diff --git a/extensions/postgresql-metadata-storage/src/test/java/io/druid/metadata/storage/postgresql/PostgreSQLConnectorTest.java b/extensions/postgresql-metadata-storage/src/test/java/io/druid/metadata/storage/postgresql/PostgreSQLConnectorTest.java
index 43f1e34e9f61..be854c90febc 100644
--- a/extensions/postgresql-metadata-storage/src/test/java/io/druid/metadata/storage/postgresql/PostgreSQLConnectorTest.java
+++ b/extensions/postgresql-metadata-storage/src/test/java/io/druid/metadata/storage/postgresql/PostgreSQLConnectorTest.java
@@ -35,7 +35,7 @@ public void testIsTransientException() throws Exception
{
PostgreSQLConnector connector = new PostgreSQLConnector(
Suppliers.ofInstance(new MetadataStorageConnectorConfig()),
- Suppliers.ofInstance(new MetadataStorageTablesConfig(null, null, null, null, null, null, null, null))
+ Suppliers.ofInstance(new MetadataStorageTablesConfig(null, null, null, null, null, null, null, null, null))
);
Assert.assertTrue(connector.isTransientException(new SQLException("bummer, connection problem", "08DIE")));
diff --git a/indexing-hadoop/src/main/java/io/druid/indexer/updater/MetadataStorageUpdaterJobSpec.java b/indexing-hadoop/src/main/java/io/druid/indexer/updater/MetadataStorageUpdaterJobSpec.java
index 4595c1affbda..e07c3c84c567 100644
--- a/indexing-hadoop/src/main/java/io/druid/indexer/updater/MetadataStorageUpdaterJobSpec.java
+++ b/indexing-hadoop/src/main/java/io/druid/indexer/updater/MetadataStorageUpdaterJobSpec.java
@@ -92,6 +92,7 @@ public MetadataStorageTablesConfig getMetadataStorageTablesConfig()
null,
null,
null,
+ null,
null
);
}
diff --git a/indexing-service/src/main/java/io/druid/indexing/common/actions/SegmentAllocateAction.java b/indexing-service/src/main/java/io/druid/indexing/common/actions/SegmentAllocateAction.java
new file mode 100644
index 000000000000..8cb7dcc0f2df
--- /dev/null
+++ b/indexing-service/src/main/java/io/druid/indexing/common/actions/SegmentAllocateAction.java
@@ -0,0 +1,284 @@
+/*
+ * Licensed to Metamarkets Group Inc. (Metamarkets) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. Metamarkets licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package io.druid.indexing.common.actions;
+
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.google.api.client.repackaged.com.google.common.base.Preconditions;
+import com.google.api.client.repackaged.com.google.common.base.Throwables;
+import com.google.api.client.util.Lists;
+import com.google.common.base.Optional;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.primitives.Longs;
+import com.metamx.common.Granularity;
+import com.metamx.common.IAE;
+import com.metamx.common.logger.Logger;
+import io.druid.granularity.QueryGranularity;
+import io.druid.indexing.common.TaskLock;
+import io.druid.indexing.common.task.Task;
+import io.druid.indexing.overlord.IndexerMetadataStorageCoordinator;
+import io.druid.segment.realtime.appenderator.SegmentIdentifier;
+import io.druid.timeline.DataSegment;
+import org.joda.time.DateTime;
+import org.joda.time.Interval;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Set;
+
+/**
+ * Allocates a pending segment for a given timestamp. The preferredSegmentGranularity is used if there are no prior
+ * segments for the given timestamp, or if the prior segments for the given timestamp are already at the
+ * preferredSegmentGranularity. Otherwise, the prior segments will take precedence.
+ *
+ * This action implicitly acquires locks when it allocates segments. You do not have to acquire them beforehand,
+ * although you *do* have to release them yourself.
+ *
+ * If this action cannot acquire an appropriate lock, or if it cannot expand an existing segment set, it will return
+ * an absent Optional.
+ */
+public class SegmentAllocateAction implements TaskAction>
+{
+ private static final Logger log = new Logger(SegmentAllocateAction.class);
+
+ // Prevent spinning forever in situations where the segment list just won't stop changing.
+ private static final int MAX_ATTEMPTS = 90;
+
+ private final String dataSource;
+ private final DateTime timestamp;
+ private final QueryGranularity queryGranularity;
+ private final Granularity preferredSegmentGranularity;
+ private final String sequenceName;
+ private final String previousSegmentId;
+
+ public static List granularitiesFinerThan(final Granularity gran0)
+ {
+ final DateTime epoch = new DateTime(0);
+ final List retVal = Lists.newArrayList();
+ for (Granularity gran : Granularity.values()) {
+ if (gran.bucket(epoch).toDurationMillis() <= gran0.bucket(epoch).toDurationMillis()) {
+ retVal.add(gran);
+ }
+ }
+ Collections.sort(
+ retVal,
+ new Comparator()
+ {
+ @Override
+ public int compare(Granularity g1, Granularity g2)
+ {
+ return Longs.compare(g2.bucket(epoch).toDurationMillis(), g1.bucket(epoch).toDurationMillis());
+ }
+ }
+ );
+ return retVal;
+ }
+
+ public SegmentAllocateAction(
+ @JsonProperty("dataSource") String dataSource,
+ @JsonProperty("timestamp") DateTime timestamp,
+ @JsonProperty("queryGranularity") QueryGranularity queryGranularity,
+ @JsonProperty("preferredSegmentGranularity") Granularity preferredSegmentGranularity,
+ @JsonProperty("sequenceName") String sequenceName,
+ @JsonProperty("previousSegmentId") String previousSegmentId
+ )
+ {
+ this.dataSource = Preconditions.checkNotNull(dataSource, "dataSource");
+ this.timestamp = Preconditions.checkNotNull(timestamp, "timestamp");
+ this.queryGranularity = Preconditions.checkNotNull(queryGranularity, "queryGranularity");
+ this.preferredSegmentGranularity = Preconditions.checkNotNull(
+ preferredSegmentGranularity,
+ "preferredSegmentGranularity"
+ );
+ this.sequenceName = Preconditions.checkNotNull(sequenceName, "sequenceName");
+ this.previousSegmentId = previousSegmentId;
+ }
+
+ @JsonProperty
+ public String getDataSource()
+ {
+ return dataSource;
+ }
+
+ @JsonProperty
+ public DateTime getTimestamp()
+ {
+ return timestamp;
+ }
+
+ @JsonProperty
+ public QueryGranularity getQueryGranularity()
+ {
+ return queryGranularity;
+ }
+
+ @JsonProperty
+ public Granularity getPreferredSegmentGranularity()
+ {
+ return preferredSegmentGranularity;
+ }
+
+ @JsonProperty
+ public String getSequenceName()
+ {
+ return sequenceName;
+ }
+
+ @JsonProperty
+ public String getPreviousSegmentId()
+ {
+ return previousSegmentId;
+ }
+
+ @Override
+ public TypeReference> getReturnTypeReference()
+ {
+ return new TypeReference>()
+ {
+ };
+ }
+
+ @Override
+ public Optional perform(
+ final Task task,
+ final TaskActionToolbox toolbox
+ ) throws IOException
+ {
+ int attempt = 0;
+ while (true) {
+ attempt++;
+
+ if (!task.getDataSource().equals(dataSource)) {
+ throw new IAE("Task dataSource must match action dataSource, [%s] != [%s].", task.getDataSource(), dataSource);
+ }
+
+ final IndexerMetadataStorageCoordinator msc = toolbox.getIndexerMetadataStorageCoordinator();
+
+ // 1) if an existing used segment overlaps the row's interval, use that segment's interval
+ // 2) otherwise try preferredSegmentGranularity first, then progressively finer granularities
+
+ final List tryIntervals = Lists.newArrayList();
+
+ final Interval rowInterval = new Interval(
+ queryGranularity.truncate(timestamp.getMillis()),
+ queryGranularity.next(queryGranularity.truncate(timestamp.getMillis()))
+ );
+
+ final Set usedSegmentsForRow = ImmutableSet.copyOf(
+ msc.getUsedSegmentsForInterval(dataSource, rowInterval)
+ );
+
+ if (usedSegmentsForRow.isEmpty()) {
+ // No existing segments for this row, but there might still be nearby ones that conflict with our preferred
+ // segment granularity. Try that first, and then progressively smaller ones if it fails.
+ for (Granularity gran : granularitiesFinerThan(preferredSegmentGranularity)) {
+ tryIntervals.add(gran.bucket(timestamp));
+ }
+ } else {
+ // Existing segment(s) exist for this row; use the interval of the first one.
+ tryIntervals.add(usedSegmentsForRow.iterator().next().getInterval());
+ }
+
+ for (final Interval tryInterval : tryIntervals) {
+ if (tryInterval.contains(rowInterval)) {
+ log.debug(
+ "Trying to allocate pending segment for rowInterval[%s], segmentInterval[%s].",
+ rowInterval,
+ tryInterval
+ );
+ final TaskLock tryLock = toolbox.getTaskLockbox().tryLock(task, tryInterval).orNull();
+ if (tryLock != null) {
+ final SegmentIdentifier identifier = msc.allocatePendingSegment(
+ dataSource,
+ sequenceName,
+ previousSegmentId,
+ tryInterval,
+ tryLock.getVersion()
+ );
+ if (identifier != null) {
+ return Optional.of(identifier);
+ } else {
+ log.debug(
+ "Could not allocate pending segment for rowInterval[%s], segmentInterval[%s].",
+ rowInterval,
+ tryInterval
+ );
+ }
+ } else {
+ log.debug("Could not acquire lock for rowInterval[%s], segmentInterval[%s].", rowInterval, tryInterval);
+ }
+ }
+ }
+
+ // Could not allocate a pending segment. There's a chance that this is because someone else inserted a segment
+ // overlapping with this row between when we called "msc.getUsedSegmentsForInterval" and now. Check it again,
+ // and if it's different, repeat.
+
+ if (!ImmutableSet.copyOf(msc.getUsedSegmentsForInterval(dataSource, rowInterval)).equals(usedSegmentsForRow)) {
+ if (attempt < MAX_ATTEMPTS) {
+ final long shortRandomSleep = 50 + (long) (Math.random() * 450);
+ log.debug(
+ "Used segment set changed for rowInterval[%s]. Retrying segment allocation in %,dms (attempt = %,d).",
+ rowInterval,
+ shortRandomSleep,
+ attempt
+ );
+ try {
+ Thread.sleep(shortRandomSleep);
+ }
+ catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw Throwables.propagate(e);
+ }
+ } else {
+ log.error(
+ "Used segment set changed for rowInterval[%s]. Not trying again (attempt = %,d).",
+ rowInterval,
+ attempt
+ );
+ return Optional.absent();
+ }
+ } else {
+ return Optional.absent();
+ }
+ }
+ }
+
+ @Override
+ public boolean isAudited()
+ {
+ return false;
+ }
+
+ @Override
+ public String toString()
+ {
+ return "SegmentAllocateAction{" +
+ "dataSource='" + dataSource + '\'' +
+ ", timestamp=" + timestamp +
+ ", queryGranularity=" + queryGranularity +
+ ", preferredSegmentGranularity=" + preferredSegmentGranularity +
+ ", sequenceName='" + sequenceName + '\'' +
+ ", previousSegmentId='" + previousSegmentId + '\'' +
+ '}';
+ }
+}
diff --git a/indexing-service/src/main/java/io/druid/indexing/common/actions/TaskAction.java b/indexing-service/src/main/java/io/druid/indexing/common/actions/TaskAction.java
index 5c751e8e0917..1c0cb386f91e 100644
--- a/indexing-service/src/main/java/io/druid/indexing/common/actions/TaskAction.java
+++ b/indexing-service/src/main/java/io/druid/indexing/common/actions/TaskAction.java
@@ -34,7 +34,8 @@
@JsonSubTypes.Type(name = "segmentListUsed", value = SegmentListUsedAction.class),
@JsonSubTypes.Type(name = "segmentListUnused", value = SegmentListUnusedAction.class),
@JsonSubTypes.Type(name = "segmentNuke", value = SegmentNukeAction.class),
- @JsonSubTypes.Type(name = "segmentMetadataUpdate", value = SegmentMetadataUpdateAction.class)
+ @JsonSubTypes.Type(name = "segmentMetadataUpdate", value = SegmentMetadataUpdateAction.class),
+ @JsonSubTypes.Type(name = "segmentAllocate", value = SegmentAllocateAction.class)
})
public interface TaskAction
{
diff --git a/indexing-service/src/main/java/io/druid/indexing/common/actions/TaskActionToolbox.java b/indexing-service/src/main/java/io/druid/indexing/common/actions/TaskActionToolbox.java
index ae0ac73c6e77..e85dffdc8994 100644
--- a/indexing-service/src/main/java/io/druid/indexing/common/actions/TaskActionToolbox.java
+++ b/indexing-service/src/main/java/io/druid/indexing/common/actions/TaskActionToolbox.java
@@ -65,24 +65,6 @@ public ServiceEmitter getEmitter()
return emitter;
}
- public boolean segmentsAreFromSamePartitionSet(
- final Set segments
- )
- {
- // Verify that these segments are all in the same partition set
-
- Preconditions.checkArgument(!segments.isEmpty(), "segments nonempty");
- final DataSegment firstSegment = segments.iterator().next();
- for (final DataSegment segment : segments) {
- if (!segment.getDataSource().equals(firstSegment.getDataSource())
- || !segment.getInterval().equals(firstSegment.getInterval())
- || !segment.getVersion().equals(firstSegment.getVersion())) {
- return false;
- }
- }
- return true;
- }
-
public void verifyTaskLocks(
final Task task,
final Set segments
diff --git a/indexing-service/src/test/java/io/druid/indexing/common/actions/SegmentAllocateActionTest.java b/indexing-service/src/test/java/io/druid/indexing/common/actions/SegmentAllocateActionTest.java
new file mode 100644
index 000000000000..89c4b3cd3ce4
--- /dev/null
+++ b/indexing-service/src/test/java/io/druid/indexing/common/actions/SegmentAllocateActionTest.java
@@ -0,0 +1,738 @@
+/*
+ * Licensed to Metamarkets Group Inc. (Metamarkets) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. Metamarkets licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package io.druid.indexing.common.actions;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.base.Predicate;
+import com.google.common.collect.FluentIterable;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Iterables;
+import com.metamx.common.Granularity;
+import com.metamx.common.ISE;
+import io.druid.granularity.DurationGranularity;
+import io.druid.granularity.QueryGranularity;
+import io.druid.indexing.common.TaskLock;
+import io.druid.indexing.common.task.NoopTask;
+import io.druid.indexing.common.task.Task;
+import io.druid.jackson.DefaultObjectMapper;
+import io.druid.segment.realtime.appenderator.SegmentIdentifier;
+import io.druid.timeline.DataSegment;
+import io.druid.timeline.partition.LinearShardSpec;
+import io.druid.timeline.partition.NumberedShardSpec;
+import io.druid.timeline.partition.SingleDimensionShardSpec;
+import org.joda.time.DateTime;
+import org.junit.Assert;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+
+public class SegmentAllocateActionTest
+{
+ @Rule
+ public ExpectedException thrown = ExpectedException.none();
+
+ @Rule
+ public TaskActionTestKit taskActionTestKit = new TaskActionTestKit();
+
+ private static final String DATA_SOURCE = "none";
+ private static final DateTime PARTY_TIME = new DateTime("1999");
+ private static final DateTime THE_DISTANT_FUTURE = new DateTime("3000");
+
+ @Test
+ public void testGranularitiesFinerThanDay() throws Exception
+ {
+ Assert.assertEquals(
+ ImmutableList.of(
+ Granularity.DAY,
+ Granularity.SIX_HOUR,
+ Granularity.HOUR,
+ Granularity.FIFTEEN_MINUTE,
+ Granularity.TEN_MINUTE,
+ Granularity.FIVE_MINUTE,
+ Granularity.MINUTE,
+ Granularity.SECOND
+ ),
+ SegmentAllocateAction.granularitiesFinerThan(Granularity.DAY)
+ );
+ }
+
+ @Test
+ public void testGranularitiesFinerThanHour() throws Exception
+ {
+ Assert.assertEquals(
+ ImmutableList.of(
+ Granularity.HOUR,
+ Granularity.FIFTEEN_MINUTE,
+ Granularity.TEN_MINUTE,
+ Granularity.FIVE_MINUTE,
+ Granularity.MINUTE,
+ Granularity.SECOND
+ ),
+ SegmentAllocateAction.granularitiesFinerThan(Granularity.HOUR)
+ );
+ }
+
+ @Test
+ public void testManySegmentsSameInterval() throws Exception
+ {
+ final Task task = new NoopTask(null, 0, 0, null, null, null);
+
+ taskActionTestKit.getTaskLockbox().add(task);
+
+ final SegmentIdentifier id1 = allocate(
+ task,
+ PARTY_TIME,
+ QueryGranularity.NONE,
+ Granularity.HOUR,
+ "s1",
+ null
+ );
+ final SegmentIdentifier id2 = allocate(
+ task,
+ PARTY_TIME,
+ QueryGranularity.NONE,
+ Granularity.HOUR,
+ "s1",
+ id1.getIdentifierAsString()
+ );
+ final SegmentIdentifier id3 = allocate(
+ task,
+ PARTY_TIME,
+ QueryGranularity.NONE,
+ Granularity.HOUR,
+ "s1",
+ id2.getIdentifierAsString()
+ );
+
+ final TaskLock partyLock = Iterables.getOnlyElement(
+ FluentIterable.from(taskActionTestKit.getTaskLockbox().findLocksForTask(task))
+ .filter(
+ new Predicate()
+ {
+ @Override
+ public boolean apply(TaskLock input)
+ {
+ return input.getInterval().contains(PARTY_TIME);
+ }
+ }
+ )
+ );
+
+ assertSameIdentifier(
+ id1,
+ new SegmentIdentifier(
+ DATA_SOURCE,
+ Granularity.HOUR.bucket(PARTY_TIME),
+ partyLock.getVersion(),
+ new NumberedShardSpec(0, 0)
+ )
+ );
+ assertSameIdentifier(
+ id2,
+ new SegmentIdentifier(
+ DATA_SOURCE,
+ Granularity.HOUR.bucket(PARTY_TIME),
+ partyLock.getVersion(),
+ new NumberedShardSpec(1, 0)
+ )
+ );
+ assertSameIdentifier(
+ id3,
+ new SegmentIdentifier(
+ DATA_SOURCE,
+ Granularity.HOUR.bucket(PARTY_TIME),
+ partyLock.getVersion(),
+ new NumberedShardSpec(2, 0)
+ )
+ );
+ }
+
+ @Test
+ public void testResumeSequence() throws Exception
+ {
+ final Task task = new NoopTask(null, 0, 0, null, null, null);
+
+ taskActionTestKit.getTaskLockbox().add(task);
+
+ final SegmentIdentifier id1 = allocate(
+ task,
+ PARTY_TIME,
+ QueryGranularity.NONE,
+ Granularity.HOUR,
+ "s1",
+ null
+ );
+ final SegmentIdentifier id2 = allocate(
+ task,
+ THE_DISTANT_FUTURE,
+ QueryGranularity.NONE,
+ Granularity.HOUR,
+ "s1",
+ id1.getIdentifierAsString()
+ );
+ final SegmentIdentifier id3 = allocate(
+ task,
+ PARTY_TIME,
+ QueryGranularity.NONE,
+ Granularity.HOUR,
+ "s1",
+ id2.getIdentifierAsString()
+ );
+ final SegmentIdentifier id4 = allocate(
+ task,
+ PARTY_TIME,
+ QueryGranularity.NONE,
+ Granularity.HOUR,
+ "s1",
+ id1.getIdentifierAsString()
+ );
+ final SegmentIdentifier id5 = allocate(
+ task,
+ THE_DISTANT_FUTURE,
+ QueryGranularity.NONE,
+ Granularity.HOUR,
+ "s1",
+ id1.getIdentifierAsString()
+ );
+ final SegmentIdentifier id6 = allocate(
+ task,
+ THE_DISTANT_FUTURE,
+ QueryGranularity.NONE,
+ Granularity.MINUTE,
+ "s1",
+ id1.getIdentifierAsString()
+ );
+ final SegmentIdentifier id7 = allocate(
+ task,
+ THE_DISTANT_FUTURE,
+ QueryGranularity.NONE,
+ Granularity.DAY,
+ "s1",
+ id1.getIdentifierAsString()
+ );
+
+ final TaskLock partyLock = Iterables.getOnlyElement(
+ FluentIterable.from(taskActionTestKit.getTaskLockbox().findLocksForTask(task))
+ .filter(
+ new Predicate()
+ {
+ @Override
+ public boolean apply(TaskLock input)
+ {
+ return input.getInterval().contains(PARTY_TIME);
+ }
+ }
+ )
+ );
+ final TaskLock futureLock = Iterables.getOnlyElement(
+ FluentIterable.from(taskActionTestKit.getTaskLockbox().findLocksForTask(task))
+ .filter(
+ new Predicate()
+ {
+ @Override
+ public boolean apply(TaskLock input)
+ {
+ return input.getInterval().contains(THE_DISTANT_FUTURE);
+ }
+ }
+ )
+ );
+
+ assertSameIdentifier(
+ id1,
+ new SegmentIdentifier(
+ DATA_SOURCE,
+ Granularity.HOUR.bucket(PARTY_TIME),
+ partyLock.getVersion(),
+ new NumberedShardSpec(0, 0)
+ )
+ );
+ assertSameIdentifier(
+ id2,
+ new SegmentIdentifier(
+ DATA_SOURCE,
+ Granularity.HOUR.bucket(THE_DISTANT_FUTURE),
+ futureLock.getVersion(),
+ new NumberedShardSpec(0, 0)
+ )
+ );
+ assertSameIdentifier(
+ id3,
+ new SegmentIdentifier(
+ DATA_SOURCE,
+ Granularity.HOUR.bucket(PARTY_TIME),
+ partyLock.getVersion(),
+ new NumberedShardSpec(1, 0)
+ )
+ );
+ Assert.assertNull(id4);
+ assertSameIdentifier(id5, id2);
+ Assert.assertNull(id6);
+ assertSameIdentifier(id7, id2);
+ }
+
+ @Test
+ public void testMultipleSequences() throws Exception
+ {
+ final Task task = new NoopTask(null, 0, 0, null, null, null);
+
+ taskActionTestKit.getTaskLockbox().add(task);
+
+ final SegmentIdentifier id1 = allocate(task, PARTY_TIME, QueryGranularity.NONE, Granularity.HOUR, "s1", null);
+ final SegmentIdentifier id2 = allocate(task, PARTY_TIME, QueryGranularity.NONE, Granularity.HOUR, "s2", null);
+ final SegmentIdentifier id3 = allocate(
+ task,
+ PARTY_TIME,
+ QueryGranularity.NONE,
+ Granularity.HOUR,
+ "s1",
+ id1.getIdentifierAsString()
+ );
+ final SegmentIdentifier id4 = allocate(
+ task,
+ THE_DISTANT_FUTURE,
+ QueryGranularity.NONE,
+ Granularity.HOUR,
+ "s1",
+ id3.getIdentifierAsString()
+ );
+ final SegmentIdentifier id5 = allocate(
+ task,
+ THE_DISTANT_FUTURE,
+ QueryGranularity.NONE,
+ Granularity.HOUR,
+ "s2",
+ id2.getIdentifierAsString()
+ );
+ final SegmentIdentifier id6 = allocate(task, PARTY_TIME, QueryGranularity.NONE, Granularity.HOUR, "s1", null);
+
+ final TaskLock partyLock = Iterables.getOnlyElement(
+ FluentIterable.from(taskActionTestKit.getTaskLockbox().findLocksForTask(task))
+ .filter(
+ new Predicate()
+ {
+ @Override
+ public boolean apply(TaskLock input)
+ {
+ return input.getInterval().contains(PARTY_TIME);
+ }
+ }
+ )
+ );
+ final TaskLock futureLock = Iterables.getOnlyElement(
+ FluentIterable.from(taskActionTestKit.getTaskLockbox().findLocksForTask(task))
+ .filter(
+ new Predicate()
+ {
+ @Override
+ public boolean apply(TaskLock input)
+ {
+ return input.getInterval().contains(THE_DISTANT_FUTURE);
+ }
+ }
+ )
+ );
+
+ assertSameIdentifier(
+ id1,
+ new SegmentIdentifier(
+ DATA_SOURCE,
+ Granularity.HOUR.bucket(PARTY_TIME),
+ partyLock.getVersion(),
+ new NumberedShardSpec(0, 0)
+ )
+ );
+ assertSameIdentifier(
+ id2,
+ new SegmentIdentifier(
+ DATA_SOURCE,
+ Granularity.HOUR.bucket(PARTY_TIME),
+ partyLock.getVersion(),
+ new NumberedShardSpec(1, 0)
+ )
+ );
+ assertSameIdentifier(
+ id3,
+ new SegmentIdentifier(
+ DATA_SOURCE,
+ Granularity.HOUR.bucket(PARTY_TIME),
+ partyLock.getVersion(),
+ new NumberedShardSpec(2, 0)
+ )
+ );
+ assertSameIdentifier(
+ id4,
+ new SegmentIdentifier(
+ DATA_SOURCE,
+ Granularity.HOUR.bucket(THE_DISTANT_FUTURE),
+ futureLock.getVersion(),
+ new NumberedShardSpec(0, 0)
+ )
+ );
+ assertSameIdentifier(
+ id5,
+ new SegmentIdentifier(
+ DATA_SOURCE,
+ Granularity.HOUR.bucket(THE_DISTANT_FUTURE),
+ futureLock.getVersion(),
+ new NumberedShardSpec(1, 0)
+ )
+ );
+ assertSameIdentifier(
+ id6,
+ id1
+ );
+ }
+
+ @Test
+ public void testAddToExistingLinearShardSpecsSameGranularity() throws Exception
+ {
+ final Task task = new NoopTask(null, 0, 0, null, null, null);
+
+ taskActionTestKit.getMetadataStorageCoordinator().announceHistoricalSegments(
+ ImmutableSet.of(
+ DataSegment.builder()
+ .dataSource(DATA_SOURCE)
+ .interval(Granularity.HOUR.bucket(PARTY_TIME))
+ .version(PARTY_TIME.toString())
+ .shardSpec(new LinearShardSpec(0))
+ .build(),
+ DataSegment.builder()
+ .dataSource(DATA_SOURCE)
+ .interval(Granularity.HOUR.bucket(PARTY_TIME))
+ .version(PARTY_TIME.toString())
+ .shardSpec(new LinearShardSpec(1))
+ .build()
+ )
+ );
+
+ taskActionTestKit.getTaskLockbox().add(task);
+
+ final SegmentIdentifier id1 = allocate(
+ task,
+ PARTY_TIME,
+ QueryGranularity.NONE,
+ Granularity.HOUR,
+ "s1",
+ null
+ );
+ final SegmentIdentifier id2 = allocate(
+ task,
+ PARTY_TIME,
+ QueryGranularity.NONE,
+ Granularity.HOUR,
+ "s1",
+ id1.getIdentifierAsString()
+ );
+
+ assertSameIdentifier(
+ id1,
+ new SegmentIdentifier(
+ DATA_SOURCE,
+ Granularity.HOUR.bucket(PARTY_TIME),
+ PARTY_TIME.toString(),
+ new LinearShardSpec(2)
+ )
+ );
+ assertSameIdentifier(
+ id2,
+ new SegmentIdentifier(
+ DATA_SOURCE,
+ Granularity.HOUR.bucket(PARTY_TIME),
+ PARTY_TIME.toString(),
+ new LinearShardSpec(3)
+ )
+ );
+ }
+
+ @Test
+ public void testAddToExistingNumberedShardSpecsSameGranularity() throws Exception
+ {
+ final Task task = new NoopTask(null, 0, 0, null, null, null);
+
+ taskActionTestKit.getMetadataStorageCoordinator().announceHistoricalSegments(
+ ImmutableSet.of(
+ DataSegment.builder()
+ .dataSource(DATA_SOURCE)
+ .interval(Granularity.HOUR.bucket(PARTY_TIME))
+ .version(PARTY_TIME.toString())
+ .shardSpec(new NumberedShardSpec(0, 2))
+ .build(),
+ DataSegment.builder()
+ .dataSource(DATA_SOURCE)
+ .interval(Granularity.HOUR.bucket(PARTY_TIME))
+ .version(PARTY_TIME.toString())
+ .shardSpec(new NumberedShardSpec(1, 2))
+ .build()
+ )
+ );
+
+ taskActionTestKit.getTaskLockbox().add(task);
+
+ final SegmentIdentifier id1 = allocate(
+ task,
+ PARTY_TIME,
+ QueryGranularity.NONE,
+ Granularity.HOUR,
+ "s1",
+ null
+ );
+ final SegmentIdentifier id2 = allocate(
+ task,
+ PARTY_TIME,
+ QueryGranularity.NONE,
+ Granularity.HOUR,
+ "s1",
+ id1.getIdentifierAsString()
+ );
+
+ assertSameIdentifier(
+ id1,
+ new SegmentIdentifier(
+ DATA_SOURCE,
+ Granularity.HOUR.bucket(PARTY_TIME),
+ PARTY_TIME.toString(),
+ new NumberedShardSpec(2, 2)
+ )
+ );
+ assertSameIdentifier(
+ id2,
+ new SegmentIdentifier(
+ DATA_SOURCE,
+ Granularity.HOUR.bucket(PARTY_TIME),
+ PARTY_TIME.toString(),
+ new NumberedShardSpec(3, 2)
+ )
+ );
+ }
+
+ @Test
+ public void testAddToExistingNumberedShardSpecsCoarserPreferredGranularity() throws Exception
+ {
+ final Task task = new NoopTask(null, 0, 0, null, null, null);
+
+ taskActionTestKit.getMetadataStorageCoordinator().announceHistoricalSegments(
+ ImmutableSet.of(
+ DataSegment.builder()
+ .dataSource(DATA_SOURCE)
+ .interval(Granularity.HOUR.bucket(PARTY_TIME))
+ .version(PARTY_TIME.toString())
+ .shardSpec(new NumberedShardSpec(0, 2))
+ .build(),
+ DataSegment.builder()
+ .dataSource(DATA_SOURCE)
+ .interval(Granularity.HOUR.bucket(PARTY_TIME))
+ .version(PARTY_TIME.toString())
+ .shardSpec(new NumberedShardSpec(1, 2))
+ .build()
+ )
+ );
+
+ taskActionTestKit.getTaskLockbox().add(task);
+
+ final SegmentIdentifier id1 = allocate(task, PARTY_TIME, QueryGranularity.NONE, Granularity.DAY, "s1", null);
+
+ assertSameIdentifier(
+ id1,
+ new SegmentIdentifier(
+ DATA_SOURCE,
+ Granularity.HOUR.bucket(PARTY_TIME),
+ PARTY_TIME.toString(),
+ new NumberedShardSpec(2, 2)
+ )
+ );
+ }
+
+ @Test
+ public void testAddToExistingNumberedShardSpecsFinerPreferredGranularity() throws Exception
+ {
+ final Task task = new NoopTask(null, 0, 0, null, null, null);
+
+ taskActionTestKit.getMetadataStorageCoordinator().announceHistoricalSegments(
+ ImmutableSet.of(
+ DataSegment.builder()
+ .dataSource(DATA_SOURCE)
+ .interval(Granularity.HOUR.bucket(PARTY_TIME))
+ .version(PARTY_TIME.toString())
+ .shardSpec(new NumberedShardSpec(0, 2))
+ .build(),
+ DataSegment.builder()
+ .dataSource(DATA_SOURCE)
+ .interval(Granularity.HOUR.bucket(PARTY_TIME))
+ .version(PARTY_TIME.toString())
+ .shardSpec(new NumberedShardSpec(1, 2))
+ .build()
+ )
+ );
+
+ taskActionTestKit.getTaskLockbox().add(task);
+
+ final SegmentIdentifier id1 = allocate(task, PARTY_TIME, QueryGranularity.NONE, Granularity.MINUTE, "s1", null);
+
+ assertSameIdentifier(
+ id1,
+ new SegmentIdentifier(
+ DATA_SOURCE,
+ Granularity.HOUR.bucket(PARTY_TIME),
+ PARTY_TIME.toString(),
+ new NumberedShardSpec(2, 2)
+ )
+ );
+ }
+
+ @Test
+ public void testCannotAddToExistingNumberedShardSpecsWithCoarserQueryGranularity() throws Exception
+ {
+ final Task task = new NoopTask(null, 0, 0, null, null, null);
+
+ taskActionTestKit.getMetadataStorageCoordinator().announceHistoricalSegments(
+ ImmutableSet.of(
+ DataSegment.builder()
+ .dataSource(DATA_SOURCE)
+ .interval(Granularity.HOUR.bucket(PARTY_TIME))
+ .version(PARTY_TIME.toString())
+ .shardSpec(new NumberedShardSpec(0, 2))
+ .build(),
+ DataSegment.builder()
+ .dataSource(DATA_SOURCE)
+ .interval(Granularity.HOUR.bucket(PARTY_TIME))
+ .version(PARTY_TIME.toString())
+ .shardSpec(new NumberedShardSpec(1, 2))
+ .build()
+ )
+ );
+
+ taskActionTestKit.getTaskLockbox().add(task);
+
+ final SegmentIdentifier id1 = allocate(task, PARTY_TIME, QueryGranularity.DAY, Granularity.DAY, "s1", null);
+
+ Assert.assertNull(id1);
+ }
+
+ @Test
+ public void testCannotDoAnythingWithSillyQueryGranularity() throws Exception
+ {
+ final Task task = new NoopTask(null, 0, 0, null, null, null);
+ taskActionTestKit.getTaskLockbox().add(task);
+
+ final SegmentIdentifier id1 = allocate(task, PARTY_TIME, QueryGranularity.DAY, Granularity.HOUR, "s1", null);
+
+ Assert.assertNull(id1);
+ }
+
+ @Test
+ public void testCannotAddToExistingSingleDimensionShardSpecs() throws Exception
+ {
+ final Task task = new NoopTask(null, 0, 0, null, null, null);
+
+ taskActionTestKit.getMetadataStorageCoordinator().announceHistoricalSegments(
+ ImmutableSet.of(
+ DataSegment.builder()
+ .dataSource(DATA_SOURCE)
+ .interval(Granularity.HOUR.bucket(PARTY_TIME))
+ .version(PARTY_TIME.toString())
+ .shardSpec(new SingleDimensionShardSpec("foo", null, "bar", 0))
+ .build(),
+ DataSegment.builder()
+ .dataSource(DATA_SOURCE)
+ .interval(Granularity.HOUR.bucket(PARTY_TIME))
+ .version(PARTY_TIME.toString())
+ .shardSpec(new SingleDimensionShardSpec("foo", "bar", null, 1))
+ .build()
+ )
+ );
+
+ taskActionTestKit.getTaskLockbox().add(task);
+
+ final SegmentIdentifier id1 = allocate(task, PARTY_TIME, QueryGranularity.NONE, Granularity.HOUR, "s1", null);
+
+ Assert.assertNull(id1);
+ }
+
+ @Test
+ public void testSerde() throws Exception
+ {
+ final SegmentAllocateAction action = new SegmentAllocateAction(
+ DATA_SOURCE,
+ PARTY_TIME,
+ QueryGranularity.MINUTE,
+ Granularity.HOUR,
+ "s1",
+ "prev"
+ );
+
+ final ObjectMapper objectMapper = new DefaultObjectMapper();
+ final SegmentAllocateAction action2 = (SegmentAllocateAction) objectMapper.readValue(
+ objectMapper.writeValueAsBytes(action),
+ TaskAction.class
+ );
+
+ Assert.assertEquals(DATA_SOURCE, action2.getDataSource());
+ Assert.assertEquals(PARTY_TIME, action2.getTimestamp());
+ Assert.assertEquals(new DurationGranularity(60000, 0), action2.getQueryGranularity());
+ Assert.assertSame(Granularity.HOUR, action2.getPreferredSegmentGranularity());
+ Assert.assertEquals("s1", action2.getSequenceName());
+ Assert.assertEquals("prev", action2.getPreviousSegmentId());
+ }
+
+ private SegmentIdentifier allocate(
+ final Task task,
+ final DateTime timestamp,
+ final QueryGranularity queryGranularity,
+ final Granularity preferredSegmentGranularity,
+ final String sequenceName,
+ final String sequencePreviousId
+ ) throws Exception
+ {
+ final SegmentAllocateAction action = new SegmentAllocateAction(
+ DATA_SOURCE,
+ timestamp,
+ queryGranularity,
+ preferredSegmentGranularity,
+ sequenceName,
+ sequencePreviousId
+ );
+ return action.perform(task, taskActionTestKit.getTaskActionToolbox()).orNull();
+ }
+
+ private void assertSameIdentifier(final SegmentIdentifier one, final SegmentIdentifier other)
+ {
+ Assert.assertEquals(one, other);
+ Assert.assertEquals(one.getShardSpec().getPartitionNum(), other.getShardSpec().getPartitionNum());
+
+ if (one.getShardSpec().getClass() == NumberedShardSpec.class
+ && other.getShardSpec().getClass() == NumberedShardSpec.class) {
+ Assert.assertEquals(
+ ((NumberedShardSpec) one.getShardSpec()).getPartitions(),
+ ((NumberedShardSpec) other.getShardSpec()).getPartitions()
+ );
+ } else if (one.getShardSpec().getClass() == LinearShardSpec.class
+ && other.getShardSpec().getClass() == LinearShardSpec.class) {
+ // do nothing
+ } else {
+ throw new ISE(
+ "Unexpected shardSpecs [%s] and [%s]",
+ one.getShardSpec().getClass(),
+ other.getShardSpec().getClass()
+ );
+ }
+ }
+}
diff --git a/indexing-service/src/test/java/io/druid/indexing/common/actions/TaskActionTestKit.java b/indexing-service/src/test/java/io/druid/indexing/common/actions/TaskActionTestKit.java
new file mode 100644
index 000000000000..a4d86fd2575e
--- /dev/null
+++ b/indexing-service/src/test/java/io/druid/indexing/common/actions/TaskActionTestKit.java
@@ -0,0 +1,114 @@
+/*
+ * Licensed to Metamarkets Group Inc. (Metamarkets) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. Metamarkets licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package io.druid.indexing.common.actions;
+
+import com.google.common.base.Suppliers;
+import io.druid.indexing.common.TestUtils;
+import io.druid.indexing.common.config.TaskStorageConfig;
+import io.druid.indexing.overlord.HeapMemoryTaskStorage;
+import io.druid.indexing.overlord.IndexerMetadataStorageCoordinator;
+import io.druid.indexing.overlord.TaskLockbox;
+import io.druid.indexing.overlord.TaskStorage;
+import io.druid.metadata.IndexerSQLMetadataStorageCoordinator;
+import io.druid.metadata.MetadataStorageConnectorConfig;
+import io.druid.metadata.MetadataStorageTablesConfig;
+import io.druid.metadata.TestDerbyConnector;
+import io.druid.server.metrics.NoopServiceEmitter;
+import org.joda.time.Period;
+import org.junit.rules.ExternalResource;
+
+/**
+ * JUnit {@link ExternalResource} that wires up the objects needed to run task actions in tests: a heap-memory
+ * {@link TaskStorage}, a {@link TaskLockbox} on top of it, a {@link TestDerbyConnector}, an
+ * {@link IndexerSQLMetadataStorageCoordinator} and a {@link TaskActionToolbox}.
+ *
+ * Everything except the tables config is created in {@link #before()} and dropped in {@link #after()}, so the
+ * corresponding getters return null outside of that window.
+ */
+public class TaskActionTestKit extends ExternalResource
+{
+ // Built once from the base name "druid"; shared by the connector and the coordinator created in before().
+ private final MetadataStorageTablesConfig metadataStorageTablesConfig = MetadataStorageTablesConfig.fromBase("druid");
+
+ // The fields below are only non-null between before() and after().
+ private TaskStorage taskStorage;
+ private TaskLockbox taskLockbox;
+ private TestDerbyConnector testDerbyConnector;
+ private IndexerMetadataStorageCoordinator metadataStorageCoordinator;
+ private TaskActionToolbox taskActionToolbox;
+
+ public MetadataStorageTablesConfig getMetadataStorageTablesConfig()
+ {
+ return metadataStorageTablesConfig;
+ }
+
+ public TaskStorage getTaskStorage()
+ {
+ return taskStorage;
+ }
+
+ public TaskLockbox getTaskLockbox()
+ {
+ return taskLockbox;
+ }
+
+ public TestDerbyConnector getTestDerbyConnector()
+ {
+ return testDerbyConnector;
+ }
+
+ public IndexerMetadataStorageCoordinator getMetadataStorageCoordinator()
+ {
+ return metadataStorageCoordinator;
+ }
+
+ public TaskActionToolbox getTaskActionToolbox()
+ {
+ return taskActionToolbox;
+ }
+
+ /**
+ * Creates fresh instances of every collaborator and then asks the Derby connector to create all metadata tables.
+ */
+ @Override
+ public void before()
+ {
+ taskStorage = new HeapMemoryTaskStorage(new TaskStorageConfig(new Period("PT24H")));
+ taskLockbox = new TaskLockbox(taskStorage);
+ testDerbyConnector = new TestDerbyConnector(
+ Suppliers.ofInstance(new MetadataStorageConnectorConfig()),
+ Suppliers.ofInstance(metadataStorageTablesConfig)
+ );
+ metadataStorageCoordinator = new IndexerSQLMetadataStorageCoordinator(
+ new TestUtils().getTestObjectMapper(),
+ metadataStorageTablesConfig,
+ testDerbyConnector
+ );
+ taskActionToolbox = new TaskActionToolbox(
+ taskLockbox,
+ metadataStorageCoordinator,
+ new NoopServiceEmitter()
+ );
+ // Tables are created last, once every object that holds the connector has been constructed.
+ testDerbyConnector.createPendingSegmentsTable();
+ testDerbyConnector.createSegmentTable();
+ testDerbyConnector.createRulesTable();
+ testDerbyConnector.createConfigTable();
+ testDerbyConnector.createTaskTables();
+ testDerbyConnector.createAuditTable();
+ }
+
+ /**
+ * Tears down the Derby connector, then clears every reference created by {@link #before()}.
+ */
+ @Override
+ public void after()
+ {
+ testDerbyConnector.tearDown();
+ taskStorage = null;
+ taskLockbox = null;
+ testDerbyConnector = null;
+ metadataStorageCoordinator = null;
+ taskActionToolbox = null;
+ }
+}
diff --git a/indexing-service/src/test/java/io/druid/indexing/overlord/TaskLockboxTest.java b/indexing-service/src/test/java/io/druid/indexing/overlord/TaskLockboxTest.java
index bca630e29576..6f809b71a8a3 100644
--- a/indexing-service/src/test/java/io/druid/indexing/overlord/TaskLockboxTest.java
+++ b/indexing-service/src/test/java/io/druid/indexing/overlord/TaskLockboxTest.java
@@ -17,23 +17,28 @@
package io.druid.indexing.overlord;
-
+import com.google.common.base.Optional;
+import com.google.common.collect.ImmutableList;
+import io.druid.indexing.common.TaskLock;
import io.druid.indexing.common.config.TaskStorageConfig;
import io.druid.indexing.common.task.NoopTask;
import io.druid.indexing.common.task.Task;
-import junit.framework.Assert;
import org.joda.time.Interval;
+import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
+import java.util.List;
+
public class TaskLockboxTest
{
- private TaskStorage taskStorage;
+ private TaskStorage taskStorage;
- private TaskLockbox lockbox;
+ private TaskLockbox lockbox;
@Before
- public void setUp(){
+ public void setUp()
+ {
taskStorage = new HeapMemoryTaskStorage(new TaskStorageConfig(null));
lockbox = new TaskLockbox(taskStorage);
}
@@ -49,7 +54,7 @@ public void testLock() throws InterruptedException
@Test(expected = IllegalStateException.class)
public void testLockForInactiveTask() throws InterruptedException
{
- lockbox.lock(NoopTask.create(),new Interval("2015-01-01/2015-01-02"));
+ lockbox.lock(NoopTask.create(), new Interval("2015-01-01/2015-01-02"));
}
@Test(expected = IllegalStateException.class)
@@ -78,7 +83,30 @@ public void testTryLock() throws InterruptedException
// Now task2 should be able to get the lock
Assert.assertTrue(lockbox.tryLock(task2, new Interval("2015-01-01/2015-01-02")).isPresent());
+ }
+ /**
+ * A task that already holds a lock must be refused a partially overlapping interval, and must be handed back its
+ * existing lock (not a new one) when it asks for an interval contained in the one it holds.
+ */
+ @Test
+ public void testTrySmallerLock() throws InterruptedException
+ {
+ Task task = NoopTask.create();
+ lockbox.add(task);
+ // Typed as Optional<TaskLock>: a raw Optional would make get().getInterval() below fail to compile.
+ Optional<TaskLock> lock1 = lockbox.tryLock(task, new Interval("2015-01-01/2015-01-03"));
+ Assert.assertTrue(lock1.isPresent());
+ Assert.assertEquals(new Interval("2015-01-01/2015-01-03"), lock1.get().getInterval());
+
+ // same task tries to take partially overlapping interval; should fail
+ Assert.assertFalse(lockbox.tryLock(task, new Interval("2015-01-02/2015-01-04")).isPresent());
+
+ // same task tries to take contained interval; should succeed and should match the original lock
+ Optional<TaskLock> lock2 = lockbox.tryLock(task, new Interval("2015-01-01/2015-01-02"));
+ Assert.assertTrue(lock2.isPresent());
+ Assert.assertEquals(new Interval("2015-01-01/2015-01-03"), lock2.get().getInterval());
+
+ // only the first lock should actually exist
+ Assert.assertEquals(
+ ImmutableList.of(lock1.get()),
+ lockbox.findLocksForTask(task)
+ );
}
@Test(expected = IllegalStateException.class)
@@ -93,8 +121,8 @@ public void testTryLockAfterTaskComplete() throws InterruptedException
Task task = NoopTask.create();
lockbox.add(task);
lockbox.remove(task);
- Assert.assertFalse(lockbox.tryLock(task, new Interval("2015-01-01/2015-01-02")).isPresent()); }
-
+ Assert.assertFalse(lockbox.tryLock(task, new Interval("2015-01-01/2015-01-02")).isPresent());
+ }
}
diff --git a/server/src/main/java/io/druid/guice/SQLMetadataStorageDruidModule.java b/server/src/main/java/io/druid/guice/SQLMetadataStorageDruidModule.java
index 217764dad2fe..9eb04aa01cb6 100644
--- a/server/src/main/java/io/druid/guice/SQLMetadataStorageDruidModule.java
+++ b/server/src/main/java/io/druid/guice/SQLMetadataStorageDruidModule.java
@@ -198,7 +198,7 @@ public void configure(Binder binder)
PolyBind.optionBinder(binder, Key.get(IndexerMetadataStorageCoordinator.class))
.addBinding(type)
.to(IndexerSQLMetadataStorageCoordinator.class)
- .in(LazySingleton.class);
+ .in(ManageLifecycle.class);
PolyBind.optionBinder(binder, Key.get(MetadataStorageUpdaterJobHandler.class))
.addBinding(type)
diff --git a/server/src/main/java/io/druid/indexing/overlord/IndexerMetadataStorageCoordinator.java b/server/src/main/java/io/druid/indexing/overlord/IndexerMetadataStorageCoordinator.java
index 4a7122d1d8f3..b7d438c41f8d 100644
--- a/server/src/main/java/io/druid/indexing/overlord/IndexerMetadataStorageCoordinator.java
+++ b/server/src/main/java/io/druid/indexing/overlord/IndexerMetadataStorageCoordinator.java
@@ -17,6 +17,7 @@
package io.druid.indexing.overlord;
+import io.druid.segment.realtime.appenderator.SegmentIdentifier;
import io.druid.timeline.DataSegment;
import org.joda.time.Interval;
@@ -51,6 +52,31 @@ public List getUsedSegmentsForInterval(final String dataSource, fin
*/
public Set announceHistoricalSegments(final Set segments) throws IOException;
+ /**
+ * Allocate a new pending segment in the pending segments table. This segment identifier will never be given out
+ * again, unless another call is made with the same dataSource, sequenceName, and previousSegmentId.
+ *
+ * The sequenceName and previousSegmentId parameters are meant to make it easy for two independent ingestion tasks
+ * to produce the same series of segments.
+ *
+ * Note that a segment sequence may include segments with a variety of different intervals and versions.
+ *
+ * @param dataSource dataSource for which to allocate a segment
+ * @param sequenceName name of the group of ingestion tasks producing a segment series
+ * @param previousSegmentId previous segment in the series; may be null or empty, meaning this is the first segment
+ * @param interval interval for which to allocate a segment
+ * @param maxVersion use this version if we have no better version to use. The returned segment identifier may
+ * have a version lower than this one, but will not have one higher.
+ *
+ * @return the pending segment identifier, or null if it was impossible to allocate a new segment
+ *
+ * @throws IOException NOTE(review): the failure conditions are not specified by this interface -- presumably a
+ * metadata storage or serialization error; confirm against implementations
+ */
+ SegmentIdentifier allocatePendingSegment(
+ String dataSource,
+ String sequenceName,
+ String previousSegmentId,
+ Interval interval,
+ String maxVersion
+ ) throws IOException;
public void updateSegmentMetadata(final Set segments) throws IOException;
diff --git a/server/src/main/java/io/druid/metadata/IndexerSQLMetadataStorageCoordinator.java b/server/src/main/java/io/druid/metadata/IndexerSQLMetadataStorageCoordinator.java
index 9d3ba1f28566..dbb49a624069 100644
--- a/server/src/main/java/io/druid/metadata/IndexerSQLMetadataStorageCoordinator.java
+++ b/server/src/main/java/io/druid/metadata/IndexerSQLMetadataStorageCoordinator.java
@@ -19,6 +19,7 @@
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Function;
+import com.google.common.base.Preconditions;
import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Iterables;
@@ -26,12 +27,17 @@
import com.google.common.collect.Ordering;
import com.google.common.collect.Sets;
import com.google.inject.Inject;
+import com.metamx.common.lifecycle.LifecycleStart;
import com.metamx.common.logger.Logger;
import io.druid.indexing.overlord.IndexerMetadataStorageCoordinator;
+import io.druid.segment.realtime.appenderator.SegmentIdentifier;
import io.druid.timeline.DataSegment;
import io.druid.timeline.TimelineObjectHolder;
import io.druid.timeline.VersionedIntervalTimeline;
+import io.druid.timeline.partition.LinearShardSpec;
import io.druid.timeline.partition.NoneShardSpec;
+import io.druid.timeline.partition.NumberedShardSpec;
+import io.druid.timeline.partition.PartitionChunk;
import org.joda.time.DateTime;
import org.joda.time.Interval;
import org.skife.jdbi.v2.FoldController;
@@ -72,67 +78,124 @@ public IndexerSQLMetadataStorageCoordinator(
this.connector = connector;
}
- public List getUsedSegmentsForInterval(final String dataSource, final Interval interval)
- throws IOException
+ /**
+ * Lifecycle start hook: asks the connector to create the pending segments table and the segments table, so both
+ * are requested before this coordinator is used.
+ */
+ @LifecycleStart
+ public void start()
{
- final VersionedIntervalTimeline timeline = connector.getDBI().withHandle(
- new HandleCallback>()
+ connector.createPendingSegmentsTable();
+ connector.createSegmentTable();
+ }
+
+ public List getUsedSegmentsForInterval(
+ final String dataSource,
+ final Interval interval
+ ) throws IOException
+ {
+ return connector.retryWithHandle(
+ new HandleCallback>()
{
@Override
- public VersionedIntervalTimeline withHandle(Handle handle) throws IOException
+ public List withHandle(Handle handle) throws Exception
{
- final VersionedIntervalTimeline timeline = new VersionedIntervalTimeline(
- Ordering.natural()
+ final VersionedIntervalTimeline timeline = getTimelineForIntervalWithHandle(
+ handle,
+ dataSource,
+ interval
);
- final ResultIterator dbSegments =
- handle.createQuery(
- String.format(
- "SELECT payload FROM %s WHERE used = true AND dataSource = :dataSource AND start <= :end and \"end\" >= :start AND used = true",
- dbTables.getSegmentsTable()
+ return Lists.newArrayList(
+ Iterables.concat(
+ Iterables.transform(
+ timeline.lookup(interval),
+ new Function, Iterable>()
+ {
+ @Override
+ public Iterable apply(TimelineObjectHolder input)
+ {
+ return input.getObject().payloads();
+ }
+ }
)
)
- .bind("dataSource", dataSource)
- .bind("start", interval.getStart().toString())
- .bind("end", interval.getEnd().toString())
- .map(ByteArrayMapper.FIRST)
- .iterator();
-
- while (dbSegments.hasNext()) {
- final byte[] payload = dbSegments.next();
-
- DataSegment segment = jsonMapper.readValue(
- payload,
- DataSegment.class
- );
+ );
- timeline.add(segment.getInterval(), segment.getVersion(), segment.getShardSpec().createChunk(segment));
+ }
+ }
+ );
+ }
- }
+ /**
+ * Reads the pending segment identifiers of {@code dataSource} whose interval overlaps {@code interval}.
+ *
+ * The SQL predicate only pre-filters on the stored start/end strings; every row is checked again with
+ * {@link Interval#overlaps} after its payload has been deserialized.
+ *
+ * @param handle handle to run the query on; it is not closed here
+ * @param dataSource dataSource to look up
+ * @param interval interval that returned identifiers must overlap
+ *
+ * @return the overlapping pending segment identifiers, possibly empty
+ *
+ * @throws IOException if a stored payload cannot be deserialized
+ */
+ private List<SegmentIdentifier> getPendingSegmentsForIntervalWithHandle(
+ final Handle handle,
+ final String dataSource,
+ final Interval interval
+ ) throws IOException
+ {
+ final List<SegmentIdentifier> identifiers = Lists.newArrayList();
- dbSegments.close();
+ final ResultIterator<byte[]> dbSegments =
+ handle.createQuery(
+ String.format(
+ "SELECT payload FROM %s WHERE dataSource = :dataSource AND start <= :end and \"end\" >= :start",
+ dbTables.getPendingSegmentsTable()
+ )
+ )
+ .bind("dataSource", dataSource)
+ .bind("start", interval.getStart().toString())
+ .bind("end", interval.getEnd().toString())
+ .map(ByteArrayMapper.FIRST)
+ .iterator();
+
+ try {
+ while (dbSegments.hasNext()) {
+ final byte[] payload = dbSegments.next();
+ final SegmentIdentifier identifier = jsonMapper.readValue(payload, SegmentIdentifier.class);
+
+ if (interval.overlaps(identifier.getInterval())) {
+ identifiers.add(identifier);
+ }
+ }
+ }
+ finally {
+ // Release the iterator even when readValue throws part-way through the result set.
+ dbSegments.close();
+ }
+
- return timeline;
- }
- }
+ return identifiers;
+ }
+
+ /**
+ * Builds a timeline from the segment rows of {@code dataSource} selected by the query below for {@code interval}.
+ * Each payload is deserialized into a {@link DataSegment} and added under its own interval and version. The query
+ * text states {@code used = true} twice; the second occurrence is redundant but harmless.
+ *
+ * @param handle handle to run the query on; it is not closed here
+ * @param dataSource dataSource to look up
+ * @param interval interval bound into the query's start/end predicate
+ *
+ * @return a timeline holding every segment returned by the query
+ *
+ * @throws IOException if a stored payload cannot be deserialized
+ */
+ private VersionedIntervalTimeline<String, DataSegment> getTimelineForIntervalWithHandle(
+ final Handle handle,
+ final String dataSource,
+ final Interval interval
+ ) throws IOException
+ {
+ final VersionedIntervalTimeline<String, DataSegment> timeline = new VersionedIntervalTimeline<>(
+ Ordering.natural()
);
- return Lists.newArrayList(
- Iterables.concat(
- Iterables.transform(
- timeline.lookup(interval),
- new Function, Iterable>()
- {
- @Override
- public Iterable apply(TimelineObjectHolder input)
- {
- return input.getObject().payloads();
- }
- }
+ final ResultIterator<byte[]> dbSegments =
+ handle.createQuery(
+ String.format(
+ "SELECT payload FROM %s WHERE used = true AND dataSource = :dataSource AND start <= :end and \"end\" >= :start AND used = true",
+ dbTables.getSegmentsTable()
)
)
- );
+ .bind("dataSource", dataSource)
+ .bind("start", interval.getStart().toString())
+ .bind("end", interval.getEnd().toString())
+ .map(ByteArrayMapper.FIRST)
+ .iterator();
+
+ try {
+ while (dbSegments.hasNext()) {
+ final byte[] payload = dbSegments.next();
+
+ DataSegment segment = jsonMapper.readValue(
+ payload,
+ DataSegment.class
+ );
+
+ timeline.add(segment.getInterval(), segment.getVersion(), segment.getShardSpec().createChunk(segment));
+ }
+ }
+ finally {
+ // Release the iterator even when readValue throws part-way through the result set.
+ dbSegments.close();
+ }
+
+ return timeline;
}
/**
@@ -140,6 +203,7 @@ public Iterable apply(TimelineObjectHolder inp
* with identifiers already in the database will not be added).
*
* @param segments set of segments to add
+ *
* @return set of segments actually added
*/
public Set announceHistoricalSegments(final Set segments) throws IOException
@@ -164,6 +228,204 @@ public Set inTransaction(Handle handle, TransactionStatus transacti
);
}
+ /**
+ * Allocates a pending segment identifier, or returns the one already recorded, inside one retried transaction.
+ *
+ * If a row exists for (dataSource, sequenceName, previousSegmentId), its identifier is returned when its interval
+ * has the same start and end millis as {@code interval}; otherwise null is returned. When no row exists, the next
+ * identifier is derived from the used segments and pending segments found for the interval and is inserted into
+ * the pending segments table before being returned.
+ *
+ * @param dataSource dataSource for which to allocate a segment; must not be null
+ * @param sequenceName name of the sequence the segment belongs to; must not be null
+ * @param previousSegmentId previous segment in the sequence; null is stored and looked up as the empty string
+ * @param interval interval for which to allocate a segment; must not be null
+ * @param maxVersion version used when nothing exists for the interval yet; must not be null
+ *
+ * @return the existing or newly inserted identifier, or null when allocation is not possible (mismatching
+ * existing row, more than one overlapping timeline chunk, conflicting interval or version, or an
+ * unsupported shard spec class)
+ */
+ @Override
+ public SegmentIdentifier allocatePendingSegment(
+ final String dataSource,
+ final String sequenceName,
+ final String previousSegmentId,
+ final Interval interval,
+ final String maxVersion
+ ) throws IOException
+ {
+ Preconditions.checkNotNull(dataSource, "dataSource");
+ Preconditions.checkNotNull(sequenceName, "sequenceName");
+ Preconditions.checkNotNull(interval, "interval");
+ Preconditions.checkNotNull(maxVersion, "maxVersion");
+
+ // The sequence_prev_id column is NOT NULL, so "no previous segment" is represented by the empty string.
+ final String previousSegmentIdNotNull = previousSegmentId == null ? "" : previousSegmentId;
+
+ return connector.retryTransaction(
+ new TransactionCallback<SegmentIdentifier>()
+ {
+ @Override
+ public SegmentIdentifier inTransaction(Handle handle, TransactionStatus transactionStatus) throws Exception
+ {
+ final List<byte[]> existingBytes = handle
+ .createQuery(
+ String.format(
+ "SELECT payload FROM %s WHERE "
+ + "dataSource = :dataSource AND "
+ + "sequence_name = :sequence_name AND "
+ + "sequence_prev_id = :sequence_prev_id",
+ dbTables.getPendingSegmentsTable()
+ )
+ ).bind("dataSource", dataSource)
+ .bind("sequence_name", sequenceName)
+ .bind("sequence_prev_id", previousSegmentIdNotNull)
+ .map(ByteArrayMapper.FIRST)
+ .list();
+
+ if (!existingBytes.isEmpty()) {
+ // getOnlyElement throws if the lookup ever returns more than one row.
+ final SegmentIdentifier existingIdentifier = jsonMapper.readValue(
+ Iterables.getOnlyElement(existingBytes),
+ SegmentIdentifier.class
+ );
+
+ if (existingIdentifier.getInterval().getStartMillis() == interval.getStartMillis()
+ && existingIdentifier.getInterval().getEndMillis() == interval.getEndMillis()) {
+ log.info(
+ "Found existing pending segment [%s] for sequence[%s] (previous = [%s]) in DB",
+ existingIdentifier.getIdentifierAsString(),
+ sequenceName,
+ previousSegmentIdNotNull
+ );
+
+ return existingIdentifier;
+ } else {
+ log.warn(
+ "Cannot use existing pending segment [%s] for sequence[%s] (previous = [%s]) in DB, "
+ + "does not match requested interval[%s]",
+ existingIdentifier.getIdentifierAsString(),
+ sequenceName,
+ previousSegmentIdNotNull,
+ interval
+ );
+
+ return null;
+ }
+ }
+
+ // Make up a pending segment based on existing segments and pending segments in the DB. This works
+ // assuming that all tasks inserting segments at a particular point in time are going through the
+ // allocatePendingSegment flow. This should be assured through some other mechanism (like task locks).
+
+ final SegmentIdentifier newIdentifier;
+
+ final List<TimelineObjectHolder<String, DataSegment>> existingChunks = getTimelineForIntervalWithHandle(
+ handle,
+ dataSource,
+ interval
+ ).lookup(interval);
+
+ if (existingChunks.size() > 1) {
+ // Not possible to expand more than one chunk with a single segment.
+ log.warn(
+ "Cannot allocate new segment for dataSource[%s], interval[%s], maxVersion[%s]: already have [%,d] chunks.",
+ dataSource,
+ interval,
+ maxVersion,
+ existingChunks.size()
+ );
+ return null;
+ } else {
+ // "max" tracks the highest (version, partition number) seen among used and pending segments.
+ SegmentIdentifier max = null;
+
+ if (!existingChunks.isEmpty()) {
+ TimelineObjectHolder<String, DataSegment> existingHolder = Iterables.getOnlyElement(existingChunks);
+ for (PartitionChunk<DataSegment> existing : existingHolder.getObject()) {
+ if (max == null || max.getShardSpec().getPartitionNum() < existing.getObject()
+ .getShardSpec()
+ .getPartitionNum()) {
+ max = SegmentIdentifier.fromDataSegment(existing.getObject());
+ }
+ }
+ }
+
+ final List<SegmentIdentifier> pendings = getPendingSegmentsForIntervalWithHandle(
+ handle,
+ dataSource,
+ interval
+ );
+
+ for (SegmentIdentifier pending : pendings) {
+ if (max == null ||
+ pending.getVersion().compareTo(max.getVersion()) > 0 ||
+ (pending.getVersion().equals(max.getVersion())
+ && pending.getShardSpec().getPartitionNum() > max.getShardSpec().getPartitionNum())) {
+ max = pending;
+ }
+ }
+
+ if (max == null) {
+ // Nothing exists for this interval yet: start a new numbered series at the caller's version.
+ newIdentifier = new SegmentIdentifier(
+ dataSource,
+ interval,
+ maxVersion,
+ new NumberedShardSpec(0, 0)
+ );
+ } else if (!max.getInterval().equals(interval) || max.getVersion().compareTo(maxVersion) > 0) {
+ log.warn(
+ "Cannot allocate new segment for dataSource[%s], interval[%s], maxVersion[%s]: conflicting segment[%s].",
+ dataSource,
+ interval,
+ maxVersion,
+ max.getIdentifierAsString()
+ );
+ return null;
+ } else if (max.getShardSpec() instanceof LinearShardSpec) {
+ newIdentifier = new SegmentIdentifier(
+ dataSource,
+ max.getInterval(),
+ max.getVersion(),
+ new LinearShardSpec(max.getShardSpec().getPartitionNum() + 1)
+ );
+ } else if (max.getShardSpec() instanceof NumberedShardSpec) {
+ newIdentifier = new SegmentIdentifier(
+ dataSource,
+ max.getInterval(),
+ max.getVersion(),
+ new NumberedShardSpec(
+ max.getShardSpec().getPartitionNum() + 1,
+ ((NumberedShardSpec) max.getShardSpec()).getPartitions()
+ )
+ );
+ } else {
+ log.warn(
+ "Cannot allocate new segment for dataSource[%s], interval[%s], maxVersion[%s]: ShardSpec class[%s] used by [%s].",
+ dataSource,
+ interval,
+ maxVersion,
+ max.getShardSpec().getClass(),
+ max.getIdentifierAsString()
+ );
+ return null;
+ }
+ }
+
+ // SELECT -> INSERT can fail due to races; callers must be prepared to retry.
+ // Avoiding ON DUPLICATE KEY since it's not portable.
+ // Avoiding try/catch since it may cause inadvertent transaction-splitting.
+
+ handle.createStatement(
+ String.format(
+ "INSERT INTO %s (id, dataSource, created_date, start, \"end\", sequence_name, sequence_prev_id, payload) "
+ + "VALUES (:id, :dataSource, :created_date, :start, :end, :sequence_name, :sequence_prev_id, :payload)",
+ dbTables.getPendingSegmentsTable()
+ )
+ )
+ .bind("id", newIdentifier.getIdentifierAsString())
+ .bind("dataSource", dataSource)
+ .bind("created_date", new DateTime().toString())
+ .bind("start", interval.getStart().toString())
+ .bind("end", interval.getEnd().toString())
+ .bind("sequence_name", sequenceName)
+ .bind("sequence_prev_id", previousSegmentIdNotNull)
+ .bind("payload", jsonMapper.writeValueAsBytes(newIdentifier))
+ .execute();
+
+ log.info(
+ "Allocated pending segment [%s] for sequence[%s] (previous = [%s]) in DB",
+ newIdentifier.getIdentifierAsString(),
+ sequenceName,
+ previousSegmentIdNotNull
+ );
+
+ return newIdentifier;
+ }
+ }
+ );
+ }
+
/**
* Attempts to insert a single segment to the database. If the segment already exists, will do nothing. Meant
* to be called from within a transaction.
@@ -199,7 +461,8 @@ private boolean announceHistoricalSegment(final Handle handle, final DataSegment
.execute();
log.info("Published segment [%s] to DB", segment.getIdentifier());
- } catch(Exception e) {
+ }
+ catch (Exception e) {
if (e.getCause() instanceof SQLException && segmentExists(handle, segment)) {
log.info("Found [%s] in DB, not updating DB", segment.getIdentifier());
} else {
@@ -237,7 +500,7 @@ public void updateSegmentMetadata(final Set segments) throws IOExce
@Override
public Void inTransaction(Handle handle, TransactionStatus transactionStatus) throws Exception
{
- for(final DataSegment segment : segments) {
+ for (final DataSegment segment : segments) {
updatePayload(handle, segment);
}
@@ -255,7 +518,7 @@ public void deleteSegments(final Set segments) throws IOException
@Override
public Void inTransaction(Handle handle, TransactionStatus transactionStatus) throws IOException
{
- for(final DataSegment segment : segments) {
+ for (final DataSegment segment : segments) {
deleteSegment(handle, segment);
}
diff --git a/server/src/main/java/io/druid/metadata/SQLMetadataConnector.java b/server/src/main/java/io/druid/metadata/SQLMetadataConnector.java
index 70e5ae7d1feb..5b720cbfdbf7 100644
--- a/server/src/main/java/io/druid/metadata/SQLMetadataConnector.java
+++ b/server/src/main/java/io/druid/metadata/SQLMetadataConnector.java
@@ -28,10 +28,10 @@
import org.skife.jdbi.v2.Batch;
import org.skife.jdbi.v2.DBI;
import org.skife.jdbi.v2.Handle;
-import org.skife.jdbi.v2.IDBI;
import org.skife.jdbi.v2.TransactionCallback;
import org.skife.jdbi.v2.TransactionStatus;
import org.skife.jdbi.v2.exceptions.DBIException;
+import org.skife.jdbi.v2.exceptions.UnableToExecuteStatementException;
import org.skife.jdbi.v2.exceptions.UnableToObtainConnectionException;
import org.skife.jdbi.v2.tweak.HandleCallback;
import org.skife.jdbi.v2.util.ByteArrayMapper;
@@ -141,6 +141,7 @@ public final boolean isTransientException(Throwable e)
return e != null && (e instanceof SQLTransientException
|| e instanceof SQLRecoverableException
|| e instanceof UnableToObtainConnectionException
+ || e instanceof UnableToExecuteStatementException
|| connectorIsTransientException(e)
|| (e instanceof SQLException && isTransientException(e.getCause()))
|| (e instanceof DBIException && isTransientException(e.getCause())));
@@ -180,6 +181,30 @@ public Void withHandle(Handle handle) throws Exception
}
}
+ /**
+ * Creates the pending segments table under {@code tableName} by handing the DDL below to {@code createTable}.
+ * The payload column uses the connector-specific type returned by {@code getPayloadType()}.
+ *
+ * NOTE(review): the UNIQUE constraint covers (sequence_name, sequence_prev_id) only, while the lookup in
+ * IndexerSQLMetadataStorageCoordinator#allocatePendingSegment also filters on dataSource. It looks like two
+ * dataSources cannot reuse the same sequence name / previous id pair -- confirm this is intended.
+ *
+ * @param tableName name of the table to create
+ */
+ public void createPendingSegmentsTable(final String tableName)
+ {
+ createTable(
+ tableName,
+ ImmutableList.of(
+ String.format(
+ "CREATE TABLE %1$s (\n"
+ + " id VARCHAR(255) NOT NULL,\n"
+ + " dataSource VARCHAR(255) NOT NULL,\n"
+ + " created_date VARCHAR(255) NOT NULL,\n"
+ + " start VARCHAR(255) NOT NULL,\n"
+ + " \"end\" VARCHAR(255) NOT NULL,\n"
+ + " sequence_name VARCHAR(255) NOT NULL,\n"
+ + " sequence_prev_id VARCHAR(255) NOT NULL,\n"
+ + " payload %2$s NOT NULL,\n"
+ + " PRIMARY KEY (id),\n"
+ + " UNIQUE (sequence_name, sequence_prev_id)\n"
+ + ")",
+ tableName, getPayloadType()
+ )
+ )
+ );
+ }
+
public void createSegmentTable(final String tableName)
{
createTable(
@@ -355,7 +380,16 @@ public Void inTransaction(Handle handle, TransactionStatus transactionStatus) th
public abstract DBI getDBI();
@Override
- public void createSegmentTable() {
+ public void createPendingSegmentsTable()
+ {
+ // Table creation is opt-in: nothing happens unless the connector config enables createTables.
+ if (config.get().isCreateTables()) {
+ createPendingSegmentsTable(tablesConfigSupplier.get().getPendingSegmentsTable());
+ }
+ }
+
+ @Override
+ public void createSegmentTable()
+ {
if (config.get().isCreateTables()) {
createSegmentTable(tablesConfigSupplier.get().getSegmentsTable());
}
diff --git a/server/src/main/java/io/druid/segment/realtime/appenderator/SegmentIdentifier.java b/server/src/main/java/io/druid/segment/realtime/appenderator/SegmentIdentifier.java
new file mode 100644
index 000000000000..a85c29eba1b2
--- /dev/null
+++ b/server/src/main/java/io/druid/segment/realtime/appenderator/SegmentIdentifier.java
@@ -0,0 +1,123 @@
+/*
+ * Licensed to Metamarkets Group Inc. (Metamarkets) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. Metamarkets licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package io.druid.segment.realtime.appenderator;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.google.common.base.Preconditions;
+import io.druid.timeline.DataSegment;
+import io.druid.timeline.partition.ShardSpec;
+import org.joda.time.Interval;
+
+import java.util.Objects;
+
+/**
+ * Identifies a segment by dataSource, interval, version and shardSpec. Instances are JSON-serializable through the
+ * annotated constructor and getters.
+ *
+ * A string form is computed once at construction with {@link DataSegment#makeDataSegmentIdentifier} and is the sole
+ * basis of {@link #equals}, {@link #hashCode} and {@link #toString}.
+ *
+ * NOTE(review): because equality only compares that string, shard spec details that are not encoded in it are
+ * presumably ignored by equals -- verify against makeDataSegmentIdentifier.
+ */
+public class SegmentIdentifier
+{
+ private final String dataSource;
+ private final Interval interval;
+ private final String version;
+ private final ShardSpec shardSpec;
+ // Cached identifier string; derived from the four fields above in the constructor.
+ private final String asString;
+
+ /**
+ * @throws NullPointerException if any argument is null
+ */
+ @JsonCreator
+ public SegmentIdentifier(
+ @JsonProperty("dataSource") String dataSource,
+ @JsonProperty("interval") Interval interval,
+ @JsonProperty("version") String version,
+ @JsonProperty("shardSpec") ShardSpec shardSpec
+ )
+ {
+ this.dataSource = Preconditions.checkNotNull(dataSource, "dataSource");
+ this.interval = Preconditions.checkNotNull(interval, "interval");
+ this.version = Preconditions.checkNotNull(version, "version");
+ this.shardSpec = Preconditions.checkNotNull(shardSpec, "shardSpec");
+ this.asString = DataSegment.makeDataSegmentIdentifier(
+ dataSource,
+ interval.getStart(),
+ interval.getEnd(),
+ version,
+ shardSpec
+ );
+ }
+
+ @JsonProperty
+ public String getDataSource()
+ {
+ return dataSource;
+ }
+
+ @JsonProperty
+ public Interval getInterval()
+ {
+ return interval;
+ }
+
+ @JsonProperty
+ public String getVersion()
+ {
+ return version;
+ }
+
+ @JsonProperty
+ public ShardSpec getShardSpec()
+ {
+ return shardSpec;
+ }
+
+ /**
+ * Returns the identifier string computed in the constructor; the same value is returned by {@link #toString()}.
+ */
+ public String getIdentifierAsString()
+ {
+ return asString;
+ }
+
+ /**
+ * Two identifiers are equal when they are of the same class and their identifier strings are equal.
+ */
+ @Override
+ public boolean equals(Object o)
+ {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ SegmentIdentifier that = (SegmentIdentifier) o;
+ return Objects.equals(asString, that.asString);
+ }
+
+ @Override
+ public int hashCode()
+ {
+ return asString.hashCode();
+ }
+
+ @Override
+ public String toString()
+ {
+ return asString;
+ }
+
+ /**
+ * Builds an identifier from the dataSource, interval, version and shardSpec of {@code segment}.
+ */
+ public static SegmentIdentifier fromDataSegment(final DataSegment segment)
+ {
+ return new SegmentIdentifier(
+ segment.getDataSource(),
+ segment.getInterval(),
+ segment.getVersion(),
+ segment.getShardSpec()
+ );
+ }
+}
diff --git a/server/src/test/java/io/druid/segment/realtime/appenderator/SegmentIdentifierTest.java b/server/src/test/java/io/druid/segment/realtime/appenderator/SegmentIdentifierTest.java
new file mode 100644
index 000000000000..79571ef6f7e2
--- /dev/null
+++ b/server/src/test/java/io/druid/segment/realtime/appenderator/SegmentIdentifierTest.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to Metamarkets Group Inc. (Metamarkets) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. Metamarkets licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package io.druid.segment.realtime.appenderator;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import io.druid.jackson.DefaultObjectMapper;
+import io.druid.timeline.partition.NumberedShardSpec;
+import org.joda.time.Interval;
+import org.junit.Assert;
+import org.junit.Test;
+
+/**
+ * Tests JSON round-tripping and the string form of {@link SegmentIdentifier}, using two identifiers that differ
+ * only in their {@link NumberedShardSpec} partition number (0 and 1 out of 2).
+ */
+public class SegmentIdentifierTest
+{
+ private static final String DATA_SOURCE = "foo";
+ private static final Interval INTERVAL = new Interval("2000/PT1H");
+ private static final String VERSION = "v1";
+ private static final NumberedShardSpec SHARD_SPEC_0 = new NumberedShardSpec(0, 2);
+ private static final NumberedShardSpec SHARD_SPEC_1 = new NumberedShardSpec(1, 2);
+ private static final SegmentIdentifier ID_0 = new SegmentIdentifier(DATA_SOURCE, INTERVAL, VERSION, SHARD_SPEC_0);
+ private static final SegmentIdentifier ID_1 = new SegmentIdentifier(DATA_SOURCE, INTERVAL, VERSION, SHARD_SPEC_1);
+
+ /**
+ * Serializes ID_1 to bytes and back, then checks equality and every individual field of the result.
+ */
+ @Test
+ public void testSerde() throws Exception
+ {
+ final ObjectMapper objectMapper = new DefaultObjectMapper();
+ // Presumably required so the mapper can resolve the concrete ShardSpec type when reading the identifier back.
+ objectMapper.registerSubtypes(NumberedShardSpec.class);
+
+ final SegmentIdentifier id2 = objectMapper.readValue(
+ objectMapper.writeValueAsBytes(ID_1),
+ SegmentIdentifier.class
+ );
+
+ Assert.assertEquals(ID_1, id2);
+ Assert.assertEquals(DATA_SOURCE, id2.getDataSource());
+ Assert.assertEquals(INTERVAL, id2.getInterval());
+ Assert.assertEquals(VERSION, id2.getVersion());
+ Assert.assertEquals(SHARD_SPEC_1.getPartitionNum(), id2.getShardSpec().getPartitionNum());
+ Assert.assertEquals(SHARD_SPEC_1.getPartitions(), ((NumberedShardSpec) id2.getShardSpec()).getPartitions());
+ }
+
+ /**
+ * Pins the identifier string format: partition 0 has no partition suffix, partition 1 ends in "_1".
+ */
+ @Test
+ public void testAsString()
+ {
+ Assert.assertEquals("foo_2000-01-01T00:00:00.000Z_2000-01-01T01:00:00.000Z_v1", ID_0.getIdentifierAsString());
+ Assert.assertEquals("foo_2000-01-01T00:00:00.000Z_2000-01-01T01:00:00.000Z_v1_1", ID_1.getIdentifierAsString());
+ }
+}
diff --git a/services/src/main/java/io/druid/cli/CreateTables.java b/services/src/main/java/io/druid/cli/CreateTables.java
index 8ea58ca7ebca..fdcd64778d16 100644
--- a/services/src/main/java/io/druid/cli/CreateTables.java
+++ b/services/src/main/java/io/druid/cli/CreateTables.java
@@ -25,11 +25,11 @@
import com.metamx.common.logger.Logger;
import io.airlift.airline.Command;
import io.airlift.airline.Option;
+import io.druid.guice.JsonConfigProvider;
+import io.druid.guice.annotations.Self;
import io.druid.metadata.MetadataStorageConnector;
import io.druid.metadata.MetadataStorageConnectorConfig;
import io.druid.metadata.MetadataStorageTablesConfig;
-import io.druid.guice.JsonConfigProvider;
-import io.druid.guice.annotations.Self;
import io.druid.server.DruidNode;
import java.util.List;
@@ -106,6 +106,7 @@ public void run()
{
final Injector injector = makeInjector();
MetadataStorageConnector dbConnector = injector.getInstance(MetadataStorageConnector.class);
+ dbConnector.createPendingSegmentsTable();
dbConnector.createSegmentTable();
dbConnector.createRulesTable();
dbConnector.createConfigTable();