From 8e743b70c62c353a877dbb3f0766d2fc6b426d01 Mon Sep 17 00:00:00 2001 From: Gian Merlino Date: Fri, 30 Oct 2015 13:30:16 -0700 Subject: [PATCH 1/2] SegmentIdentifier, like a pre-DataSegment. --- .../appenderator/SegmentIdentifier.java | 119 ++++++++++++++++++ 1 file changed, 119 insertions(+) create mode 100644 server/src/main/java/io/druid/segment/realtime/appenderator/SegmentIdentifier.java diff --git a/server/src/main/java/io/druid/segment/realtime/appenderator/SegmentIdentifier.java b/server/src/main/java/io/druid/segment/realtime/appenderator/SegmentIdentifier.java new file mode 100644 index 000000000000..53d801debb48 --- /dev/null +++ b/server/src/main/java/io/druid/segment/realtime/appenderator/SegmentIdentifier.java @@ -0,0 +1,119 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package io.druid.segment.realtime.appenderator; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.google.common.base.Preconditions; +import io.druid.timeline.DataSegment; +import io.druid.timeline.partition.ShardSpec; +import org.joda.time.Interval; + +public class SegmentIdentifier +{ + private final String dataSource; + private final Interval interval; + private final String version; + private final ShardSpec shardSpec; + + @JsonCreator + public SegmentIdentifier( + @JsonProperty("dataSource") String dataSource, + @JsonProperty("interval") Interval interval, + @JsonProperty("version") String version, + @JsonProperty("shardSpec") ShardSpec shardSpec + ) + { + this.dataSource = Preconditions.checkNotNull(dataSource, "dataSource"); + this.interval = Preconditions.checkNotNull(interval, "interval"); + this.version = Preconditions.checkNotNull(version, "version"); + this.shardSpec = Preconditions.checkNotNull(shardSpec, "shardSpec"); + } + + @JsonProperty + public String getDataSource() + { + return dataSource; + } + + @JsonProperty + public Interval getInterval() + { + return interval; + } + + @JsonProperty + public String getVersion() + { + return version; + } + + @JsonProperty + public ShardSpec getShardSpec() + { + return shardSpec; + } + + public String getIdentifierAsString() + { + return DataSegment.makeDataSegmentIdentifier( + dataSource, + interval.getStart(), + interval.getEnd(), + version, + shardSpec + ); + } + + @Override + public boolean equals(Object o) + { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + SegmentIdentifier that = (SegmentIdentifier) o; + return getIdentifierAsString().equals(that.getIdentifierAsString()); + } + + @Override + public int hashCode() + { + return getIdentifierAsString().hashCode(); + } + + @Override + public String toString() + { + return getIdentifierAsString(); + } + + public static 
SegmentIdentifier fromDataSegment(final DataSegment segment) + { + return new SegmentIdentifier( + segment.getDataSource(), + segment.getInterval(), + segment.getVersion(), + segment.getShardSpec() + ); + } +} From e4e5f0375b538ac7e2a2f7c36d00bd861e584c44 Mon Sep 17 00:00:00 2001 From: Gian Merlino Date: Fri, 30 Oct 2015 14:39:16 -0700 Subject: [PATCH 2/2] SegmentAllocateAction (fixes #1515) This is a feature meant to allow realtime tasks to work without being told upfront what shardSpec they should use (so we can potentially publish a variable number of segments per interval). The idea is that there is a "pendingSegments" table in the metadata store that tracks allocated segments. Each one has a segment id (the same segment id we know and love) and is also part of a sequence. The sequences are an idea from @cheddar that offers a way of doing replication. If there are N tasks reading exactly the same data with exactly the same logic (think Kafka tasks reading a fixed range of offsets) then you can place them in the same sequence, and they will generate the same sequence of segments. 
--- .../metadata/MetadataStorageConnector.java | 16 +- .../metadata/MetadataStorageTablesConfig.java | 12 +- .../postgresql/PostgreSQLConnectorTest.java | 2 +- .../MetadataStorageUpdaterJobSpec.java | 1 + .../common/actions/SegmentAllocateAction.java | 284 +++++++ .../indexing/common/actions/TaskAction.java | 3 +- .../common/actions/TaskActionToolbox.java | 18 - .../actions/SegmentAllocateActionTest.java | 738 ++++++++++++++++++ .../common/actions/TaskActionTestKit.java | 114 +++ .../indexing/overlord/TaskLockboxTest.java | 44 +- .../guice/SQLMetadataStorageDruidModule.java | 2 +- .../IndexerMetadataStorageCoordinator.java | 26 + .../IndexerSQLMetadataStorageCoordinator.java | 357 +++++++-- .../druid/metadata/SQLMetadataConnector.java | 38 +- .../appenderator/SegmentIdentifier.java | 24 +- .../appenderator/SegmentIdentifierTest.java | 64 ++ .../main/java/io/druid/cli/CreateTables.java | 5 +- 17 files changed, 1650 insertions(+), 98 deletions(-) create mode 100644 indexing-service/src/main/java/io/druid/indexing/common/actions/SegmentAllocateAction.java create mode 100644 indexing-service/src/test/java/io/druid/indexing/common/actions/SegmentAllocateActionTest.java create mode 100644 indexing-service/src/test/java/io/druid/indexing/common/actions/TaskActionTestKit.java create mode 100644 server/src/test/java/io/druid/segment/realtime/appenderator/SegmentIdentifierTest.java diff --git a/common/src/main/java/io/druid/metadata/MetadataStorageConnector.java b/common/src/main/java/io/druid/metadata/MetadataStorageConnector.java index c995c2ce8da1..e53553c1e9d6 100644 --- a/common/src/main/java/io/druid/metadata/MetadataStorageConnector.java +++ b/common/src/main/java/io/druid/metadata/MetadataStorageConnector.java @@ -21,7 +21,7 @@ */ public interface MetadataStorageConnector { - public Void insertOrUpdate( + Void insertOrUpdate( final String tableName, final String keyColumn, final String valueColumn, @@ -30,20 +30,22 @@ public Void insertOrUpdate( ) throws Exception; - 
public byte[] lookup( + byte[] lookup( final String tableName, final String keyColumn, final String valueColumn, final String key ); - public void createSegmentTable(); + void createPendingSegmentsTable(); - public void createRulesTable(); + void createSegmentTable(); - public void createConfigTable(); + void createRulesTable(); - public void createTaskTables(); + void createConfigTable(); - public void createAuditTable(); + void createTaskTables(); + + void createAuditTable(); } diff --git a/common/src/main/java/io/druid/metadata/MetadataStorageTablesConfig.java b/common/src/main/java/io/druid/metadata/MetadataStorageTablesConfig.java index abf32b1b5b02..383a5847fed6 100644 --- a/common/src/main/java/io/druid/metadata/MetadataStorageTablesConfig.java +++ b/common/src/main/java/io/druid/metadata/MetadataStorageTablesConfig.java @@ -29,7 +29,7 @@ public class MetadataStorageTablesConfig { public static MetadataStorageTablesConfig fromBase(String base) { - return new MetadataStorageTablesConfig(base, null, null, null, null, null, null, null); + return new MetadataStorageTablesConfig(base, null, null, null, null, null, null, null, null); } public static final String TASK_ENTRY_TYPE = "task"; @@ -43,6 +43,9 @@ public static MetadataStorageTablesConfig fromBase(String base) @JsonProperty("base") private final String base; + @JsonProperty("pendingSegments") + private final String pendingSegmentsTable; + @JsonProperty("segments") private final String segmentsTable; @@ -67,6 +70,7 @@ public static MetadataStorageTablesConfig fromBase(String base) @JsonCreator public MetadataStorageTablesConfig( @JsonProperty("base") String base, + @JsonProperty("pendingSegments") String pendingSegmentsTable, @JsonProperty("segments") String segmentsTable, @JsonProperty("rules") String rulesTable, @JsonProperty("config") String configTable, @@ -77,6 +81,7 @@ public MetadataStorageTablesConfig( ) { this.base = (base == null) ? 
DEFAULT_BASE : base; + this.pendingSegmentsTable = makeTableName(pendingSegmentsTable, "pendingSegments"); this.segmentsTable = makeTableName(segmentsTable, "segments"); this.rulesTable = makeTableName(rulesTable, "rules"); this.configTable = makeTableName(configTable, "config"); @@ -108,6 +113,11 @@ public String getBase() return base; } + public String getPendingSegmentsTable() + { + return pendingSegmentsTable; + } + public String getSegmentsTable() { return segmentsTable; diff --git a/extensions/postgresql-metadata-storage/src/test/java/io/druid/metadata/storage/postgresql/PostgreSQLConnectorTest.java b/extensions/postgresql-metadata-storage/src/test/java/io/druid/metadata/storage/postgresql/PostgreSQLConnectorTest.java index 43f1e34e9f61..be854c90febc 100644 --- a/extensions/postgresql-metadata-storage/src/test/java/io/druid/metadata/storage/postgresql/PostgreSQLConnectorTest.java +++ b/extensions/postgresql-metadata-storage/src/test/java/io/druid/metadata/storage/postgresql/PostgreSQLConnectorTest.java @@ -35,7 +35,7 @@ public void testIsTransientException() throws Exception { PostgreSQLConnector connector = new PostgreSQLConnector( Suppliers.ofInstance(new MetadataStorageConnectorConfig()), - Suppliers.ofInstance(new MetadataStorageTablesConfig(null, null, null, null, null, null, null, null)) + Suppliers.ofInstance(new MetadataStorageTablesConfig(null, null, null, null, null, null, null, null, null)) ); Assert.assertTrue(connector.isTransientException(new SQLException("bummer, connection problem", "08DIE"))); diff --git a/indexing-hadoop/src/main/java/io/druid/indexer/updater/MetadataStorageUpdaterJobSpec.java b/indexing-hadoop/src/main/java/io/druid/indexer/updater/MetadataStorageUpdaterJobSpec.java index 4595c1affbda..e07c3c84c567 100644 --- a/indexing-hadoop/src/main/java/io/druid/indexer/updater/MetadataStorageUpdaterJobSpec.java +++ b/indexing-hadoop/src/main/java/io/druid/indexer/updater/MetadataStorageUpdaterJobSpec.java @@ -92,6 +92,7 @@ public 
MetadataStorageTablesConfig getMetadataStorageTablesConfig() null, null, null, + null, null ); } diff --git a/indexing-service/src/main/java/io/druid/indexing/common/actions/SegmentAllocateAction.java b/indexing-service/src/main/java/io/druid/indexing/common/actions/SegmentAllocateAction.java new file mode 100644 index 000000000000..8cb7dcc0f2df --- /dev/null +++ b/indexing-service/src/main/java/io/druid/indexing/common/actions/SegmentAllocateAction.java @@ -0,0 +1,284 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package io.druid.indexing.common.actions; + +import com.fasterxml.jackson.annotation.JsonProperty; +import com.fasterxml.jackson.core.type.TypeReference; +import com.google.api.client.repackaged.com.google.common.base.Preconditions; +import com.google.api.client.repackaged.com.google.common.base.Throwables; +import com.google.api.client.util.Lists; +import com.google.common.base.Optional; +import com.google.common.collect.ImmutableSet; +import com.google.common.primitives.Longs; +import com.metamx.common.Granularity; +import com.metamx.common.IAE; +import com.metamx.common.logger.Logger; +import io.druid.granularity.QueryGranularity; +import io.druid.indexing.common.TaskLock; +import io.druid.indexing.common.task.Task; +import io.druid.indexing.overlord.IndexerMetadataStorageCoordinator; +import io.druid.segment.realtime.appenderator.SegmentIdentifier; +import io.druid.timeline.DataSegment; +import org.joda.time.DateTime; +import org.joda.time.Interval; + +import java.io.IOException; +import java.util.Collections; +import java.util.Comparator; +import java.util.List; +import java.util.Set; + +/** + * Allocates a pending segment for a given timestamp. The preferredSegmentGranularity is used if there are no prior + * segments for the given timestamp, or if the prior segments for the given timestamp are already at the + * preferredSegmentGranularity. Otherwise, the prior segments will take precedence. + *

+ * This action implicitly acquires locks when it allocates segments. You do not have to acquire them beforehand, + * although you *do* have to release them yourself. + *

+ * If this action cannot acquire an appropriate lock, or if it cannot expand an existing segment set, it will return + * an absent Optional. + */ +public class SegmentAllocateAction implements TaskAction> +{ + private static final Logger log = new Logger(SegmentAllocateAction.class); + + // Prevent spinning forever in situations where the segment list just won't stop changing. + private static final int MAX_ATTEMPTS = 90; + + private final String dataSource; + private final DateTime timestamp; + private final QueryGranularity queryGranularity; + private final Granularity preferredSegmentGranularity; + private final String sequenceName; + private final String previousSegmentId; + + public static List granularitiesFinerThan(final Granularity gran0) + { + final DateTime epoch = new DateTime(0); + final List retVal = Lists.newArrayList(); + for (Granularity gran : Granularity.values()) { + if (gran.bucket(epoch).toDurationMillis() <= gran0.bucket(epoch).toDurationMillis()) { + retVal.add(gran); + } + } + Collections.sort( + retVal, + new Comparator() + { + @Override + public int compare(Granularity g1, Granularity g2) + { + return Longs.compare(g2.bucket(epoch).toDurationMillis(), g1.bucket(epoch).toDurationMillis()); + } + } + ); + return retVal; + } + + public SegmentAllocateAction( + @JsonProperty("dataSource") String dataSource, + @JsonProperty("timestamp") DateTime timestamp, + @JsonProperty("queryGranularity") QueryGranularity queryGranularity, + @JsonProperty("preferredSegmentGranularity") Granularity preferredSegmentGranularity, + @JsonProperty("sequenceName") String sequenceName, + @JsonProperty("previousSegmentId") String previousSegmentId + ) + { + this.dataSource = Preconditions.checkNotNull(dataSource, "dataSource"); + this.timestamp = Preconditions.checkNotNull(timestamp, "timestamp"); + this.queryGranularity = Preconditions.checkNotNull(queryGranularity, "queryGranularity"); + this.preferredSegmentGranularity = Preconditions.checkNotNull( + 
preferredSegmentGranularity, + "preferredSegmentGranularity" + ); + this.sequenceName = Preconditions.checkNotNull(sequenceName, "sequenceName"); + this.previousSegmentId = previousSegmentId; + } + + @JsonProperty + public String getDataSource() + { + return dataSource; + } + + @JsonProperty + public DateTime getTimestamp() + { + return timestamp; + } + + @JsonProperty + public QueryGranularity getQueryGranularity() + { + return queryGranularity; + } + + @JsonProperty + public Granularity getPreferredSegmentGranularity() + { + return preferredSegmentGranularity; + } + + @JsonProperty + public String getSequenceName() + { + return sequenceName; + } + + @JsonProperty + public String getPreviousSegmentId() + { + return previousSegmentId; + } + + @Override + public TypeReference> getReturnTypeReference() + { + return new TypeReference>() + { + }; + } + + @Override + public Optional perform( + final Task task, + final TaskActionToolbox toolbox + ) throws IOException + { + int attempt = 0; + while (true) { + attempt++; + + if (!task.getDataSource().equals(dataSource)) { + throw new IAE("Task dataSource must match action dataSource, [%s] != [%s].", task.getDataSource(), dataSource); + } + + final IndexerMetadataStorageCoordinator msc = toolbox.getIndexerMetadataStorageCoordinator(); + + // 1) if something overlaps our timestamp, use that + // 2) otherwise try preferredSegmentGranularity & going progressively smaller + + final List tryIntervals = Lists.newArrayList(); + + final Interval rowInterval = new Interval( + queryGranularity.truncate(timestamp.getMillis()), + queryGranularity.next(queryGranularity.truncate(timestamp.getMillis())) + ); + + final Set usedSegmentsForRow = ImmutableSet.copyOf( + msc.getUsedSegmentsForInterval(dataSource, rowInterval) + ); + + if (usedSegmentsForRow.isEmpty()) { + // No existing segments for this row, but there might still be nearby ones that conflict with our preferred + // segment granularity. 
Try that first, and then progressively smaller ones if it fails. + for (Granularity gran : granularitiesFinerThan(preferredSegmentGranularity)) { + tryIntervals.add(gran.bucket(timestamp)); + } + } else { + // Segment(s) already exist for this row; use the interval of the first one. + tryIntervals.add(usedSegmentsForRow.iterator().next().getInterval()); + } + + for (final Interval tryInterval : tryIntervals) { + if (tryInterval.contains(rowInterval)) { + log.debug( + "Trying to allocate pending segment for rowInterval[%s], segmentInterval[%s].", + rowInterval, + tryInterval + ); + final TaskLock tryLock = toolbox.getTaskLockbox().tryLock(task, tryInterval).orNull(); + if (tryLock != null) { + final SegmentIdentifier identifier = msc.allocatePendingSegment( + dataSource, + sequenceName, + previousSegmentId, + tryInterval, + tryLock.getVersion() + ); + if (identifier != null) { + return Optional.of(identifier); + } else { + log.debug( + "Could not allocate pending segment for rowInterval[%s], segmentInterval[%s].", + rowInterval, + tryInterval + ); + } + } else { + log.debug("Could not acquire lock for rowInterval[%s], segmentInterval[%s].", rowInterval, tryInterval); + } + } + } + + // Could not allocate a pending segment. There's a chance that this is because someone else inserted a segment + // overlapping with this row between when we called "msc.getUsedSegmentsForInterval" and now. Check it again, + // and if it's different, repeat. + + if (!ImmutableSet.copyOf(msc.getUsedSegmentsForInterval(dataSource, rowInterval)).equals(usedSegmentsForRow)) { + if (attempt < MAX_ATTEMPTS) { + final long shortRandomSleep = 50 + (long) (Math.random() * 450); + log.debug( + "Used segment set changed for rowInterval[%s]. 
Retrying segment allocation in %,dms (attempt = %,d).", + rowInterval, + shortRandomSleep, + attempt + ); + try { + Thread.sleep(shortRandomSleep); + } + catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw Throwables.propagate(e); + } + } else { + log.error( + "Used segment set changed for rowInterval[%s]. Not trying again (attempt = %,d).", + rowInterval, + attempt + ); + return Optional.absent(); + } + } else { + return Optional.absent(); + } + } + } + + @Override + public boolean isAudited() + { + return false; + } + + @Override + public String toString() + { + return "SegmentAllocateAction{" + + "dataSource='" + dataSource + '\'' + + ", timestamp=" + timestamp + + ", queryGranularity=" + queryGranularity + + ", preferredSegmentGranularity=" + preferredSegmentGranularity + + ", sequenceName='" + sequenceName + '\'' + + ", previousSegmentId='" + previousSegmentId + '\'' + + '}'; + } +} diff --git a/indexing-service/src/main/java/io/druid/indexing/common/actions/TaskAction.java b/indexing-service/src/main/java/io/druid/indexing/common/actions/TaskAction.java index 5c751e8e0917..1c0cb386f91e 100644 --- a/indexing-service/src/main/java/io/druid/indexing/common/actions/TaskAction.java +++ b/indexing-service/src/main/java/io/druid/indexing/common/actions/TaskAction.java @@ -34,7 +34,8 @@ @JsonSubTypes.Type(name = "segmentListUsed", value = SegmentListUsedAction.class), @JsonSubTypes.Type(name = "segmentListUnused", value = SegmentListUnusedAction.class), @JsonSubTypes.Type(name = "segmentNuke", value = SegmentNukeAction.class), - @JsonSubTypes.Type(name = "segmentMetadataUpdate", value = SegmentMetadataUpdateAction.class) + @JsonSubTypes.Type(name = "segmentMetadataUpdate", value = SegmentMetadataUpdateAction.class), + @JsonSubTypes.Type(name = "segmentAllocate", value = SegmentAllocateAction.class) }) public interface TaskAction { diff --git a/indexing-service/src/main/java/io/druid/indexing/common/actions/TaskActionToolbox.java 
b/indexing-service/src/main/java/io/druid/indexing/common/actions/TaskActionToolbox.java index ae0ac73c6e77..e85dffdc8994 100644 --- a/indexing-service/src/main/java/io/druid/indexing/common/actions/TaskActionToolbox.java +++ b/indexing-service/src/main/java/io/druid/indexing/common/actions/TaskActionToolbox.java @@ -65,24 +65,6 @@ public ServiceEmitter getEmitter() return emitter; } - public boolean segmentsAreFromSamePartitionSet( - final Set segments - ) - { - // Verify that these segments are all in the same partition set - - Preconditions.checkArgument(!segments.isEmpty(), "segments nonempty"); - final DataSegment firstSegment = segments.iterator().next(); - for (final DataSegment segment : segments) { - if (!segment.getDataSource().equals(firstSegment.getDataSource()) - || !segment.getInterval().equals(firstSegment.getInterval()) - || !segment.getVersion().equals(firstSegment.getVersion())) { - return false; - } - } - return true; - } - public void verifyTaskLocks( final Task task, final Set segments diff --git a/indexing-service/src/test/java/io/druid/indexing/common/actions/SegmentAllocateActionTest.java b/indexing-service/src/test/java/io/druid/indexing/common/actions/SegmentAllocateActionTest.java new file mode 100644 index 000000000000..89c4b3cd3ce4 --- /dev/null +++ b/indexing-service/src/test/java/io/druid/indexing/common/actions/SegmentAllocateActionTest.java @@ -0,0 +1,738 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package io.druid.indexing.common.actions; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.base.Predicate; +import com.google.common.collect.FluentIterable; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableSet; +import com.google.common.collect.Iterables; +import com.metamx.common.Granularity; +import com.metamx.common.ISE; +import io.druid.granularity.DurationGranularity; +import io.druid.granularity.QueryGranularity; +import io.druid.indexing.common.TaskLock; +import io.druid.indexing.common.task.NoopTask; +import io.druid.indexing.common.task.Task; +import io.druid.jackson.DefaultObjectMapper; +import io.druid.segment.realtime.appenderator.SegmentIdentifier; +import io.druid.timeline.DataSegment; +import io.druid.timeline.partition.LinearShardSpec; +import io.druid.timeline.partition.NumberedShardSpec; +import io.druid.timeline.partition.SingleDimensionShardSpec; +import org.joda.time.DateTime; +import org.junit.Assert; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.ExpectedException; + +public class SegmentAllocateActionTest +{ + @Rule + public ExpectedException thrown = ExpectedException.none(); + + @Rule + public TaskActionTestKit taskActionTestKit = new TaskActionTestKit(); + + private static final String DATA_SOURCE = "none"; + private static final DateTime PARTY_TIME = new DateTime("1999"); + private static final DateTime THE_DISTANT_FUTURE = new DateTime("3000"); + + @Test + public void testGranularitiesFinerThanDay() throws 
Exception + { + Assert.assertEquals( + ImmutableList.of( + Granularity.DAY, + Granularity.SIX_HOUR, + Granularity.HOUR, + Granularity.FIFTEEN_MINUTE, + Granularity.TEN_MINUTE, + Granularity.FIVE_MINUTE, + Granularity.MINUTE, + Granularity.SECOND + ), + SegmentAllocateAction.granularitiesFinerThan(Granularity.DAY) + ); + } + + @Test + public void testGranularitiesFinerThanHour() throws Exception + { + Assert.assertEquals( + ImmutableList.of( + Granularity.HOUR, + Granularity.FIFTEEN_MINUTE, + Granularity.TEN_MINUTE, + Granularity.FIVE_MINUTE, + Granularity.MINUTE, + Granularity.SECOND + ), + SegmentAllocateAction.granularitiesFinerThan(Granularity.HOUR) + ); + } + + @Test + public void testManySegmentsSameInterval() throws Exception + { + final Task task = new NoopTask(null, 0, 0, null, null, null); + + taskActionTestKit.getTaskLockbox().add(task); + + final SegmentIdentifier id1 = allocate( + task, + PARTY_TIME, + QueryGranularity.NONE, + Granularity.HOUR, + "s1", + null + ); + final SegmentIdentifier id2 = allocate( + task, + PARTY_TIME, + QueryGranularity.NONE, + Granularity.HOUR, + "s1", + id1.getIdentifierAsString() + ); + final SegmentIdentifier id3 = allocate( + task, + PARTY_TIME, + QueryGranularity.NONE, + Granularity.HOUR, + "s1", + id2.getIdentifierAsString() + ); + + final TaskLock partyLock = Iterables.getOnlyElement( + FluentIterable.from(taskActionTestKit.getTaskLockbox().findLocksForTask(task)) + .filter( + new Predicate() + { + @Override + public boolean apply(TaskLock input) + { + return input.getInterval().contains(PARTY_TIME); + } + } + ) + ); + + assertSameIdentifier( + id1, + new SegmentIdentifier( + DATA_SOURCE, + Granularity.HOUR.bucket(PARTY_TIME), + partyLock.getVersion(), + new NumberedShardSpec(0, 0) + ) + ); + assertSameIdentifier( + id2, + new SegmentIdentifier( + DATA_SOURCE, + Granularity.HOUR.bucket(PARTY_TIME), + partyLock.getVersion(), + new NumberedShardSpec(1, 0) + ) + ); + assertSameIdentifier( + id3, + new SegmentIdentifier( + 
DATA_SOURCE, + Granularity.HOUR.bucket(PARTY_TIME), + partyLock.getVersion(), + new NumberedShardSpec(2, 0) + ) + ); + } + + @Test + public void testResumeSequence() throws Exception + { + final Task task = new NoopTask(null, 0, 0, null, null, null); + + taskActionTestKit.getTaskLockbox().add(task); + + final SegmentIdentifier id1 = allocate( + task, + PARTY_TIME, + QueryGranularity.NONE, + Granularity.HOUR, + "s1", + null + ); + final SegmentIdentifier id2 = allocate( + task, + THE_DISTANT_FUTURE, + QueryGranularity.NONE, + Granularity.HOUR, + "s1", + id1.getIdentifierAsString() + ); + final SegmentIdentifier id3 = allocate( + task, + PARTY_TIME, + QueryGranularity.NONE, + Granularity.HOUR, + "s1", + id2.getIdentifierAsString() + ); + final SegmentIdentifier id4 = allocate( + task, + PARTY_TIME, + QueryGranularity.NONE, + Granularity.HOUR, + "s1", + id1.getIdentifierAsString() + ); + final SegmentIdentifier id5 = allocate( + task, + THE_DISTANT_FUTURE, + QueryGranularity.NONE, + Granularity.HOUR, + "s1", + id1.getIdentifierAsString() + ); + final SegmentIdentifier id6 = allocate( + task, + THE_DISTANT_FUTURE, + QueryGranularity.NONE, + Granularity.MINUTE, + "s1", + id1.getIdentifierAsString() + ); + final SegmentIdentifier id7 = allocate( + task, + THE_DISTANT_FUTURE, + QueryGranularity.NONE, + Granularity.DAY, + "s1", + id1.getIdentifierAsString() + ); + + final TaskLock partyLock = Iterables.getOnlyElement( + FluentIterable.from(taskActionTestKit.getTaskLockbox().findLocksForTask(task)) + .filter( + new Predicate() + { + @Override + public boolean apply(TaskLock input) + { + return input.getInterval().contains(PARTY_TIME); + } + } + ) + ); + final TaskLock futureLock = Iterables.getOnlyElement( + FluentIterable.from(taskActionTestKit.getTaskLockbox().findLocksForTask(task)) + .filter( + new Predicate() + { + @Override + public boolean apply(TaskLock input) + { + return input.getInterval().contains(THE_DISTANT_FUTURE); + } + } + ) + ); + + assertSameIdentifier( + 
id1, + new SegmentIdentifier( + DATA_SOURCE, + Granularity.HOUR.bucket(PARTY_TIME), + partyLock.getVersion(), + new NumberedShardSpec(0, 0) + ) + ); + assertSameIdentifier( + id2, + new SegmentIdentifier( + DATA_SOURCE, + Granularity.HOUR.bucket(THE_DISTANT_FUTURE), + futureLock.getVersion(), + new NumberedShardSpec(0, 0) + ) + ); + assertSameIdentifier( + id3, + new SegmentIdentifier( + DATA_SOURCE, + Granularity.HOUR.bucket(PARTY_TIME), + partyLock.getVersion(), + new NumberedShardSpec(1, 0) + ) + ); + Assert.assertNull(id4); + assertSameIdentifier(id5, id2); + Assert.assertNull(id6); + assertSameIdentifier(id7, id2); + } + + @Test + public void testMultipleSequences() throws Exception + { + final Task task = new NoopTask(null, 0, 0, null, null, null); + + taskActionTestKit.getTaskLockbox().add(task); + + final SegmentIdentifier id1 = allocate(task, PARTY_TIME, QueryGranularity.NONE, Granularity.HOUR, "s1", null); + final SegmentIdentifier id2 = allocate(task, PARTY_TIME, QueryGranularity.NONE, Granularity.HOUR, "s2", null); + final SegmentIdentifier id3 = allocate( + task, + PARTY_TIME, + QueryGranularity.NONE, + Granularity.HOUR, + "s1", + id1.getIdentifierAsString() + ); + final SegmentIdentifier id4 = allocate( + task, + THE_DISTANT_FUTURE, + QueryGranularity.NONE, + Granularity.HOUR, + "s1", + id3.getIdentifierAsString() + ); + final SegmentIdentifier id5 = allocate( + task, + THE_DISTANT_FUTURE, + QueryGranularity.NONE, + Granularity.HOUR, + "s2", + id2.getIdentifierAsString() + ); + final SegmentIdentifier id6 = allocate(task, PARTY_TIME, QueryGranularity.NONE, Granularity.HOUR, "s1", null); + + final TaskLock partyLock = Iterables.getOnlyElement( + FluentIterable.from(taskActionTestKit.getTaskLockbox().findLocksForTask(task)) + .filter( + new Predicate() + { + @Override + public boolean apply(TaskLock input) + { + return input.getInterval().contains(PARTY_TIME); + } + } + ) + ); + final TaskLock futureLock = Iterables.getOnlyElement( + 
FluentIterable.from(taskActionTestKit.getTaskLockbox().findLocksForTask(task)) + .filter( + new Predicate() + { + @Override + public boolean apply(TaskLock input) + { + return input.getInterval().contains(THE_DISTANT_FUTURE); + } + } + ) + ); + + assertSameIdentifier( + id1, + new SegmentIdentifier( + DATA_SOURCE, + Granularity.HOUR.bucket(PARTY_TIME), + partyLock.getVersion(), + new NumberedShardSpec(0, 0) + ) + ); + assertSameIdentifier( + id2, + new SegmentIdentifier( + DATA_SOURCE, + Granularity.HOUR.bucket(PARTY_TIME), + partyLock.getVersion(), + new NumberedShardSpec(1, 0) + ) + ); + assertSameIdentifier( + id3, + new SegmentIdentifier( + DATA_SOURCE, + Granularity.HOUR.bucket(PARTY_TIME), + partyLock.getVersion(), + new NumberedShardSpec(2, 0) + ) + ); + assertSameIdentifier( + id4, + new SegmentIdentifier( + DATA_SOURCE, + Granularity.HOUR.bucket(THE_DISTANT_FUTURE), + futureLock.getVersion(), + new NumberedShardSpec(0, 0) + ) + ); + assertSameIdentifier( + id5, + new SegmentIdentifier( + DATA_SOURCE, + Granularity.HOUR.bucket(THE_DISTANT_FUTURE), + futureLock.getVersion(), + new NumberedShardSpec(1, 0) + ) + ); + assertSameIdentifier( + id6, + id1 + ); + } + + @Test + public void testAddToExistingLinearShardSpecsSameGranularity() throws Exception + { + final Task task = new NoopTask(null, 0, 0, null, null, null); + + taskActionTestKit.getMetadataStorageCoordinator().announceHistoricalSegments( + ImmutableSet.of( + DataSegment.builder() + .dataSource(DATA_SOURCE) + .interval(Granularity.HOUR.bucket(PARTY_TIME)) + .version(PARTY_TIME.toString()) + .shardSpec(new LinearShardSpec(0)) + .build(), + DataSegment.builder() + .dataSource(DATA_SOURCE) + .interval(Granularity.HOUR.bucket(PARTY_TIME)) + .version(PARTY_TIME.toString()) + .shardSpec(new LinearShardSpec(1)) + .build() + ) + ); + + taskActionTestKit.getTaskLockbox().add(task); + + final SegmentIdentifier id1 = allocate( + task, + PARTY_TIME, + QueryGranularity.NONE, + Granularity.HOUR, + "s1", + null + ); 
+ final SegmentIdentifier id2 = allocate( + task, + PARTY_TIME, + QueryGranularity.NONE, + Granularity.HOUR, + "s1", + id1.getIdentifierAsString() + ); + + assertSameIdentifier( + id1, + new SegmentIdentifier( + DATA_SOURCE, + Granularity.HOUR.bucket(PARTY_TIME), + PARTY_TIME.toString(), + new LinearShardSpec(2) + ) + ); + assertSameIdentifier( + id2, + new SegmentIdentifier( + DATA_SOURCE, + Granularity.HOUR.bucket(PARTY_TIME), + PARTY_TIME.toString(), + new LinearShardSpec(3) + ) + ); + } + + @Test + public void testAddToExistingNumberedShardSpecsSameGranularity() throws Exception + { + final Task task = new NoopTask(null, 0, 0, null, null, null); + + taskActionTestKit.getMetadataStorageCoordinator().announceHistoricalSegments( + ImmutableSet.of( + DataSegment.builder() + .dataSource(DATA_SOURCE) + .interval(Granularity.HOUR.bucket(PARTY_TIME)) + .version(PARTY_TIME.toString()) + .shardSpec(new NumberedShardSpec(0, 2)) + .build(), + DataSegment.builder() + .dataSource(DATA_SOURCE) + .interval(Granularity.HOUR.bucket(PARTY_TIME)) + .version(PARTY_TIME.toString()) + .shardSpec(new NumberedShardSpec(1, 2)) + .build() + ) + ); + + taskActionTestKit.getTaskLockbox().add(task); + + final SegmentIdentifier id1 = allocate( + task, + PARTY_TIME, + QueryGranularity.NONE, + Granularity.HOUR, + "s1", + null + ); + final SegmentIdentifier id2 = allocate( + task, + PARTY_TIME, + QueryGranularity.NONE, + Granularity.HOUR, + "s1", + id1.getIdentifierAsString() + ); + + assertSameIdentifier( + id1, + new SegmentIdentifier( + DATA_SOURCE, + Granularity.HOUR.bucket(PARTY_TIME), + PARTY_TIME.toString(), + new NumberedShardSpec(2, 2) + ) + ); + assertSameIdentifier( + id2, + new SegmentIdentifier( + DATA_SOURCE, + Granularity.HOUR.bucket(PARTY_TIME), + PARTY_TIME.toString(), + new NumberedShardSpec(3, 2) + ) + ); + } + + @Test + public void testAddToExistingNumberedShardSpecsCoarserPreferredGranularity() throws Exception + { + final Task task = new NoopTask(null, 0, 0, null, null, 
null); + + taskActionTestKit.getMetadataStorageCoordinator().announceHistoricalSegments( + ImmutableSet.of( + DataSegment.builder() + .dataSource(DATA_SOURCE) + .interval(Granularity.HOUR.bucket(PARTY_TIME)) + .version(PARTY_TIME.toString()) + .shardSpec(new NumberedShardSpec(0, 2)) + .build(), + DataSegment.builder() + .dataSource(DATA_SOURCE) + .interval(Granularity.HOUR.bucket(PARTY_TIME)) + .version(PARTY_TIME.toString()) + .shardSpec(new NumberedShardSpec(1, 2)) + .build() + ) + ); + + taskActionTestKit.getTaskLockbox().add(task); + + final SegmentIdentifier id1 = allocate(task, PARTY_TIME, QueryGranularity.NONE, Granularity.DAY, "s1", null); + + assertSameIdentifier( + id1, + new SegmentIdentifier( + DATA_SOURCE, + Granularity.HOUR.bucket(PARTY_TIME), + PARTY_TIME.toString(), + new NumberedShardSpec(2, 2) + ) + ); + } + + @Test + public void testAddToExistingNumberedShardSpecsFinerPreferredGranularity() throws Exception + { + final Task task = new NoopTask(null, 0, 0, null, null, null); + + taskActionTestKit.getMetadataStorageCoordinator().announceHistoricalSegments( + ImmutableSet.of( + DataSegment.builder() + .dataSource(DATA_SOURCE) + .interval(Granularity.HOUR.bucket(PARTY_TIME)) + .version(PARTY_TIME.toString()) + .shardSpec(new NumberedShardSpec(0, 2)) + .build(), + DataSegment.builder() + .dataSource(DATA_SOURCE) + .interval(Granularity.HOUR.bucket(PARTY_TIME)) + .version(PARTY_TIME.toString()) + .shardSpec(new NumberedShardSpec(1, 2)) + .build() + ) + ); + + taskActionTestKit.getTaskLockbox().add(task); + + final SegmentIdentifier id1 = allocate(task, PARTY_TIME, QueryGranularity.NONE, Granularity.MINUTE, "s1", null); + + assertSameIdentifier( + id1, + new SegmentIdentifier( + DATA_SOURCE, + Granularity.HOUR.bucket(PARTY_TIME), + PARTY_TIME.toString(), + new NumberedShardSpec(2, 2) + ) + ); + } + + @Test + public void testCannotAddToExistingNumberedShardSpecsWithCoarserQueryGranularity() throws Exception + { + final Task task = new NoopTask(null, 0, 
0, null, null, null); + + taskActionTestKit.getMetadataStorageCoordinator().announceHistoricalSegments( + ImmutableSet.of( + DataSegment.builder() + .dataSource(DATA_SOURCE) + .interval(Granularity.HOUR.bucket(PARTY_TIME)) + .version(PARTY_TIME.toString()) + .shardSpec(new NumberedShardSpec(0, 2)) + .build(), + DataSegment.builder() + .dataSource(DATA_SOURCE) + .interval(Granularity.HOUR.bucket(PARTY_TIME)) + .version(PARTY_TIME.toString()) + .shardSpec(new NumberedShardSpec(1, 2)) + .build() + ) + ); + + taskActionTestKit.getTaskLockbox().add(task); + + final SegmentIdentifier id1 = allocate(task, PARTY_TIME, QueryGranularity.DAY, Granularity.DAY, "s1", null); + + Assert.assertNull(id1); + } + + @Test + public void testCannotDoAnythingWithSillyQueryGranularity() throws Exception + { + final Task task = new NoopTask(null, 0, 0, null, null, null); + taskActionTestKit.getTaskLockbox().add(task); + + final SegmentIdentifier id1 = allocate(task, PARTY_TIME, QueryGranularity.DAY, Granularity.HOUR, "s1", null); + + Assert.assertNull(id1); + } + + @Test + public void testCannotAddToExistingSingleDimensionShardSpecs() throws Exception + { + final Task task = new NoopTask(null, 0, 0, null, null, null); + + taskActionTestKit.getMetadataStorageCoordinator().announceHistoricalSegments( + ImmutableSet.of( + DataSegment.builder() + .dataSource(DATA_SOURCE) + .interval(Granularity.HOUR.bucket(PARTY_TIME)) + .version(PARTY_TIME.toString()) + .shardSpec(new SingleDimensionShardSpec("foo", null, "bar", 0)) + .build(), + DataSegment.builder() + .dataSource(DATA_SOURCE) + .interval(Granularity.HOUR.bucket(PARTY_TIME)) + .version(PARTY_TIME.toString()) + .shardSpec(new SingleDimensionShardSpec("foo", "bar", null, 1)) + .build() + ) + ); + + taskActionTestKit.getTaskLockbox().add(task); + + final SegmentIdentifier id1 = allocate(task, PARTY_TIME, QueryGranularity.NONE, Granularity.HOUR, "s1", null); + + Assert.assertNull(id1); + } + + @Test + public void testSerde() throws Exception + { 
+ final SegmentAllocateAction action = new SegmentAllocateAction( + DATA_SOURCE, + PARTY_TIME, + QueryGranularity.MINUTE, + Granularity.HOUR, + "s1", + "prev" + ); + + final ObjectMapper objectMapper = new DefaultObjectMapper(); + final SegmentAllocateAction action2 = (SegmentAllocateAction) objectMapper.readValue( + objectMapper.writeValueAsBytes(action), + TaskAction.class + ); + + Assert.assertEquals(DATA_SOURCE, action2.getDataSource()); + Assert.assertEquals(PARTY_TIME, action2.getTimestamp()); + Assert.assertEquals(new DurationGranularity(60000, 0), action2.getQueryGranularity()); + Assert.assertSame(Granularity.HOUR, action2.getPreferredSegmentGranularity()); + Assert.assertEquals("s1", action2.getSequenceName()); + Assert.assertEquals("prev", action2.getPreviousSegmentId()); + } + + private SegmentIdentifier allocate( + final Task task, + final DateTime timestamp, + final QueryGranularity queryGranularity, + final Granularity preferredSegmentGranularity, + final String sequenceName, + final String sequencePreviousId + ) throws Exception + { + final SegmentAllocateAction action = new SegmentAllocateAction( + DATA_SOURCE, + timestamp, + queryGranularity, + preferredSegmentGranularity, + sequenceName, + sequencePreviousId + ); + return action.perform(task, taskActionTestKit.getTaskActionToolbox()).orNull(); + } + + private void assertSameIdentifier(final SegmentIdentifier one, final SegmentIdentifier other) + { + Assert.assertEquals(one, other); + Assert.assertEquals(one.getShardSpec().getPartitionNum(), other.getShardSpec().getPartitionNum()); + + if (one.getShardSpec().getClass() == NumberedShardSpec.class + && other.getShardSpec().getClass() == NumberedShardSpec.class) { + Assert.assertEquals( + ((NumberedShardSpec) one.getShardSpec()).getPartitions(), + ((NumberedShardSpec) other.getShardSpec()).getPartitions() + ); + } else if (one.getShardSpec().getClass() == LinearShardSpec.class + && other.getShardSpec().getClass() == LinearShardSpec.class) { + // do 
nothing + } else { + throw new ISE( + "Unexpected shardSpecs [%s] and [%s]", + one.getShardSpec().getClass(), + other.getShardSpec().getClass() + ); + } + } +} diff --git a/indexing-service/src/test/java/io/druid/indexing/common/actions/TaskActionTestKit.java b/indexing-service/src/test/java/io/druid/indexing/common/actions/TaskActionTestKit.java new file mode 100644 index 000000000000..a4d86fd2575e --- /dev/null +++ b/indexing-service/src/test/java/io/druid/indexing/common/actions/TaskActionTestKit.java @@ -0,0 +1,114 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package io.druid.indexing.common.actions; + +import com.google.common.base.Suppliers; +import io.druid.indexing.common.TestUtils; +import io.druid.indexing.common.config.TaskStorageConfig; +import io.druid.indexing.overlord.HeapMemoryTaskStorage; +import io.druid.indexing.overlord.IndexerMetadataStorageCoordinator; +import io.druid.indexing.overlord.TaskLockbox; +import io.druid.indexing.overlord.TaskStorage; +import io.druid.metadata.IndexerSQLMetadataStorageCoordinator; +import io.druid.metadata.MetadataStorageConnectorConfig; +import io.druid.metadata.MetadataStorageTablesConfig; +import io.druid.metadata.TestDerbyConnector; +import io.druid.server.metrics.NoopServiceEmitter; +import org.joda.time.Period; +import org.junit.rules.ExternalResource; + +public class TaskActionTestKit extends ExternalResource +{ + private final MetadataStorageTablesConfig metadataStorageTablesConfig = MetadataStorageTablesConfig.fromBase("druid"); + + private TaskStorage taskStorage; + private TaskLockbox taskLockbox; + private TestDerbyConnector testDerbyConnector; + private IndexerMetadataStorageCoordinator metadataStorageCoordinator; + private TaskActionToolbox taskActionToolbox; + + public MetadataStorageTablesConfig getMetadataStorageTablesConfig() + { + return metadataStorageTablesConfig; + } + + public TaskStorage getTaskStorage() + { + return taskStorage; + } + + public TaskLockbox getTaskLockbox() + { + return taskLockbox; + } + + public TestDerbyConnector getTestDerbyConnector() + { + return testDerbyConnector; + } + + public IndexerMetadataStorageCoordinator getMetadataStorageCoordinator() + { + return metadataStorageCoordinator; + } + + public TaskActionToolbox getTaskActionToolbox() + { + return taskActionToolbox; + } + + @Override + public void before() + { + taskStorage = new HeapMemoryTaskStorage(new TaskStorageConfig(new Period("PT24H"))); + taskLockbox = new TaskLockbox(taskStorage); + testDerbyConnector = new TestDerbyConnector( + Suppliers.ofInstance(new 
MetadataStorageConnectorConfig()), + Suppliers.ofInstance(metadataStorageTablesConfig) + ); + metadataStorageCoordinator = new IndexerSQLMetadataStorageCoordinator( + new TestUtils().getTestObjectMapper(), + metadataStorageTablesConfig, + testDerbyConnector + ); + taskActionToolbox = new TaskActionToolbox( + taskLockbox, + metadataStorageCoordinator, + new NoopServiceEmitter() + ); + testDerbyConnector.createPendingSegmentsTable(); + testDerbyConnector.createSegmentTable(); + testDerbyConnector.createRulesTable(); + testDerbyConnector.createConfigTable(); + testDerbyConnector.createTaskTables(); + testDerbyConnector.createAuditTable(); + } + + @Override + public void after() + { + testDerbyConnector.tearDown(); + taskStorage = null; + taskLockbox = null; + testDerbyConnector = null; + metadataStorageCoordinator = null; + taskActionToolbox = null; + } +} diff --git a/indexing-service/src/test/java/io/druid/indexing/overlord/TaskLockboxTest.java b/indexing-service/src/test/java/io/druid/indexing/overlord/TaskLockboxTest.java index bca630e29576..6f809b71a8a3 100644 --- a/indexing-service/src/test/java/io/druid/indexing/overlord/TaskLockboxTest.java +++ b/indexing-service/src/test/java/io/druid/indexing/overlord/TaskLockboxTest.java @@ -17,23 +17,28 @@ package io.druid.indexing.overlord; - +import com.google.common.base.Optional; +import com.google.common.collect.ImmutableList; +import io.druid.indexing.common.TaskLock; import io.druid.indexing.common.config.TaskStorageConfig; import io.druid.indexing.common.task.NoopTask; import io.druid.indexing.common.task.Task; -import junit.framework.Assert; import org.joda.time.Interval; +import org.junit.Assert; import org.junit.Before; import org.junit.Test; +import java.util.List; + public class TaskLockboxTest { - private TaskStorage taskStorage; + private TaskStorage taskStorage; - private TaskLockbox lockbox; + private TaskLockbox lockbox; @Before - public void setUp(){ + public void setUp() + { taskStorage = new 
HeapMemoryTaskStorage(new TaskStorageConfig(null)); lockbox = new TaskLockbox(taskStorage); } @@ -49,7 +54,7 @@ public void testLock() throws InterruptedException @Test(expected = IllegalStateException.class) public void testLockForInactiveTask() throws InterruptedException { - lockbox.lock(NoopTask.create(),new Interval("2015-01-01/2015-01-02")); + lockbox.lock(NoopTask.create(), new Interval("2015-01-01/2015-01-02")); } @Test(expected = IllegalStateException.class) @@ -78,7 +83,30 @@ public void testTryLock() throws InterruptedException // Now task2 should be able to get the lock Assert.assertTrue(lockbox.tryLock(task2, new Interval("2015-01-01/2015-01-02")).isPresent()); + } + @Test + public void testTrySmallerLock() throws InterruptedException + { + Task task = NoopTask.create(); + lockbox.add(task); + Optional lock1 = lockbox.tryLock(task, new Interval("2015-01-01/2015-01-03")); + Assert.assertTrue(lock1.isPresent()); + Assert.assertEquals(new Interval("2015-01-01/2015-01-03"), lock1.get().getInterval()); + + // same task tries to take partially overlapping interval; should fail + Assert.assertFalse(lockbox.tryLock(task, new Interval("2015-01-02/2015-01-04")).isPresent()); + + // same task tries to take contained interval; should succeed and should match the original lock + Optional lock2 = lockbox.tryLock(task, new Interval("2015-01-01/2015-01-02")); + Assert.assertTrue(lock2.isPresent()); + Assert.assertEquals(new Interval("2015-01-01/2015-01-03"), lock2.get().getInterval()); + + // only the first lock should actually exist + Assert.assertEquals( + ImmutableList.of(lock1.get()), + lockbox.findLocksForTask(task) + ); } @Test(expected = IllegalStateException.class) @@ -93,8 +121,8 @@ public void testTryLockAfterTaskComplete() throws InterruptedException Task task = NoopTask.create(); lockbox.add(task); lockbox.remove(task); - Assert.assertFalse(lockbox.tryLock(task, new Interval("2015-01-01/2015-01-02")).isPresent()); } - + 
Assert.assertFalse(lockbox.tryLock(task, new Interval("2015-01-01/2015-01-02")).isPresent()); + } } diff --git a/server/src/main/java/io/druid/guice/SQLMetadataStorageDruidModule.java b/server/src/main/java/io/druid/guice/SQLMetadataStorageDruidModule.java index 217764dad2fe..9eb04aa01cb6 100644 --- a/server/src/main/java/io/druid/guice/SQLMetadataStorageDruidModule.java +++ b/server/src/main/java/io/druid/guice/SQLMetadataStorageDruidModule.java @@ -198,7 +198,7 @@ public void configure(Binder binder) PolyBind.optionBinder(binder, Key.get(IndexerMetadataStorageCoordinator.class)) .addBinding(type) .to(IndexerSQLMetadataStorageCoordinator.class) - .in(LazySingleton.class); + .in(ManageLifecycle.class); PolyBind.optionBinder(binder, Key.get(MetadataStorageUpdaterJobHandler.class)) .addBinding(type) diff --git a/server/src/main/java/io/druid/indexing/overlord/IndexerMetadataStorageCoordinator.java b/server/src/main/java/io/druid/indexing/overlord/IndexerMetadataStorageCoordinator.java index 4a7122d1d8f3..b7d438c41f8d 100644 --- a/server/src/main/java/io/druid/indexing/overlord/IndexerMetadataStorageCoordinator.java +++ b/server/src/main/java/io/druid/indexing/overlord/IndexerMetadataStorageCoordinator.java @@ -17,6 +17,7 @@ package io.druid.indexing.overlord; +import io.druid.segment.realtime.appenderator.SegmentIdentifier; import io.druid.timeline.DataSegment; import org.joda.time.Interval; @@ -51,6 +52,31 @@ public List getUsedSegmentsForInterval(final String dataSource, fin */ public Set announceHistoricalSegments(final Set segments) throws IOException; + /** + * Allocate a new pending segment in the pending segments table. This segment identifier will never be given out + * again, unless another call is made with the same dataSource, sequenceName, and previousSegmentId. + *

+ * The sequenceName and previousSegmentId parameters are meant to make it easy for two independent ingestion tasks + * to produce the same series of segments. + *

+ * Note that a segment sequence may include segments with a variety of different intervals and versions. + * + * @param dataSource dataSource for which to allocate a segment + * @param sequenceName name of the group of ingestion tasks producing a segment series + * @param previousSegmentId previous segment in the series; may be null or empty, meaning this is the first segment + * @param interval interval for which to allocate a segment + * @param maxVersion use this version if we have no better version to use. The returned segment identifier may + * have a version lower than this one, but will not have one higher. + * + * @return the pending segment identifier, or null if it was impossible to allocate a new segment + */ + SegmentIdentifier allocatePendingSegment( + String dataSource, + String sequenceName, + String previousSegmentId, + Interval interval, + String maxVersion + ) throws IOException; public void updateSegmentMetadata(final Set segments) throws IOException; diff --git a/server/src/main/java/io/druid/metadata/IndexerSQLMetadataStorageCoordinator.java b/server/src/main/java/io/druid/metadata/IndexerSQLMetadataStorageCoordinator.java index 9d3ba1f28566..dbb49a624069 100644 --- a/server/src/main/java/io/druid/metadata/IndexerSQLMetadataStorageCoordinator.java +++ b/server/src/main/java/io/druid/metadata/IndexerSQLMetadataStorageCoordinator.java @@ -19,6 +19,7 @@ import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.base.Function; +import com.google.common.base.Preconditions; import com.google.common.base.Throwables; import com.google.common.collect.ImmutableSet; import com.google.common.collect.Iterables; @@ -26,12 +27,17 @@ import com.google.common.collect.Ordering; import com.google.common.collect.Sets; import com.google.inject.Inject; +import com.metamx.common.lifecycle.LifecycleStart; import com.metamx.common.logger.Logger; import io.druid.indexing.overlord.IndexerMetadataStorageCoordinator; +import 
io.druid.segment.realtime.appenderator.SegmentIdentifier; import io.druid.timeline.DataSegment; import io.druid.timeline.TimelineObjectHolder; import io.druid.timeline.VersionedIntervalTimeline; +import io.druid.timeline.partition.LinearShardSpec; import io.druid.timeline.partition.NoneShardSpec; +import io.druid.timeline.partition.NumberedShardSpec; +import io.druid.timeline.partition.PartitionChunk; import org.joda.time.DateTime; import org.joda.time.Interval; import org.skife.jdbi.v2.FoldController; @@ -72,67 +78,124 @@ public IndexerSQLMetadataStorageCoordinator( this.connector = connector; } - public List getUsedSegmentsForInterval(final String dataSource, final Interval interval) - throws IOException + @LifecycleStart + public void start() { - final VersionedIntervalTimeline timeline = connector.getDBI().withHandle( - new HandleCallback>() + connector.createPendingSegmentsTable(); + connector.createSegmentTable(); + } + + public List getUsedSegmentsForInterval( + final String dataSource, + final Interval interval + ) throws IOException + { + return connector.retryWithHandle( + new HandleCallback>() { @Override - public VersionedIntervalTimeline withHandle(Handle handle) throws IOException + public List withHandle(Handle handle) throws Exception { - final VersionedIntervalTimeline timeline = new VersionedIntervalTimeline( - Ordering.natural() + final VersionedIntervalTimeline timeline = getTimelineForIntervalWithHandle( + handle, + dataSource, + interval ); - final ResultIterator dbSegments = - handle.createQuery( - String.format( - "SELECT payload FROM %s WHERE used = true AND dataSource = :dataSource AND start <= :end and \"end\" >= :start AND used = true", - dbTables.getSegmentsTable() + return Lists.newArrayList( + Iterables.concat( + Iterables.transform( + timeline.lookup(interval), + new Function, Iterable>() + { + @Override + public Iterable apply(TimelineObjectHolder input) + { + return input.getObject().payloads(); + } + } ) ) - .bind("dataSource", 
dataSource) - .bind("start", interval.getStart().toString()) - .bind("end", interval.getEnd().toString()) - .map(ByteArrayMapper.FIRST) - .iterator(); - - while (dbSegments.hasNext()) { - final byte[] payload = dbSegments.next(); - - DataSegment segment = jsonMapper.readValue( - payload, - DataSegment.class - ); + ); - timeline.add(segment.getInterval(), segment.getVersion(), segment.getShardSpec().createChunk(segment)); + } + } + ); + } - } + private List getPendingSegmentsForIntervalWithHandle( + final Handle handle, + final String dataSource, + final Interval interval + ) throws IOException + { + final List identifiers = Lists.newArrayList(); - dbSegments.close(); + final ResultIterator dbSegments = + handle.createQuery( + String.format( + "SELECT payload FROM %s WHERE dataSource = :dataSource AND start <= :end and \"end\" >= :start", + dbTables.getPendingSegmentsTable() + ) + ) + .bind("dataSource", dataSource) + .bind("start", interval.getStart().toString()) + .bind("end", interval.getEnd().toString()) + .map(ByteArrayMapper.FIRST) + .iterator(); + + while (dbSegments.hasNext()) { + final byte[] payload = dbSegments.next(); + final SegmentIdentifier identifier = jsonMapper.readValue(payload, SegmentIdentifier.class); + + if (interval.overlaps(identifier.getInterval())) { + identifiers.add(identifier); + } + } - return timeline; + dbSegments.close(); - } - } + return identifiers; + } + + private VersionedIntervalTimeline getTimelineForIntervalWithHandle( + final Handle handle, + final String dataSource, + final Interval interval + ) throws IOException + { + final VersionedIntervalTimeline timeline = new VersionedIntervalTimeline<>( + Ordering.natural() ); - return Lists.newArrayList( - Iterables.concat( - Iterables.transform( - timeline.lookup(interval), - new Function, Iterable>() - { - @Override - public Iterable apply(TimelineObjectHolder input) - { - return input.getObject().payloads(); - } - } + final ResultIterator dbSegments = + handle.createQuery( + 
String.format( + "SELECT payload FROM %s WHERE used = true AND dataSource = :dataSource AND start <= :end and \"end\" >= :start AND used = true", + dbTables.getSegmentsTable() ) ) - ); + .bind("dataSource", dataSource) + .bind("start", interval.getStart().toString()) + .bind("end", interval.getEnd().toString()) + .map(ByteArrayMapper.FIRST) + .iterator(); + + while (dbSegments.hasNext()) { + final byte[] payload = dbSegments.next(); + + DataSegment segment = jsonMapper.readValue( + payload, + DataSegment.class + ); + + timeline.add(segment.getInterval(), segment.getVersion(), segment.getShardSpec().createChunk(segment)); + + } + + dbSegments.close(); + + return timeline; } /** @@ -140,6 +203,7 @@ public Iterable apply(TimelineObjectHolder inp * with identifiers already in the database will not be added). * * @param segments set of segments to add + * * @return set of segments actually added */ public Set announceHistoricalSegments(final Set segments) throws IOException @@ -164,6 +228,204 @@ public Set inTransaction(Handle handle, TransactionStatus transacti ); } + @Override + public SegmentIdentifier allocatePendingSegment( + final String dataSource, + final String sequenceName, + final String previousSegmentId, + final Interval interval, + final String maxVersion + ) throws IOException + { + Preconditions.checkNotNull(dataSource, "dataSource"); + Preconditions.checkNotNull(sequenceName, "sequenceName"); + Preconditions.checkNotNull(interval, "interval"); + Preconditions.checkNotNull(maxVersion, "maxVersion"); + + final String previousSegmentIdNotNull = previousSegmentId == null ? 
"" : previousSegmentId; + + return connector.retryTransaction( + new TransactionCallback() + { + @Override + public SegmentIdentifier inTransaction(Handle handle, TransactionStatus transactionStatus) throws Exception + { + final List existingBytes = handle + .createQuery( + String.format( + "SELECT payload FROM %s WHERE " + + "dataSource = :dataSource AND " + + "sequence_name = :sequence_name AND " + + "sequence_prev_id = :sequence_prev_id", + dbTables.getPendingSegmentsTable() + ) + ).bind("dataSource", dataSource) + .bind("sequence_name", sequenceName) + .bind("sequence_prev_id", previousSegmentIdNotNull) + .map(ByteArrayMapper.FIRST) + .list(); + + if (!existingBytes.isEmpty()) { + final SegmentIdentifier existingIdentifier = jsonMapper.readValue( + Iterables.getOnlyElement(existingBytes), + SegmentIdentifier.class + ); + + if (existingIdentifier.getInterval().getStartMillis() == interval.getStartMillis() + && existingIdentifier.getInterval().getEndMillis() == interval.getEndMillis()) { + log.info( + "Found existing pending segment [%s] for sequence[%s] (previous = [%s]) in DB", + existingIdentifier.getIdentifierAsString(), + sequenceName, + previousSegmentIdNotNull + ); + + return existingIdentifier; + } else { + log.warn( + "Cannot use existing pending segment [%s] for sequence[%s] (previous = [%s]) in DB, " + + "does not match requested interval[%s]", + existingIdentifier.getIdentifierAsString(), + sequenceName, + previousSegmentIdNotNull, + interval + ); + + return null; + } + } + + // Make up a pending segment based on existing segments and pending segments in the DB. This works + // assuming that all tasks inserting segments at a particular point in time are going through the + // allocatePendingSegment flow. This should be assured through some other mechanism (like task locks). 
+ + final SegmentIdentifier newIdentifier; + + final List> existingChunks = getTimelineForIntervalWithHandle( + handle, + dataSource, + interval + ).lookup(interval); + + if (existingChunks.size() > 1) { + // Not possible to expand more than one chunk with a single segment. + log.warn( + "Cannot allocate new segment for dataSource[%s], interval[%s], maxVersion[%s]: already have [%,d] chunks.", + dataSource, + interval, + maxVersion, + existingChunks.size() + ); + return null; + } else { + SegmentIdentifier max = null; + + if (!existingChunks.isEmpty()) { + TimelineObjectHolder existingHolder = Iterables.getOnlyElement(existingChunks); + for (PartitionChunk existing : existingHolder.getObject()) { + if (max == null || max.getShardSpec().getPartitionNum() < existing.getObject() + .getShardSpec() + .getPartitionNum()) { + max = SegmentIdentifier.fromDataSegment(existing.getObject()); + } + } + } + + final List pendings = getPendingSegmentsForIntervalWithHandle( + handle, + dataSource, + interval + ); + + for (SegmentIdentifier pending : pendings) { + if (max == null || + pending.getVersion().compareTo(max.getVersion()) > 0 || + (pending.getVersion().equals(max.getVersion()) + && pending.getShardSpec().getPartitionNum() > max.getShardSpec().getPartitionNum())) { + max = pending; + } + } + + if (max == null) { + newIdentifier = new SegmentIdentifier( + dataSource, + interval, + maxVersion, + new NumberedShardSpec(0, 0) + ); + } else if (!max.getInterval().equals(interval) || max.getVersion().compareTo(maxVersion) > 0) { + log.warn( + "Cannot allocate new segment for dataSource[%s], interval[%s], maxVersion[%s]: conflicting segment[%s].", + dataSource, + interval, + maxVersion, + max.getIdentifierAsString() + ); + return null; + } else if (max.getShardSpec() instanceof LinearShardSpec) { + newIdentifier = new SegmentIdentifier( + dataSource, + max.getInterval(), + max.getVersion(), + new LinearShardSpec(max.getShardSpec().getPartitionNum() + 1) + ); + } else if 
(max.getShardSpec() instanceof NumberedShardSpec) { + newIdentifier = new SegmentIdentifier( + dataSource, + max.getInterval(), + max.getVersion(), + new NumberedShardSpec( + max.getShardSpec().getPartitionNum() + 1, + ((NumberedShardSpec) max.getShardSpec()).getPartitions() + ) + ); + } else { + log.warn( + "Cannot allocate new segment for dataSource[%s], interval[%s], maxVersion[%s]: ShardSpec class[%s] used by [%s].", + dataSource, + interval, + maxVersion, + max.getShardSpec().getClass(), + max.getIdentifierAsString() + ); + return null; + } + } + + // SELECT -> INSERT can fail due to races; callers must be prepared to retry. + // Avoiding ON DUPLICATE KEY since it's not portable. + // Avoiding try/catch since it may cause inadvertent transaction-splitting. + + handle.createStatement( + String.format( + "INSERT INTO %s (id, dataSource, created_date, start, \"end\", sequence_name, sequence_prev_id, payload) " + + "VALUES (:id, :dataSource, :created_date, :start, :end, :sequence_name, :sequence_prev_id, :payload)", + dbTables.getPendingSegmentsTable() + ) + ) + .bind("id", newIdentifier.getIdentifierAsString()) + .bind("dataSource", dataSource) + .bind("created_date", new DateTime().toString()) + .bind("start", interval.getStart().toString()) + .bind("end", interval.getEnd().toString()) + .bind("sequence_name", sequenceName) + .bind("sequence_prev_id", previousSegmentIdNotNull) + .bind("payload", jsonMapper.writeValueAsBytes(newIdentifier)) + .execute(); + + log.info( + "Allocated pending segment [%s] for sequence[%s] (previous = [%s]) in DB", + newIdentifier.getIdentifierAsString(), + sequenceName, + previousSegmentIdNotNull + ); + + return newIdentifier; + } + } + ); + } + /** * Attempts to insert a single segment to the database. If the segment already exists, will do nothing. Meant * to be called from within a transaction. 
@@ -199,7 +461,8 @@ private boolean announceHistoricalSegment(final Handle handle, final DataSegment .execute(); log.info("Published segment [%s] to DB", segment.getIdentifier()); - } catch(Exception e) { + } + catch (Exception e) { if (e.getCause() instanceof SQLException && segmentExists(handle, segment)) { log.info("Found [%s] in DB, not updating DB", segment.getIdentifier()); } else { @@ -237,7 +500,7 @@ public void updateSegmentMetadata(final Set segments) throws IOExce @Override public Void inTransaction(Handle handle, TransactionStatus transactionStatus) throws Exception { - for(final DataSegment segment : segments) { + for (final DataSegment segment : segments) { updatePayload(handle, segment); } @@ -255,7 +518,7 @@ public void deleteSegments(final Set segments) throws IOException @Override public Void inTransaction(Handle handle, TransactionStatus transactionStatus) throws IOException { - for(final DataSegment segment : segments) { + for (final DataSegment segment : segments) { deleteSegment(handle, segment); } diff --git a/server/src/main/java/io/druid/metadata/SQLMetadataConnector.java b/server/src/main/java/io/druid/metadata/SQLMetadataConnector.java index 70e5ae7d1feb..5b720cbfdbf7 100644 --- a/server/src/main/java/io/druid/metadata/SQLMetadataConnector.java +++ b/server/src/main/java/io/druid/metadata/SQLMetadataConnector.java @@ -28,10 +28,10 @@ import org.skife.jdbi.v2.Batch; import org.skife.jdbi.v2.DBI; import org.skife.jdbi.v2.Handle; -import org.skife.jdbi.v2.IDBI; import org.skife.jdbi.v2.TransactionCallback; import org.skife.jdbi.v2.TransactionStatus; import org.skife.jdbi.v2.exceptions.DBIException; +import org.skife.jdbi.v2.exceptions.UnableToExecuteStatementException; import org.skife.jdbi.v2.exceptions.UnableToObtainConnectionException; import org.skife.jdbi.v2.tweak.HandleCallback; import org.skife.jdbi.v2.util.ByteArrayMapper; @@ -141,6 +141,7 @@ public final boolean isTransientException(Throwable e) return e != null && (e instanceof 
SQLTransientException || e instanceof SQLRecoverableException || e instanceof UnableToObtainConnectionException + || e instanceof UnableToExecuteStatementException || connectorIsTransientException(e) || (e instanceof SQLException && isTransientException(e.getCause())) || (e instanceof DBIException && isTransientException(e.getCause()))); @@ -180,6 +181,30 @@ public Void withHandle(Handle handle) throws Exception } } + public void createPendingSegmentsTable(final String tableName) + { + createTable( + tableName, + ImmutableList.of( + String.format( + "CREATE TABLE %1$s (\n" + + " id VARCHAR(255) NOT NULL,\n" + + " dataSource VARCHAR(255) NOT NULL,\n" + + " created_date VARCHAR(255) NOT NULL,\n" + + " start VARCHAR(255) NOT NULL,\n" + + " \"end\" VARCHAR(255) NOT NULL,\n" + + " sequence_name VARCHAR(255) NOT NULL,\n" + + " sequence_prev_id VARCHAR(255) NOT NULL,\n" + + " payload %2$s NOT NULL,\n" + + " PRIMARY KEY (id),\n" + + " UNIQUE (sequence_name, sequence_prev_id)\n" + + ")", + tableName, getPayloadType() + ) + ) + ); + } + public void createSegmentTable(final String tableName) { createTable( @@ -355,7 +380,16 @@ public Void inTransaction(Handle handle, TransactionStatus transactionStatus) th public abstract DBI getDBI(); @Override - public void createSegmentTable() { + public void createPendingSegmentsTable() + { + if (config.get().isCreateTables()) { + createPendingSegmentsTable(tablesConfigSupplier.get().getPendingSegmentsTable()); + } + } + + @Override + public void createSegmentTable() + { if (config.get().isCreateTables()) { createSegmentTable(tablesConfigSupplier.get().getSegmentsTable()); } diff --git a/server/src/main/java/io/druid/segment/realtime/appenderator/SegmentIdentifier.java b/server/src/main/java/io/druid/segment/realtime/appenderator/SegmentIdentifier.java index 53d801debb48..a85c29eba1b2 100644 --- a/server/src/main/java/io/druid/segment/realtime/appenderator/SegmentIdentifier.java +++ 
b/server/src/main/java/io/druid/segment/realtime/appenderator/SegmentIdentifier.java @@ -26,12 +26,15 @@ import io.druid.timeline.partition.ShardSpec; import org.joda.time.Interval; +import java.util.Objects; + public class SegmentIdentifier { private final String dataSource; private final Interval interval; private final String version; private final ShardSpec shardSpec; + private final String asString; @JsonCreator public SegmentIdentifier( @@ -45,6 +48,13 @@ public SegmentIdentifier( this.interval = Preconditions.checkNotNull(interval, "interval"); this.version = Preconditions.checkNotNull(version, "version"); this.shardSpec = Preconditions.checkNotNull(shardSpec, "shardSpec"); + this.asString = DataSegment.makeDataSegmentIdentifier( + dataSource, + interval.getStart(), + interval.getEnd(), + version, + shardSpec + ); } @JsonProperty @@ -73,13 +83,7 @@ public ShardSpec getShardSpec() public String getIdentifierAsString() { - return DataSegment.makeDataSegmentIdentifier( - dataSource, - interval.getStart(), - interval.getEnd(), - version, - shardSpec - ); + return asString; } @Override @@ -92,19 +96,19 @@ public boolean equals(Object o) return false; } SegmentIdentifier that = (SegmentIdentifier) o; - return getIdentifierAsString().equals(that.getIdentifierAsString()); + return Objects.equals(asString, that.asString); } @Override public int hashCode() { - return getIdentifierAsString().hashCode(); + return asString.hashCode(); } @Override public String toString() { - return getIdentifierAsString(); + return asString; } public static SegmentIdentifier fromDataSegment(final DataSegment segment) diff --git a/server/src/test/java/io/druid/segment/realtime/appenderator/SegmentIdentifierTest.java b/server/src/test/java/io/druid/segment/realtime/appenderator/SegmentIdentifierTest.java new file mode 100644 index 000000000000..79571ef6f7e2 --- /dev/null +++ b/server/src/test/java/io/druid/segment/realtime/appenderator/SegmentIdentifierTest.java @@ -0,0 +1,64 @@ +/* + * 
Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package io.druid.segment.realtime.appenderator; + +import com.fasterxml.jackson.databind.ObjectMapper; +import io.druid.jackson.DefaultObjectMapper; +import io.druid.timeline.partition.NumberedShardSpec; +import org.joda.time.Interval; +import org.junit.Assert; +import org.junit.Test; + +public class SegmentIdentifierTest +{ + private static final String DATA_SOURCE = "foo"; + private static final Interval INTERVAL = new Interval("2000/PT1H"); + private static final String VERSION = "v1"; + private static final NumberedShardSpec SHARD_SPEC_0 = new NumberedShardSpec(0, 2); + private static final NumberedShardSpec SHARD_SPEC_1 = new NumberedShardSpec(1, 2); + private static final SegmentIdentifier ID_0 = new SegmentIdentifier(DATA_SOURCE, INTERVAL, VERSION, SHARD_SPEC_0); + private static final SegmentIdentifier ID_1 = new SegmentIdentifier(DATA_SOURCE, INTERVAL, VERSION, SHARD_SPEC_1); + + @Test + public void testSerde() throws Exception + { + final ObjectMapper objectMapper = new DefaultObjectMapper(); + objectMapper.registerSubtypes(NumberedShardSpec.class); + + final SegmentIdentifier id2 = objectMapper.readValue( + objectMapper.writeValueAsBytes(ID_1), + 
SegmentIdentifier.class + ); + + Assert.assertEquals(ID_1, id2); + Assert.assertEquals(DATA_SOURCE, id2.getDataSource()); + Assert.assertEquals(INTERVAL, id2.getInterval()); + Assert.assertEquals(VERSION, id2.getVersion()); + Assert.assertEquals(SHARD_SPEC_1.getPartitionNum(), id2.getShardSpec().getPartitionNum()); + Assert.assertEquals(SHARD_SPEC_1.getPartitions(), ((NumberedShardSpec) id2.getShardSpec()).getPartitions()); + } + + @Test + public void testAsString() + { + Assert.assertEquals("foo_2000-01-01T00:00:00.000Z_2000-01-01T01:00:00.000Z_v1", ID_0.getIdentifierAsString()); + Assert.assertEquals("foo_2000-01-01T00:00:00.000Z_2000-01-01T01:00:00.000Z_v1_1", ID_1.getIdentifierAsString()); + } +} diff --git a/services/src/main/java/io/druid/cli/CreateTables.java b/services/src/main/java/io/druid/cli/CreateTables.java index 8ea58ca7ebca..fdcd64778d16 100644 --- a/services/src/main/java/io/druid/cli/CreateTables.java +++ b/services/src/main/java/io/druid/cli/CreateTables.java @@ -25,11 +25,11 @@ import com.metamx.common.logger.Logger; import io.airlift.airline.Command; import io.airlift.airline.Option; +import io.druid.guice.JsonConfigProvider; +import io.druid.guice.annotations.Self; import io.druid.metadata.MetadataStorageConnector; import io.druid.metadata.MetadataStorageConnectorConfig; import io.druid.metadata.MetadataStorageTablesConfig; -import io.druid.guice.JsonConfigProvider; -import io.druid.guice.annotations.Self; import io.druid.server.DruidNode; import java.util.List; @@ -106,6 +106,7 @@ public void run() { final Injector injector = makeInjector(); MetadataStorageConnector dbConnector = injector.getInstance(MetadataStorageConnector.class); + dbConnector.createPendingSegmentsTable(); dbConnector.createSegmentTable(); dbConnector.createRulesTable(); dbConnector.createConfigTable();