diff --git a/api/src/main/java/io/druid/timeline/TaskDataSegment.java b/api/src/main/java/io/druid/timeline/TaskDataSegment.java new file mode 100644 index 000000000000..23bee63cdafb --- /dev/null +++ b/api/src/main/java/io/druid/timeline/TaskDataSegment.java @@ -0,0 +1,94 @@ +package io.druid.timeline; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; +import org.joda.time.Interval; + +import java.util.Map; + +/** + * A lightweight view of a task row in the metadata store, deserialized from the task's JSON payload and used to match rows in the pendingSegments table. + */ +public class TaskDataSegment +{ + + private final String type; + private final String id; + private final String groupId; + private final String dataSource; + private final Map<String, Object> ioConfig; + private final Interval interval; + + @JsonCreator + public TaskDataSegment( + @JsonProperty("type") String type, + @JsonProperty("id") String id, + @JsonProperty("groupId") String groupId, + @JsonProperty("dataSource") String dataSource, + @JsonProperty("ioConfig") Map<String, Object> ioConfig, + @JsonProperty("interval") Interval interval + ) + { + this.type = type; + this.id = id; + this.groupId = groupId; + this.dataSource = dataSource; + this.ioConfig = ioConfig; + this.interval = interval; + } + + /** + * Get type + * + * @return the type + */ + + @JsonProperty + public String getType() + { + return type; + } + + @JsonProperty + public String getId() + { + return id; + } + + @JsonProperty + public String getGroupId() + { + return groupId; + } + + @JsonProperty + public String getDataSource() + { + return dataSource; + } + + @JsonProperty + public Map<String, Object> getIoConfig() + { + return ioConfig; + } + + @JsonProperty + public Interval getInterval() + { + return interval; + } + + @Override + public String toString() + { + return "TaskDataSegment{" + + "type=" + type + + ", id=" + id + + ", groupId=" + groupId + + ", dataSource=" + dataSource + + ", interval=" + interval + + '}'; + } + + } diff --git a/indexing-service/src/test/java/io/druid/indexing/test/TestIndexerMetadataStorageCoordinator.java b/indexing-service/src/test/java/io/druid/indexing/test/TestIndexerMetadataStorageCoordinator.java index b6c13997f452..a54e3d596b61 100644 --- a/indexing-service/src/test/java/io/druid/indexing/test/TestIndexerMetadataStorageCoordinator.java +++ b/indexing-service/src/test/java/io/druid/indexing/test/TestIndexerMetadataStorageCoordinator.java @@ -28,6 +28,7 @@ import io.druid.indexing.overlord.SegmentPublishResult; import io.druid.segment.realtime.appenderator.SegmentIdentifier; import io.druid.timeline.DataSegment; +import io.druid.timeline.TaskDataSegment; import org.joda.time.Interval; import java.io.IOException; @@ -39,10 +40,12 @@ public class TestIndexerMetadataStorageCoordinator implements IndexerMetadataStorageCoordinator final private Set<DataSegment> published = Sets.newConcurrentHashSet(); final private Set<DataSegment> nuked = Sets.newConcurrentHashSet(); final private List<DataSegment> unusedSegments; + final private List<TaskDataSegment> taskDataSegments; public TestIndexerMetadataStorageCoordinator() { unusedSegments = Lists.newArrayList(); + taskDataSegments = Lists.newArrayList(); } @Override @@ -65,7 +68,7 @@ public List<DataSegment> getUsedSegmentsForInterval(String dataSource, Interval @Override public List<DataSegment> getUsedSegmentsForIntervals( - String dataSource, List<Interval> intervals + String dataSource, List<Interval> intervals ) throws IOException { return ImmutableList.of(); @@ -93,9 +96,9 @@ public Set<DataSegment> announceHistoricalSegments(Set<DataSegment> segments) @Override public SegmentPublishResult announceHistoricalSegments( - Set<DataSegment> segments, - DataSourceMetadata oldCommitMetadata, - DataSourceMetadata newCommitMetadata + Set<DataSegment> segments,
+ DataSourceMetadata oldCommitMetadata, + DataSourceMetadata newCommitMetadata ) throws IOException { // Don't actually compare metadata, just do it! @@ -104,11 +107,11 @@ public SegmentPublishResult announceHistoricalSegments( @Override public SegmentIdentifier allocatePendingSegment( - String dataSource, - String sequenceName, - String previousSegmentId, - Interval interval, - String maxVersion + String dataSource, + String sequenceName, + String previousSegmentId, + Interval interval, + String maxVersion ) throws IOException { throw new UnsupportedOperationException(); @@ -120,6 +123,12 @@ public void deleteSegments(Set<DataSegment> segments) { nuked.addAll(segments); } + @Override + public List<TaskDataSegment> getNotActiveTask(final Interval interval) + { + return ImmutableList.copyOf(taskDataSegments); + } + @Override public void updateSegmentMetadata(Set<DataSegment> segments) throws IOException { diff --git a/server/src/main/java/io/druid/indexing/overlord/IndexerMetadataStorageCoordinator.java b/server/src/main/java/io/druid/indexing/overlord/IndexerMetadataStorageCoordinator.java index 808bd35506d1..4a683b039277 100644 --- a/server/src/main/java/io/druid/indexing/overlord/IndexerMetadataStorageCoordinator.java +++ b/server/src/main/java/io/druid/indexing/overlord/IndexerMetadataStorageCoordinator.java @@ -21,6 +21,7 @@ import io.druid.segment.realtime.appenderator.SegmentIdentifier; import io.druid.timeline.DataSegment; +import io.druid.timeline.TaskDataSegment; import org.joda.time.Interval; import java.io.IOException; @@ -42,7 +43,7 @@ public interface IndexerMetadataStorageCoordinator * @throws IOException */ List<DataSegment> getUsedSegmentsForInterval(String dataSource, Interval interval) - throws IOException; + throws IOException; /** * Get all segments which may include any data in the interval and are flagged as used. @@ -55,7 +56,7 @@ List<DataSegment> getUsedSegmentsForInterval(String dataSource, Interval interval) * @throws IOException */ List<DataSegment> getUsedSegmentsForIntervals(final String dataSource, final List<Interval> intervals) - throws IOException; + throws IOException; /** * Attempts to insert a set of segments to the metadata storage. Returns the set of segments actually added (segments @@ -86,11 +87,11 @@ List<DataSegment> getUsedSegmentsForIntervals(final String dataSource, final List<Interval> intervals) * @return the pending segment identifier, or null if it was impossible to allocate a new segment */ SegmentIdentifier allocatePendingSegment( - String dataSource, - String sequenceName, - String previousSegmentId, - Interval interval, - String maxVersion + String dataSource, + String sequenceName, + String previousSegmentId, + Interval interval, + String maxVersion ) throws IOException; /** @@ -113,9 +114,9 @@ SegmentIdentifier allocatePendingSegment( * @throws IllegalArgumentException if startMetadata and endMetadata are not either both null or both non-null */ SegmentPublishResult announceHistoricalSegments( - Set<DataSegment> segments, - DataSourceMetadata startMetadata, - DataSourceMetadata endMetadata + Set<DataSegment> segments, + DataSourceMetadata startMetadata, + DataSourceMetadata endMetadata ) throws IOException; DataSourceMetadata getDataSourceMetadata(String dataSource); /** @@ -141,4 +142,11 @@ SegmentPublishResult announceHistoricalSegments( * @return DataSegments which include ONLY data within the requested interval and are not flagged as used.
Data segments NOT returned here may include data in the interval */ List<DataSegment> getUnusedSegmentsForInterval(String dataSource, Interval interval); + + /** + * Get all tasks whose "active" flag is 0, i.e. tasks that have completed; used to clean useless rows out of the pendingSegments table. + * + * @param interval restrict the returned tasks to those whose data lies entirely within this interval; start is inclusive, end is exclusive + * + * @return the list of TaskDataSegments whose payloads are matched against rows of the pendingSegments table + */ + List<TaskDataSegment> getNotActiveTask(final Interval interval); } diff --git a/server/src/main/java/io/druid/metadata/IndexerSQLMetadataStorageCoordinator.java b/server/src/main/java/io/druid/metadata/IndexerSQLMetadataStorageCoordinator.java index 9d8cf4d2fc56..58a286e4d182 100644 --- a/server/src/main/java/io/druid/metadata/IndexerSQLMetadataStorageCoordinator.java +++ b/server/src/main/java/io/druid/metadata/IndexerSQLMetadataStorageCoordinator.java @@ -43,6 +43,7 @@ import io.druid.java.util.common.logger.Logger; import io.druid.segment.realtime.appenderator.SegmentIdentifier; import io.druid.timeline.DataSegment; +import io.druid.timeline.TaskDataSegment; import io.druid.timeline.TimelineObjectHolder; import io.druid.timeline.VersionedIntervalTimeline; import io.druid.timeline.partition.LinearShardSpec; @@ -85,9 +86,9 @@ public class IndexerSQLMetadataStorageCoordinator implements IndexerMetadataStorageCoordinator @Inject public IndexerSQLMetadataStorageCoordinator( - ObjectMapper jsonMapper, - MetadataStorageTablesConfig dbTables, - SQLMetadataConnector connector + ObjectMapper jsonMapper, + MetadataStorageTablesConfig dbTables, + SQLMetadataConnector connector ) { this.jsonMapper = jsonMapper; @@ -112,8 +113,8 @@ public void start() @Override public List<DataSegment> getUsedSegmentsForInterval( - final String dataSource, - final Interval interval + final String dataSource, + final Interval interval ) throws IOException { return getUsedSegmentsForIntervals(dataSource, ImmutableList.of(interval)); @@ -121,75 +122,75 @@ public List<DataSegment> getUsedSegmentsForInterval( @Override public List<DataSegment> getUsedSegmentsForIntervals( - final String dataSource, final List<Interval> intervals + final String dataSource, final List<Interval> intervals ) throws IOException { return connector.retryWithHandle( - new HandleCallback<List<DataSegment>>() - { - @Override - public List<DataSegment> withHandle(Handle handle) throws Exception - { - final VersionedIntervalTimeline<String, DataSegment> timeline = getTimelineForIntervalsWithHandle( - handle, - dataSource, - intervals - ); - - Set<DataSegment> segments = Sets.newHashSet( - Iterables.concat( - Iterables.transform( - Iterables.concat( - Iterables.transform( - intervals, - new Function<Interval, Iterable<TimelineObjectHolder<String, DataSegment>>>() - { - @Override - public Iterable<TimelineObjectHolder<String, DataSegment>> apply(Interval interval) - { - return timeline.lookup(interval); - } - } - ) - ), - new Function<TimelineObjectHolder<String, DataSegment>, Iterable<DataSegment>>() - { - @Override - public Iterable<DataSegment> apply(TimelineObjectHolder<String, DataSegment> input) - { - return input.getObject().payloads(); - } - } - ) - ) - ); + new HandleCallback<List<DataSegment>>() + { + @Override + public List<DataSegment> withHandle(Handle handle) throws Exception + { + final VersionedIntervalTimeline<String, DataSegment> timeline = getTimelineForIntervalsWithHandle( + handle, + dataSource, + intervals + ); + + Set<DataSegment> segments = Sets.newHashSet( + Iterables.concat( + Iterables.transform( + Iterables.concat( + Iterables.transform( + intervals, + new Function<Interval, Iterable<TimelineObjectHolder<String, DataSegment>>>() + { + @Override + public Iterable<TimelineObjectHolder<String, DataSegment>> apply(Interval interval) + { + return timeline.lookup(interval); + } + } + ) + ), + new Function<TimelineObjectHolder<String, DataSegment>, Iterable<DataSegment>>() + { + @Override + public Iterable<DataSegment> apply(TimelineObjectHolder<String, DataSegment> input) + { + return input.getObject().payloads(); + } + } + ) + ) + ); - return new
ArrayList<>(segments); - } - } + return new ArrayList<>(segments); + } + } ); } private List getPendingSegmentsForIntervalWithHandle( - final Handle handle, - final String dataSource, - final Interval interval + final Handle handle, + final String dataSource, + final Interval interval ) throws IOException { final List identifiers = Lists.newArrayList(); final ResultIterator dbSegments = - handle.createQuery( - String.format( - "SELECT payload FROM %s WHERE dataSource = :dataSource AND start <= :end and \"end\" >= :start", - dbTables.getPendingSegmentsTable() + handle.createQuery( + String.format( + "SELECT payload FROM %s WHERE dataSource = :dataSource AND start <= :end and \"end\" >= :start", + dbTables.getPendingSegmentsTable() + ) ) - ) - .bind("dataSource", dataSource) - .bind("start", interval.getStart().toString()) - .bind("end", interval.getEnd().toString()) - .map(ByteArrayMapper.FIRST) - .iterator(); + .bind("dataSource", dataSource) + .bind("start", interval.getStart().toString()) + .bind("end", interval.getEnd().toString()) + .map(ByteArrayMapper.FIRST) + .iterator(); while (dbSegments.hasNext()) { final byte[] payload = dbSegments.next(); @@ -206,9 +207,9 @@ private List getPendingSegmentsForIntervalWithHandle( } private VersionedIntervalTimeline getTimelineForIntervalsWithHandle( - final Handle handle, - final String dataSource, - final List intervals + final Handle handle, + final String dataSource, + final List intervals ) throws IOException { if (intervals == null || intervals.isEmpty()) { @@ -219,7 +220,7 @@ private VersionedIntervalTimeline getTimelineForIntervalsWi sb.append("SELECT payload FROM %s WHERE used = true AND dataSource = ? AND ("); for (int i = 0; i < intervals.size(); i++) { sb.append( - "(start <= ? AND \"end\" >= ?)" + "(start <= ? AND \"end\" >= ?)" ); if (i == intervals.size() - 1) { sb.append(")"); @@ -229,33 +230,33 @@ private VersionedIntervalTimeline getTimelineForIntervalsWi } Query> sql = handle.createQuery( - String.format( - sb.toString(), - dbTables.getSegmentsTable() - ) + String.format( + sb.toString(), + dbTables.getSegmentsTable() + ) ).bind(0, dataSource); for (int i = 0; i < intervals.size(); i++) { Interval interval = intervals.get(i); sql = sql - .bind(2 * i + 1, interval.getEnd().toString()) - .bind(2 * i + 2, interval.getStart().toString()); + .bind(2 * i + 1, interval.getEnd().toString()) + .bind(2 * i + 2, interval.getStart().toString()); } final ResultIterator dbSegments = sql - .map(ByteArrayMapper.FIRST) - .iterator(); + .map(ByteArrayMapper.FIRST) + .iterator(); final VersionedIntervalTimeline timeline = new VersionedIntervalTimeline<>( - Ordering.natural() + Ordering.natural() ); while (dbSegments.hasNext()) { final byte[] payload = dbSegments.next(); DataSegment segment = jsonMapper.readValue( - payload, - DataSegment.class + payload, + DataSegment.class ); timeline.add(segment.getInterval(), segment.getVersion(), segment.getShardSpec().createChunk(segment)); @@ -292,9 +293,9 @@ public Set announceHistoricalSegments(final Set segmen */ @Override public SegmentPublishResult announceHistoricalSegments( - final Set segments, - final DataSourceMetadata startMetadata, - final DataSourceMetadata endMetadata + final Set segments, + final DataSourceMetadata startMetadata, + final DataSourceMetadata endMetadata ) throws IOException { if (segments.isEmpty()) { @@ -315,7 +316,7 @@ public SegmentPublishResult announceHistoricalSegments( // Find which segments are used (i.e. not overshadowed). 
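// A segment is overshadowed when higher-version segments fully cover its interval; looking the timeline up over ETERNITY yields only the chunks that remain visible, and exactly those are flagged used = true below.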
final Set usedSegments = Sets.newHashSet(); for (TimelineObjectHolder holder : VersionedIntervalTimeline.forSegments(segments) - .lookup(JodaUtils.ETERNITY)) { + .lookup(JodaUtils.ETERNITY)) { for (PartitionChunk chunk : holder.getObject()) { usedSegments.add(chunk.getObject()); } @@ -325,47 +326,47 @@ public SegmentPublishResult announceHistoricalSegments( try { return connector.retryTransaction( - new TransactionCallback() - { - @Override - public SegmentPublishResult inTransaction( - final Handle handle, - final TransactionStatus transactionStatus - ) throws Exception - { - final Set inserted = Sets.newHashSet(); - - if (startMetadata != null) { - final DataSourceMetadataUpdateResult result = updateDataSourceMetadataWithHandle( - handle, - dataSource, - startMetadata, - endMetadata - ); - - if (result != DataSourceMetadataUpdateResult.SUCCESS) { - transactionStatus.setRollbackOnly(); - txnFailure.set(true); + new TransactionCallback() + { + @Override + public SegmentPublishResult inTransaction( + final Handle handle, + final TransactionStatus transactionStatus + ) throws Exception + { + final Set inserted = Sets.newHashSet(); + + if (startMetadata != null) { + final DataSourceMetadataUpdateResult result = updateDataSourceMetadataWithHandle( + handle, + dataSource, + startMetadata, + endMetadata + ); + + if (result != DataSourceMetadataUpdateResult.SUCCESS) { + transactionStatus.setRollbackOnly(); + txnFailure.set(true); + + if (result == DataSourceMetadataUpdateResult.FAILURE) { + throw new RuntimeException("Aborting transaction!"); + } else if (result == DataSourceMetadataUpdateResult.TRY_AGAIN) { + throw new RetryTransactionException("Aborting transaction!"); + } + } + } - if (result == DataSourceMetadataUpdateResult.FAILURE) { - throw new RuntimeException("Aborting transaction!"); - } else if (result == DataSourceMetadataUpdateResult.TRY_AGAIN) { - throw new RetryTransactionException("Aborting transaction!"); + for (final DataSegment segment : segments) { + if (announceHistoricalSegment(handle, segment, usedSegments.contains(segment))) { + inserted.add(segment); + } } - } - } - for (final DataSegment segment : segments) { - if (announceHistoricalSegment(handle, segment, usedSegments.contains(segment))) { - inserted.add(segment); + return new SegmentPublishResult(ImmutableSet.copyOf(inserted), true); } - } - - return new SegmentPublishResult(ImmutableSet.copyOf(inserted), true); - } - }, - 3, - SQLMetadataConnector.DEFAULT_MAX_TRIES + }, + 3, + SQLMetadataConnector.DEFAULT_MAX_TRIES ); } catch (CallbackFailedException e) { @@ -379,11 +380,11 @@ public SegmentPublishResult inTransaction( @Override public SegmentIdentifier allocatePendingSegment( - final String dataSource, - final String sequenceName, - final String previousSegmentId, - final Interval interval, - final String maxVersion + final String dataSource, + final String sequenceName, + final String previousSegmentId, + final Interval interval, + final String maxVersion ) throws IOException { Preconditions.checkNotNull(dataSource, "dataSource"); @@ -394,200 +395,200 @@ public SegmentIdentifier allocatePendingSegment( final String previousSegmentIdNotNull = previousSegmentId == null ? 
"" : previousSegmentId; return connector.retryTransaction( - new TransactionCallback() - { - @Override - public SegmentIdentifier inTransaction(Handle handle, TransactionStatus transactionStatus) throws Exception - { - final List existingBytes = handle - .createQuery( - String.format( - "SELECT payload FROM %s WHERE " - + "dataSource = :dataSource AND " - + "sequence_name = :sequence_name AND " - + "sequence_prev_id = :sequence_prev_id", - dbTables.getPendingSegmentsTable() - ) - ).bind("dataSource", dataSource) - .bind("sequence_name", sequenceName) - .bind("sequence_prev_id", previousSegmentIdNotNull) - .map(ByteArrayMapper.FIRST) - .list(); - - if (!existingBytes.isEmpty()) { - final SegmentIdentifier existingIdentifier = jsonMapper.readValue( - Iterables.getOnlyElement(existingBytes), - SegmentIdentifier.class - ); - - if (existingIdentifier.getInterval().getStartMillis() == interval.getStartMillis() - && existingIdentifier.getInterval().getEndMillis() == interval.getEndMillis()) { - log.info( - "Found existing pending segment [%s] for sequence[%s] (previous = [%s]) in DB", - existingIdentifier.getIdentifierAsString(), - sequenceName, - previousSegmentIdNotNull - ); - - return existingIdentifier; - } else { - log.warn( - "Cannot use existing pending segment [%s] for sequence[%s] (previous = [%s]) in DB, " - + "does not match requested interval[%s]", - existingIdentifier.getIdentifierAsString(), - sequenceName, - previousSegmentIdNotNull, - interval - ); + new TransactionCallback() + { + @Override + public SegmentIdentifier inTransaction(Handle handle, TransactionStatus transactionStatus) throws Exception + { + final List existingBytes = handle + .createQuery( + String.format( + "SELECT payload FROM %s WHERE " + + "dataSource = :dataSource AND " + + "sequence_name = :sequence_name AND " + + "sequence_prev_id = :sequence_prev_id", + dbTables.getPendingSegmentsTable() + ) + ).bind("dataSource", dataSource) + .bind("sequence_name", sequenceName) + .bind("sequence_prev_id", previousSegmentIdNotNull) + .map(ByteArrayMapper.FIRST) + .list(); + + if (!existingBytes.isEmpty()) { + final SegmentIdentifier existingIdentifier = jsonMapper.readValue( + Iterables.getOnlyElement(existingBytes), + SegmentIdentifier.class + ); + + if (existingIdentifier.getInterval().getStartMillis() == interval.getStartMillis() + && existingIdentifier.getInterval().getEndMillis() == interval.getEndMillis()) { + log.info( + "Found existing pending segment [%s] for sequence[%s] (previous = [%s]) in DB", + existingIdentifier.getIdentifierAsString(), + sequenceName, + previousSegmentIdNotNull + ); + + return existingIdentifier; + } else { + log.warn( + "Cannot use existing pending segment [%s] for sequence[%s] (previous = [%s]) in DB, " + + "does not match requested interval[%s]", + existingIdentifier.getIdentifierAsString(), + sequenceName, + previousSegmentIdNotNull, + interval + ); + + return null; + } + } - return null; - } - } + // Make up a pending segment based on existing segments and pending segments in the DB. This works + // assuming that all tasks inserting segments at a particular point in time are going through the + // allocatePendingSegment flow. This should be assured through some other mechanism (like task locks). + + final SegmentIdentifier newIdentifier; + + final List> existingChunks = getTimelineForIntervalsWithHandle( + handle, + dataSource, + ImmutableList.of(interval) + ).lookup(interval); + + if (existingChunks.size() > 1) { + // Not possible to expand more than one chunk with a single segment. 
+ log.warn( + "Cannot allocate new segment for dataSource[%s], interval[%s], maxVersion[%s]: already have [%,d] chunks.", + dataSource, + interval, + maxVersion, + existingChunks.size() + ); + return null; + } else { + SegmentIdentifier max = null; + + if (!existingChunks.isEmpty()) { + TimelineObjectHolder existingHolder = Iterables.getOnlyElement(existingChunks); + for (PartitionChunk existing : existingHolder.getObject()) { + if (max == null || max.getShardSpec().getPartitionNum() < existing.getObject() + .getShardSpec() + .getPartitionNum()) { + max = SegmentIdentifier.fromDataSegment(existing.getObject()); + } + } + } - // Make up a pending segment based on existing segments and pending segments in the DB. This works - // assuming that all tasks inserting segments at a particular point in time are going through the - // allocatePendingSegment flow. This should be assured through some other mechanism (like task locks). - - final SegmentIdentifier newIdentifier; - - final List> existingChunks = getTimelineForIntervalsWithHandle( - handle, - dataSource, - ImmutableList.of(interval) - ).lookup(interval); - - if (existingChunks.size() > 1) { - // Not possible to expand more than one chunk with a single segment. - log.warn( - "Cannot allocate new segment for dataSource[%s], interval[%s], maxVersion[%s]: already have [%,d] chunks.", - dataSource, - interval, - maxVersion, - existingChunks.size() - ); - return null; - } else { - SegmentIdentifier max = null; - - if (!existingChunks.isEmpty()) { - TimelineObjectHolder existingHolder = Iterables.getOnlyElement(existingChunks); - for (PartitionChunk existing : existingHolder.getObject()) { - if (max == null || max.getShardSpec().getPartitionNum() < existing.getObject() - .getShardSpec() - .getPartitionNum()) { - max = SegmentIdentifier.fromDataSegment(existing.getObject()); + final List pendings = getPendingSegmentsForIntervalWithHandle( + handle, + dataSource, + interval + ); + + for (SegmentIdentifier pending : pendings) { + if (max == null || + pending.getVersion().compareTo(max.getVersion()) > 0 || + (pending.getVersion().equals(max.getVersion()) + && pending.getShardSpec().getPartitionNum() > max.getShardSpec().getPartitionNum())) { + max = pending; + } } - } - } - final List pendings = getPendingSegmentsForIntervalWithHandle( - handle, - dataSource, - interval - ); - - for (SegmentIdentifier pending : pendings) { - if (max == null || - pending.getVersion().compareTo(max.getVersion()) > 0 || - (pending.getVersion().equals(max.getVersion()) - && pending.getShardSpec().getPartitionNum() > max.getShardSpec().getPartitionNum())) { - max = pending; + if (max == null) { + newIdentifier = new SegmentIdentifier( + dataSource, + interval, + maxVersion, + new NumberedShardSpec(0, 0) + ); + } else if (!max.getInterval().equals(interval) || max.getVersion().compareTo(maxVersion) > 0) { + log.warn( + "Cannot allocate new segment for dataSource[%s], interval[%s], maxVersion[%s]: conflicting segment[%s].", + dataSource, + interval, + maxVersion, + max.getIdentifierAsString() + ); + return null; + } else if (max.getShardSpec() instanceof LinearShardSpec) { + newIdentifier = new SegmentIdentifier( + dataSource, + max.getInterval(), + max.getVersion(), + new LinearShardSpec(max.getShardSpec().getPartitionNum() + 1) + ); + } else if (max.getShardSpec() instanceof NumberedShardSpec) { + newIdentifier = new SegmentIdentifier( + dataSource, + max.getInterval(), + max.getVersion(), + new NumberedShardSpec( + max.getShardSpec().getPartitionNum() + 1, + 
((NumberedShardSpec) max.getShardSpec()).getPartitions() + ) + ); + } else { + log.warn( + "Cannot allocate new segment for dataSource[%s], interval[%s], maxVersion[%s]: ShardSpec class[%s] used by [%s].", + dataSource, + interval, + maxVersion, + max.getShardSpec().getClass(), + max.getIdentifierAsString() + ); + return null; + } } - } - if (max == null) { - newIdentifier = new SegmentIdentifier( - dataSource, - interval, - maxVersion, - new NumberedShardSpec(0, 0) + // SELECT -> INSERT can fail due to races; callers must be prepared to retry. + // Avoiding ON DUPLICATE KEY since it's not portable. + // Avoiding try/catch since it may cause inadvertent transaction-splitting. + + // UNIQUE key for the row, ensuring sequences do not fork in two directions. + // Using a single column instead of (sequence_name, sequence_prev_id) as some MySQL storage engines + // have difficulty with large unique keys (see https://github.com/druid-io/druid/issues/2319) + final String sequenceNamePrevIdSha1 = BaseEncoding.base16().encode( + Hashing.sha1() + .newHasher() + .putBytes(StringUtils.toUtf8(sequenceName)) + .putByte((byte) 0xff) + .putBytes(StringUtils.toUtf8(previousSegmentIdNotNull)) + .hash() + .asBytes() ); - } else if (!max.getInterval().equals(interval) || max.getVersion().compareTo(maxVersion) > 0) { - log.warn( - "Cannot allocate new segment for dataSource[%s], interval[%s], maxVersion[%s]: conflicting segment[%s].", - dataSource, - interval, - maxVersion, - max.getIdentifierAsString() - ); - return null; - } else if (max.getShardSpec() instanceof LinearShardSpec) { - newIdentifier = new SegmentIdentifier( - dataSource, - max.getInterval(), - max.getVersion(), - new LinearShardSpec(max.getShardSpec().getPartitionNum() + 1) - ); - } else if (max.getShardSpec() instanceof NumberedShardSpec) { - newIdentifier = new SegmentIdentifier( - dataSource, - max.getInterval(), - max.getVersion(), - new NumberedShardSpec( - max.getShardSpec().getPartitionNum() + 1, - ((NumberedShardSpec) max.getShardSpec()).getPartitions() - ) - ); - } else { - log.warn( - "Cannot allocate new segment for dataSource[%s], interval[%s], maxVersion[%s]: ShardSpec class[%s] used by [%s].", - dataSource, - interval, - maxVersion, - max.getShardSpec().getClass(), - max.getIdentifierAsString() - ); - return null; - } - } - // SELECT -> INSERT can fail due to races; callers must be prepared to retry. - // Avoiding ON DUPLICATE KEY since it's not portable. - // Avoiding try/catch since it may cause inadvertent transaction-splitting. - - // UNIQUE key for the row, ensuring sequences do not fork in two directions. 
- // Using a single column instead of (sequence_name, sequence_prev_id) as some MySQL storage engines - // have difficulty with large unique keys (see https://github.com/druid-io/druid/issues/2319) - final String sequenceNamePrevIdSha1 = BaseEncoding.base16().encode( - Hashing.sha1() - .newHasher() - .putBytes(StringUtils.toUtf8(sequenceName)) - .putByte((byte) 0xff) - .putBytes(StringUtils.toUtf8(previousSegmentIdNotNull)) - .hash() - .asBytes() - ); - - handle.createStatement( - String.format( - "INSERT INTO %s (id, dataSource, created_date, start, \"end\", sequence_name, sequence_prev_id, sequence_name_prev_id_sha1, payload) " - + "VALUES (:id, :dataSource, :created_date, :start, :end, :sequence_name, :sequence_prev_id, :sequence_name_prev_id_sha1, :payload)", - dbTables.getPendingSegmentsTable() + handle.createStatement( + String.format( + "INSERT INTO %s (id, dataSource, created_date, start, \"end\", sequence_name, sequence_prev_id, sequence_name_prev_id_sha1, payload) " + + "VALUES (:id, :dataSource, :created_date, :start, :end, :sequence_name, :sequence_prev_id, :sequence_name_prev_id_sha1, :payload)", + dbTables.getPendingSegmentsTable() + ) ) - ) - .bind("id", newIdentifier.getIdentifierAsString()) - .bind("dataSource", dataSource) - .bind("created_date", new DateTime().toString()) - .bind("start", interval.getStart().toString()) - .bind("end", interval.getEnd().toString()) - .bind("sequence_name", sequenceName) - .bind("sequence_prev_id", previousSegmentIdNotNull) - .bind("sequence_name_prev_id_sha1", sequenceNamePrevIdSha1) - .bind("payload", jsonMapper.writeValueAsBytes(newIdentifier)) - .execute(); - - log.info( - "Allocated pending segment [%s] for sequence[%s] (previous = [%s]) in DB", - newIdentifier.getIdentifierAsString(), - sequenceName, - previousSegmentIdNotNull - ); - - return newIdentifier; - } - }, - ALLOCATE_SEGMENT_QUIET_TRIES, - SQLMetadataConnector.DEFAULT_MAX_TRIES + .bind("id", newIdentifier.getIdentifierAsString()) + .bind("dataSource", dataSource) + .bind("created_date", new DateTime().toString()) + .bind("start", interval.getStart().toString()) + .bind("end", interval.getEnd().toString()) + .bind("sequence_name", sequenceName) + .bind("sequence_prev_id", previousSegmentIdNotNull) + .bind("sequence_name_prev_id_sha1", sequenceNamePrevIdSha1) + .bind("payload", jsonMapper.writeValueAsBytes(newIdentifier)) + .execute(); + + log.info( + "Allocated pending segment [%s] for sequence[%s] (previous = [%s]) in DB", + newIdentifier.getIdentifierAsString(), + sequenceName, + previousSegmentIdNotNull + ); + + return newIdentifier; + } + }, + ALLOCATE_SEGMENT_QUIET_TRIES, + SQLMetadataConnector.DEFAULT_MAX_TRIES ); } @@ -598,9 +599,9 @@ public SegmentIdentifier inTransaction(Handle handle, TransactionStatus transact * @return true if the segment was added, false if it already existed */ private boolean announceHistoricalSegment( - final Handle handle, - final DataSegment segment, - final boolean used + final Handle handle, + final DataSegment segment, + final boolean used ) throws IOException { try { @@ -613,22 +614,22 @@ private boolean announceHistoricalSegment( // Avoiding ON DUPLICATE KEY since it's not portable. // Avoiding try/catch since it may cause inadvertent transaction-splitting. 
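// If two writers race past the SELECT (the segmentExists check), the loser's INSERT fails and the caller's retry then sees the existing row and skips the insert.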
handle.createStatement( - String.format( - "INSERT INTO %s (id, dataSource, created_date, start, \"end\", partitioned, version, used, payload) " - + "VALUES (:id, :dataSource, :created_date, :start, :end, :partitioned, :version, :used, :payload)", - dbTables.getSegmentsTable() - ) + String.format( + "INSERT INTO %s (id, dataSource, created_date, start, \"end\", partitioned, version, used, payload) " + + "VALUES (:id, :dataSource, :created_date, :start, :end, :partitioned, :version, :used, :payload)", + dbTables.getSegmentsTable() + ) ) - .bind("id", segment.getIdentifier()) - .bind("dataSource", segment.getDataSource()) - .bind("created_date", new DateTime().toString()) - .bind("start", segment.getInterval().getStart().toString()) - .bind("end", segment.getInterval().getEnd().toString()) - .bind("partitioned", (segment.getShardSpec() instanceof NoneShardSpec) ? false : true) - .bind("version", segment.getVersion()) - .bind("used", used) - .bind("payload", jsonMapper.writeValueAsBytes(segment)) - .execute(); + .bind("id", segment.getIdentifier()) + .bind("dataSource", segment.getDataSource()) + .bind("created_date", new DateTime().toString()) + .bind("start", segment.getInterval().getStart().toString()) + .bind("end", segment.getInterval().getEnd().toString()) + .bind("partitioned", (segment.getShardSpec() instanceof NoneShardSpec) ? false : true) + .bind("version", segment.getVersion()) + .bind("used", used) + .bind("payload", jsonMapper.writeValueAsBytes(segment)) + .execute(); log.info("Published segment [%s] to DB", segment.getIdentifier()); } @@ -643,15 +644,15 @@ private boolean announceHistoricalSegment( private boolean segmentExists(final Handle handle, final DataSegment segment) { return !handle - .createQuery( - String.format( - "SELECT id FROM %s WHERE id = :identifier", - dbTables.getSegmentsTable() - ) - ).bind("identifier", segment.getIdentifier()) - .map(StringMapper.FIRST) - .list() - .isEmpty(); + .createQuery( + String.format( + "SELECT id FROM %s WHERE id = :identifier", + dbTables.getSegmentsTable() + ) + ).bind("identifier", segment.getIdentifier()) + .map(StringMapper.FIRST) + .list() + .isEmpty(); } /** @@ -660,10 +661,10 @@ private boolean segmentExists(final Handle handle, final DataSegment segment) public DataSourceMetadata getDataSourceMetadata(final String dataSource) { final byte[] bytes = connector.lookup( - dbTables.getDataSourceTable(), - "dataSource", - "commit_metadata_payload", - dataSource + dbTables.getDataSourceTable(), + "dataSource", + "commit_metadata_payload", + dataSource ); if (bytes == null) { @@ -682,16 +683,16 @@ public DataSourceMetadata getDataSourceMetadata(final String dataSource) * Read dataSource metadata as bytes, from a specific handle. Returns null if there is no metadata. 
*/ private byte[] getDataSourceMetadataWithHandleAsBytes( - final Handle handle, - final String dataSource + final Handle handle, + final String dataSource ) { return connector.lookupWithHandle( - handle, - dbTables.getDataSourceTable(), - "dataSource", - "commit_metadata_payload", - dataSource + handle, + dbTables.getDataSourceTable(), + "dataSource", + "commit_metadata_payload", + dataSource ); } @@ -710,10 +711,10 @@ private byte[] getDataSourceMetadataWithHandleAsBytes( * @return true if dataSource metadata was updated from matching startMetadata to matching endMetadata */ protected DataSourceMetadataUpdateResult updateDataSourceMetadataWithHandle( - final Handle handle, - final String dataSource, - final DataSourceMetadata startMetadata, - final DataSourceMetadata endMetadata + final Handle handle, + final String dataSource, + final DataSourceMetadata startMetadata, + final DataSourceMetadata endMetadata ) throws IOException { Preconditions.checkNotNull(dataSource, "dataSource"); @@ -729,14 +730,14 @@ protected DataSourceMetadataUpdateResult updateDataSourceMetadataWithHandle( oldCommitMetadataFromDb = null; } else { oldCommitMetadataSha1FromDb = BaseEncoding.base16().encode( - Hashing.sha1().hashBytes(oldCommitMetadataBytesFromDb).asBytes() + Hashing.sha1().hashBytes(oldCommitMetadataBytesFromDb).asBytes() ); oldCommitMetadataFromDb = jsonMapper.readValue(oldCommitMetadataBytesFromDb, DataSourceMetadata.class); } final boolean startMetadataMatchesExisting = oldCommitMetadataFromDb == null - ? startMetadata.isValidStart() - : startMetadata.matches(oldCommitMetadataFromDb); + ? startMetadata.isValidStart() + : startMetadata.matches(oldCommitMetadataFromDb); if (!startMetadataMatchesExisting) { // Not in the desired start state. @@ -745,46 +746,46 @@ protected DataSourceMetadataUpdateResult updateDataSourceMetadataWithHandle( } final DataSourceMetadata newCommitMetadata = oldCommitMetadataFromDb == null - ? endMetadata - : oldCommitMetadataFromDb.plus(endMetadata); + ? endMetadata + : oldCommitMetadataFromDb.plus(endMetadata); final byte[] newCommitMetadataBytes = jsonMapper.writeValueAsBytes(newCommitMetadata); final String newCommitMetadataSha1 = BaseEncoding.base16().encode( - Hashing.sha1().hashBytes(newCommitMetadataBytes).asBytes() + Hashing.sha1().hashBytes(newCommitMetadataBytes).asBytes() ); final DataSourceMetadataUpdateResult retVal; if (oldCommitMetadataBytesFromDb == null) { // SELECT -> INSERT can fail due to races; callers must be prepared to retry. final int numRows = handle.createStatement( - String.format( - "INSERT INTO %s (dataSource, created_date, commit_metadata_payload, commit_metadata_sha1) " - + "VALUES (:dataSource, :created_date, :commit_metadata_payload, :commit_metadata_sha1)", - dbTables.getDataSourceTable() - ) + String.format( + "INSERT INTO %s (dataSource, created_date, commit_metadata_payload, commit_metadata_sha1) " + + "VALUES (:dataSource, :created_date, :commit_metadata_payload, :commit_metadata_sha1)", + dbTables.getDataSourceTable() + ) ) - .bind("dataSource", dataSource) - .bind("created_date", new DateTime().toString()) - .bind("commit_metadata_payload", newCommitMetadataBytes) - .bind("commit_metadata_sha1", newCommitMetadataSha1) - .execute(); + .bind("dataSource", dataSource) + .bind("created_date", new DateTime().toString()) + .bind("commit_metadata_payload", newCommitMetadataBytes) + .bind("commit_metadata_sha1", newCommitMetadataSha1) + .execute(); retVal = numRows == 1 ? 
DataSourceMetadataUpdateResult.SUCCESS : DataSourceMetadataUpdateResult.TRY_AGAIN; } else { // Expecting a particular old metadata; use the SHA1 in a compare-and-swap UPDATE final int numRows = handle.createStatement( - String.format( - "UPDATE %s SET " - + "commit_metadata_payload = :new_commit_metadata_payload, " - + "commit_metadata_sha1 = :new_commit_metadata_sha1 " - + "WHERE dataSource = :dataSource AND commit_metadata_sha1 = :old_commit_metadata_sha1", - dbTables.getDataSourceTable() - ) + String.format( + "UPDATE %s SET " + + "commit_metadata_payload = :new_commit_metadata_payload, " + + "commit_metadata_sha1 = :new_commit_metadata_sha1 " + + "WHERE dataSource = :dataSource AND commit_metadata_sha1 = :old_commit_metadata_sha1", + dbTables.getDataSourceTable() + ) ) - .bind("dataSource", dataSource) - .bind("old_commit_metadata_sha1", oldCommitMetadataSha1FromDb) - .bind("new_commit_metadata_payload", newCommitMetadataBytes) - .bind("new_commit_metadata_sha1", newCommitMetadataSha1) - .execute(); + .bind("dataSource", dataSource) + .bind("old_commit_metadata_sha1", oldCommitMetadataSha1FromDb) + .bind("new_commit_metadata_payload", newCommitMetadataBytes) + .bind("new_commit_metadata_sha1", newCommitMetadataSha1) + .execute(); retVal = numRows == 1 ? DataSourceMetadataUpdateResult.SUCCESS : DataSourceMetadataUpdateResult.TRY_AGAIN; } @@ -801,77 +802,77 @@ protected DataSourceMetadataUpdateResult updateDataSourceMetadataWithHandle( public boolean deleteDataSourceMetadata(final String dataSource) { return connector.retryWithHandle( - new HandleCallback() - { - @Override - public Boolean withHandle(Handle handle) throws Exception - { - int rows = handle.createStatement( - String.format("DELETE from %s WHERE dataSource = :dataSource", dbTables.getDataSourceTable()) - ) - .bind("dataSource", dataSource) - .execute(); + new HandleCallback() + { + @Override + public Boolean withHandle(Handle handle) throws Exception + { + int rows = handle.createStatement( + String.format("DELETE from %s WHERE dataSource = :dataSource", dbTables.getDataSourceTable()) + ) + .bind("dataSource", dataSource) + .execute(); - return rows > 0; - } - } + return rows > 0; + } + } ); } public void updateSegmentMetadata(final Set segments) throws IOException { connector.getDBI().inTransaction( - new TransactionCallback() - { - @Override - public Void inTransaction(Handle handle, TransactionStatus transactionStatus) throws Exception - { - for (final DataSegment segment : segments) { - updatePayload(handle, segment); - } + new TransactionCallback() + { + @Override + public Void inTransaction(Handle handle, TransactionStatus transactionStatus) throws Exception + { + for (final DataSegment segment : segments) { + updatePayload(handle, segment); + } - return null; - } - } + return null; + } + } ); } public void deleteSegments(final Set segments) throws IOException { connector.getDBI().inTransaction( - new TransactionCallback() - { - @Override - public Void inTransaction(Handle handle, TransactionStatus transactionStatus) throws IOException - { - for (final DataSegment segment : segments) { - deleteSegment(handle, segment); - } + new TransactionCallback() + { + @Override + public Void inTransaction(Handle handle, TransactionStatus transactionStatus) throws IOException + { + for (final DataSegment segment : segments) { + deleteSegment(handle, segment); + } - return null; - } - } + return null; + } + } ); } private void deleteSegment(final Handle handle, final DataSegment segment) { handle.createStatement( - 
String.format("DELETE from %s WHERE id = :id", dbTables.getSegmentsTable()) + String.format("DELETE from %s WHERE id = :id", dbTables.getSegmentsTable()) ) - .bind("id", segment.getIdentifier()) - .execute(); + .bind("id", segment.getIdentifier()) + .execute(); } private void updatePayload(final Handle handle, final DataSegment segment) throws IOException { try { handle.createStatement( - String.format("UPDATE %s SET payload = :payload WHERE id = :id", dbTables.getSegmentsTable()) + String.format("UPDATE %s SET payload = :payload WHERE id = :id", dbTables.getSegmentsTable()) ) - .bind("id", segment.getIdentifier()) - .bind("payload", jsonMapper.writeValueAsBytes(segment)) - .execute(); + .bind("id", segment.getIdentifier()) + .bind("payload", jsonMapper.writeValueAsBytes(segment)) + .execute(); } catch (IOException e) { log.error(e, "Exception inserting into DB"); @@ -879,54 +880,153 @@ private void updatePayload(final Handle handle, final DataSegment segment) throw } } - @Override - public List getUnusedSegmentsForInterval(final String dataSource, final Interval interval) - { - List matchingSegments = connector.inReadOnlyTransaction( - new TransactionCallback>() - { - @Override - public List inTransaction(final Handle handle, final TransactionStatus status) throws Exception - { - return handle - .createQuery( - String.format( - "SELECT payload FROM %s WHERE dataSource = :dataSource and start >= :start and \"end\" <= :end and used = false", - dbTables.getSegmentsTable() - ) - ) - .setFetchSize(connector.getStreamingFetchSize()) - .bind("dataSource", dataSource) - .bind("start", interval.getStart().toString()) - .bind("end", interval.getEnd().toString()) - .map(ByteArrayMapper.FIRST) - .fold( - Lists.newArrayList(), - new Folder3, byte[]>() + public boolean deletePendingSegments(final List segments) throws IOException + { + return connector.getDBI().inTransaction( + new TransactionCallback() + { + int res = 0; + @Override + public Boolean inTransaction(Handle handle, TransactionStatus transactionStatus) throws IOException { - @Override - public List fold( - List accumulator, - byte[] payload, - FoldController foldController, - StatementContext statementContext - ) throws SQLException - { - try { - accumulator.add(jsonMapper.readValue(payload, DataSegment.class)); - return accumulator; - } - catch (Exception e) { - throw Throwables.propagate(e); + for (final TaskDataSegment segment : segments) { + if(deletePendingSegment(segment)) + { + res +=1; + } } - } + + return res == segments.size(); } - ); - } - } + } + ); + } + + + public boolean deletePendingSegment(final TaskDataSegment taskDataSegment) + { + return connector.retryWithHandle( + new HandleCallback() + { + @Override + public Boolean withHandle(Handle handle) throws Exception + { + int rows = handle.createStatement( + String.format( + "DELETE from %1s WHERE sequence_name LIKE '%2s%%'", + dbTables.getPendingSegmentsTable(), + taskDataSegment.getIoConfig().get("baseSequenceName") + ) + ) + .execute(); + log.info("Delete sequenceName:"+taskDataSegment.getIoConfig().get("baseSequenceName")); + + return rows > 0; + } + } + ); + } + + + @Override + public List getUnusedSegmentsForInterval(final String dataSource, final Interval interval) + { + List matchingSegments = connector.inReadOnlyTransaction( + new TransactionCallback>() + { + @Override + public List inTransaction(final Handle handle, final TransactionStatus status) throws Exception + { + return handle + .createQuery( + String.format( + "SELECT payload FROM %s WHERE dataSource = 
:dataSource and start >= :start and \"end\" <= :end and used = false", + dbTables.getSegmentsTable() + ) + ) + .setFetchSize(connector.getStreamingFetchSize()) + .bind("dataSource", dataSource) + .bind("start", interval.getStart().toString()) + .bind("end", interval.getEnd().toString()) + .map(ByteArrayMapper.FIRST) + .fold( + Lists.newArrayList(), + new Folder3, byte[]>() + { + @Override + public List fold( + List accumulator, + byte[] payload, + FoldController foldController, + StatementContext statementContext + ) throws SQLException + { + try { + accumulator.add(jsonMapper.readValue(payload, DataSegment.class)); + return accumulator; + } + catch (Exception e) { + throw Throwables.propagate(e); + } + } + } + ); + } + } ); log.info("Found %,d segments for %s for interval %s.", matchingSegments.size(), dataSource, interval); return matchingSegments; } + + + @Override + public List getNotActiveTask(final Interval interval) + { + List notActiveTasks = connector.inReadOnlyTransaction( + new TransactionCallback>() + { + @Override + public List inTransaction(Handle handle, TransactionStatus transactionStatus) + throws Exception + { + return handle.createQuery( + String.format( + "SELECT payload FROM %s WHERE active = :active", + dbTables.getTasksTable() + ) + ) + .bind("active", 0) + .map(ByteArrayMapper.FIRST) + .fold( + Lists.newArrayList(), + new Folder3, byte[]>() + { + @Override + public List fold( + List accumulator, + byte[] payload, + FoldController foldController, + StatementContext statementContext + ) throws SQLException + { + try { + accumulator.add(jsonMapper.readValue(payload, TaskDataSegment.class)); + return accumulator; + } + catch (Exception e) { + throw Throwables.propagate(e); + } + } + } + ); + + } + } + ); + + log.info("Found %,d tasks for interval %s.", notActiveTasks.size(), interval); + return notActiveTasks; + } + } diff --git a/server/src/test/java/io/druid/metadata/IndexerSQLMetadataStorageCoordinatorTest.java b/server/src/test/java/io/druid/metadata/IndexerSQLMetadataStorageCoordinatorTest.java index 23db45847c35..fa7fc47ee9fe 100644 --- a/server/src/test/java/io/druid/metadata/IndexerSQLMetadataStorageCoordinatorTest.java +++ b/server/src/test/java/io/druid/metadata/IndexerSQLMetadataStorageCoordinatorTest.java @@ -28,6 +28,7 @@ import io.druid.indexing.overlord.SegmentPublishResult; import io.druid.jackson.DefaultObjectMapper; import io.druid.timeline.DataSegment; +import io.druid.timeline.TaskDataSegment; import io.druid.timeline.partition.LinearShardSpec; import io.druid.timeline.partition.NoneShardSpec; import org.joda.time.Interval; @@ -46,671 +47,692 @@ public class IndexerSQLMetadataStorageCoordinatorTest { - @Rule - public final TestDerbyConnector.DerbyConnectorRule derbyConnectorRule = new TestDerbyConnector.DerbyConnectorRule(); - - private final ObjectMapper mapper = new DefaultObjectMapper(); - private final DataSegment defaultSegment = new DataSegment( - "fooDataSource", - Interval.parse("2015-01-01T00Z/2015-01-02T00Z"), - "version", - ImmutableMap.of(), - ImmutableList.of("dim1"), - ImmutableList.of("m1"), - new LinearShardSpec(0), - 9, - 100 - ); - - private final DataSegment defaultSegment2 = new DataSegment( - "fooDataSource", - Interval.parse("2015-01-01T00Z/2015-01-02T00Z"), - "version", - ImmutableMap.of(), - ImmutableList.of("dim1"), - ImmutableList.of("m1"), - new LinearShardSpec(1), - 9, - 100 - ); - - private final DataSegment defaultSegment3 = new DataSegment( - "fooDataSource", - Interval.parse("2015-01-03T00Z/2015-01-04T00Z"), - 
"version", - ImmutableMap.of(), - ImmutableList.of("dim1"), - ImmutableList.of("m1"), - NoneShardSpec.instance(), - 9, - 100 - ); - - // Overshadows defaultSegment, defaultSegment2 - private final DataSegment defaultSegment4 = new DataSegment( - "fooDataSource", - Interval.parse("2015-01-01T00Z/2015-01-02T00Z"), - "zversion", - ImmutableMap.of(), - ImmutableList.of("dim1"), - ImmutableList.of("m1"), - new LinearShardSpec(0), - 9, - 100 - ); - - private final Set SEGMENTS = ImmutableSet.of(defaultSegment, defaultSegment2); - private final AtomicLong metadataUpdateCounter = new AtomicLong(); - private IndexerSQLMetadataStorageCoordinator coordinator; - private TestDerbyConnector derbyConnector; - - @Before - public void setUp() - { - derbyConnector = derbyConnectorRule.getConnector(); - mapper.registerSubtypes(LinearShardSpec.class); - derbyConnector.createDataSourceTable(); - derbyConnector.createTaskTables(); - derbyConnector.createSegmentTable(); - metadataUpdateCounter.set(0); - coordinator = new IndexerSQLMetadataStorageCoordinator( - mapper, - derbyConnectorRule.metadataTablesConfigSupplier().get(), - derbyConnector - ) + @Rule + public final TestDerbyConnector.DerbyConnectorRule derbyConnectorRule = new TestDerbyConnector.DerbyConnectorRule(); + + private final ObjectMapper mapper = new DefaultObjectMapper(); + private final DataSegment defaultSegment = new DataSegment( + "fooDataSource", + Interval.parse("2015-01-01T00Z/2015-01-02T00Z"), + "version", + ImmutableMap.of(), + ImmutableList.of("dim1"), + ImmutableList.of("m1"), + new LinearShardSpec(0), + 9, + 100 + ); + + private final DataSegment defaultSegment2 = new DataSegment( + "fooDataSource", + Interval.parse("2015-01-01T00Z/2015-01-02T00Z"), + "version", + ImmutableMap.of(), + ImmutableList.of("dim1"), + ImmutableList.of("m1"), + new LinearShardSpec(1), + 9, + 100 + ); + + private final DataSegment defaultSegment3 = new DataSegment( + "fooDataSource", + Interval.parse("2015-01-03T00Z/2015-01-04T00Z"), + "version", + ImmutableMap.of(), + ImmutableList.of("dim1"), + ImmutableList.of("m1"), + NoneShardSpec.instance(), + 9, + 100 + ); + + // Overshadows defaultSegment, defaultSegment2 + private final DataSegment defaultSegment4 = new DataSegment( + "fooDataSource", + Interval.parse("2015-01-01T00Z/2015-01-02T00Z"), + "zversion", + ImmutableMap.of(), + ImmutableList.of("dim1"), + ImmutableList.of("m1"), + new LinearShardSpec(0), + 9, + 100 + ); + + private final TaskDataSegment defaultTaskSegment = new TaskDataSegment( + "kafka", + "12345", + "test", + "datasource_test_1", + ImmutableMap.of("baseSequenceName","test_baseSequenceName"), + Interval.parse("2015-01-01T00Z/2015-01-02T00Z") + ); + + private final Set SEGMENTS = ImmutableSet.of(defaultSegment, defaultSegment2); + private final AtomicLong metadataUpdateCounter = new AtomicLong(); + private IndexerSQLMetadataStorageCoordinator coordinator; + private TestDerbyConnector derbyConnector; + + @Before + public void setUp() { - @Override - protected DataSourceMetadataUpdateResult updateDataSourceMetadataWithHandle( - Handle handle, - String dataSource, - DataSourceMetadata startMetadata, - DataSourceMetadata endMetadata - ) throws IOException - { - // Count number of times this method is called. 
- metadataUpdateCounter.getAndIncrement(); - return super.updateDataSourceMetadataWithHandle(handle, dataSource, startMetadata, endMetadata); - } - }; - } - - private void unUseSegment() - { - for (final DataSegment segment : SEGMENTS) { - Assert.assertEquals( - 1, (int) derbyConnector.getDBI().withHandle( - new HandleCallback() - { - @Override - public Integer withHandle(Handle handle) throws Exception + derbyConnector = derbyConnectorRule.getConnector(); + mapper.registerSubtypes(LinearShardSpec.class); + derbyConnector.createDataSourceTable(); + derbyConnector.createTaskTables(); + derbyConnector.createSegmentTable(); + metadataUpdateCounter.set(0); + coordinator = new IndexerSQLMetadataStorageCoordinator( + mapper, + derbyConnectorRule.metadataTablesConfigSupplier().get(), + derbyConnector + ) + { + @Override + protected DataSourceMetadataUpdateResult updateDataSourceMetadataWithHandle( + Handle handle, + String dataSource, + DataSourceMetadata startMetadata, + DataSourceMetadata endMetadata + ) throws IOException + { + // Count number of times this method is called. + metadataUpdateCounter.getAndIncrement(); + return super.updateDataSourceMetadataWithHandle(handle, dataSource, startMetadata, endMetadata); + } + }; + } + + private void unUseSegment() + { + for (final DataSegment segment : SEGMENTS) { + Assert.assertEquals( + 1, (int) derbyConnector.getDBI().withHandle( + new HandleCallback() + { + @Override + public Integer withHandle(Handle handle) throws Exception + { + return handle.createStatement( + String.format( + "UPDATE %s SET used = false WHERE id = :id", + derbyConnectorRule.metadataTablesConfigSupplier().get().getSegmentsTable() + ) + ).bind("id", segment.getIdentifier()).execute(); + } + } + ) + ); + } + } + + private List getUsedIdentifiers() + { + final String table = derbyConnectorRule.metadataTablesConfigSupplier().get().getSegmentsTable(); + return derbyConnector.retryWithHandle( + new HandleCallback>() { - return handle.createStatement( - String.format( - "UPDATE %s SET used = false WHERE id = :id", - derbyConnectorRule.metadataTablesConfigSupplier().get().getSegmentsTable() - ) - ).bind("id", segment.getIdentifier()).execute(); + @Override + public List withHandle(Handle handle) throws Exception + { + return handle.createQuery("SELECT id FROM " + table + " WHERE used = true ORDER BY id") + .map(StringMapper.FIRST) + .list(); + } } - } - ) - ); + ); } - } - private List getUsedIdentifiers() - { - final String table = derbyConnectorRule.metadataTablesConfigSupplier().get().getSegmentsTable(); - return derbyConnector.retryWithHandle( - new HandleCallback>() - { - @Override - public List withHandle(Handle handle) throws Exception - { - return handle.createQuery("SELECT id FROM " + table + " WHERE used = true ORDER BY id") - .map(StringMapper.FIRST) - .list(); - } + @Test + public void testSimpleAnnounce() throws IOException + { + coordinator.announceHistoricalSegments(SEGMENTS); + for (DataSegment segment : SEGMENTS) { + Assert.assertArrayEquals( + mapper.writeValueAsString(segment).getBytes("UTF-8"), + derbyConnector.lookup( + derbyConnectorRule.metadataTablesConfigSupplier().get().getSegmentsTable(), + "id", + "payload", + segment.getIdentifier() + ) + ); } - ); - } - - @Test - public void testSimpleAnnounce() throws IOException - { - coordinator.announceHistoricalSegments(SEGMENTS); - for (DataSegment segment : SEGMENTS) { - Assert.assertArrayEquals( - mapper.writeValueAsString(segment).getBytes("UTF-8"), - derbyConnector.lookup( - 
derbyConnectorRule.metadataTablesConfigSupplier().get().getSegmentsTable(), - "id", - "payload", - segment.getIdentifier() - ) - ); + + Assert.assertEquals( + ImmutableList.of(defaultSegment.getIdentifier(), defaultSegment2.getIdentifier()), + getUsedIdentifiers() + ); + + // Should not update dataSource metadata. + Assert.assertEquals(0, metadataUpdateCounter.get()); } - Assert.assertEquals( - ImmutableList.of(defaultSegment.getIdentifier(), defaultSegment2.getIdentifier()), - getUsedIdentifiers() - ); + @Test + public void testOvershadowingAnnounce() throws IOException + { + final ImmutableSet segments = ImmutableSet.of(defaultSegment, defaultSegment2, defaultSegment4); + + coordinator.announceHistoricalSegments(segments); + + for (DataSegment segment : segments) { + Assert.assertArrayEquals( + mapper.writeValueAsString(segment).getBytes("UTF-8"), + derbyConnector.lookup( + derbyConnectorRule.metadataTablesConfigSupplier().get().getSegmentsTable(), + "id", + "payload", + segment.getIdentifier() + ) + ); + } - // Should not update dataSource metadata. - Assert.assertEquals(0, metadataUpdateCounter.get()); - } - - @Test - public void testOvershadowingAnnounce() throws IOException - { - final ImmutableSet segments = ImmutableSet.of(defaultSegment, defaultSegment2, defaultSegment4); - - coordinator.announceHistoricalSegments(segments); - - for (DataSegment segment : segments) { - Assert.assertArrayEquals( - mapper.writeValueAsString(segment).getBytes("UTF-8"), - derbyConnector.lookup( - derbyConnectorRule.metadataTablesConfigSupplier().get().getSegmentsTable(), - "id", - "payload", - segment.getIdentifier() - ) - ); + Assert.assertEquals(ImmutableList.of(defaultSegment4.getIdentifier()), getUsedIdentifiers()); } - Assert.assertEquals(ImmutableList.of(defaultSegment4.getIdentifier()), getUsedIdentifiers()); - } - - @Test - public void testTransactionalAnnounceSuccess() throws IOException - { - // Insert first segment. - final SegmentPublishResult result1 = coordinator.announceHistoricalSegments( - ImmutableSet.of(defaultSegment), - new ObjectMetadata(null), - new ObjectMetadata(ImmutableMap.of("foo", "bar")) - ); - Assert.assertEquals(new SegmentPublishResult(ImmutableSet.of(defaultSegment), true), result1); - - Assert.assertArrayEquals( - mapper.writeValueAsString(defaultSegment).getBytes("UTF-8"), - derbyConnector.lookup( - derbyConnectorRule.metadataTablesConfigSupplier().get().getSegmentsTable(), - "id", - "payload", - defaultSegment.getIdentifier() - ) - ); + @Test + public void testTransactionalAnnounceSuccess() throws IOException + { + // Insert first segment. + final SegmentPublishResult result1 = coordinator.announceHistoricalSegments( + ImmutableSet.of(defaultSegment), + new ObjectMetadata(null), + new ObjectMetadata(ImmutableMap.of("foo", "bar")) + ); + Assert.assertEquals(new SegmentPublishResult(ImmutableSet.of(defaultSegment), true), result1); + + Assert.assertArrayEquals( + mapper.writeValueAsString(defaultSegment).getBytes("UTF-8"), + derbyConnector.lookup( + derbyConnectorRule.metadataTablesConfigSupplier().get().getSegmentsTable(), + "id", + "payload", + defaultSegment.getIdentifier() + ) + ); + + // Insert second segment. 
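+ // startMetadata must match what the first announce committed ({"foo": "bar"}); dataSource metadata is updated with a SHA1 compare-and-swap, so a mismatched start would make the publish fail.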
+ final SegmentPublishResult result2 = coordinator.announceHistoricalSegments( + ImmutableSet.of(defaultSegment2), + new ObjectMetadata(ImmutableMap.of("foo", "bar")), + new ObjectMetadata(ImmutableMap.of("foo", "baz")) + ); + Assert.assertEquals(new SegmentPublishResult(ImmutableSet.of(defaultSegment2), true), result2); + + Assert.assertArrayEquals( + mapper.writeValueAsString(defaultSegment2).getBytes("UTF-8"), + derbyConnector.lookup( + derbyConnectorRule.metadataTablesConfigSupplier().get().getSegmentsTable(), + "id", + "payload", + defaultSegment2.getIdentifier() + ) + ); + + // Examine metadata. + Assert.assertEquals( + new ObjectMetadata(ImmutableMap.of("foo", "baz")), + coordinator.getDataSourceMetadata("fooDataSource") + ); + + // Should only be tried once per call. + Assert.assertEquals(2, metadataUpdateCounter.get()); + } - // Insert second segment. - final SegmentPublishResult result2 = coordinator.announceHistoricalSegments( - ImmutableSet.of(defaultSegment2), - new ObjectMetadata(ImmutableMap.of("foo", "bar")), - new ObjectMetadata(ImmutableMap.of("foo", "baz")) - ); - Assert.assertEquals(new SegmentPublishResult(ImmutableSet.of(defaultSegment2), true), result2); - - Assert.assertArrayEquals( - mapper.writeValueAsString(defaultSegment2).getBytes("UTF-8"), - derbyConnector.lookup( - derbyConnectorRule.metadataTablesConfigSupplier().get().getSegmentsTable(), - "id", - "payload", - defaultSegment2.getIdentifier() + @Test + public void testTransactionalAnnounceRetryAndSuccess() throws IOException + { + final AtomicLong attemptCounter = new AtomicLong(); + + final IndexerSQLMetadataStorageCoordinator failOnceCoordinator = new IndexerSQLMetadataStorageCoordinator( + mapper, + derbyConnectorRule.metadataTablesConfigSupplier().get(), + derbyConnector ) - ); + { + @Override + protected DataSourceMetadataUpdateResult updateDataSourceMetadataWithHandle( + Handle handle, + String dataSource, + DataSourceMetadata startMetadata, + DataSourceMetadata endMetadata + ) throws IOException + { + metadataUpdateCounter.getAndIncrement(); + if (attemptCounter.getAndIncrement() == 0) { + return DataSourceMetadataUpdateResult.TRY_AGAIN; + } else { + return super.updateDataSourceMetadataWithHandle(handle, dataSource, startMetadata, endMetadata); + } + } + }; + + // Insert first segment. + final SegmentPublishResult result1 = failOnceCoordinator.announceHistoricalSegments( + ImmutableSet.of(defaultSegment), + new ObjectMetadata(null), + new ObjectMetadata(ImmutableMap.of("foo", "bar")) + ); + Assert.assertEquals(new SegmentPublishResult(ImmutableSet.of(defaultSegment), true), result1); + + Assert.assertArrayEquals( + mapper.writeValueAsString(defaultSegment).getBytes("UTF-8"), + derbyConnector.lookup( + derbyConnectorRule.metadataTablesConfigSupplier().get().getSegmentsTable(), + "id", + "payload", + defaultSegment.getIdentifier() + ) + ); + + // Reset attempt counter to induce another failure. + attemptCounter.set(0); + + // Insert second segment. 
+ final SegmentPublishResult result2 = failOnceCoordinator.announceHistoricalSegments( + ImmutableSet.of(defaultSegment2), + new ObjectMetadata(ImmutableMap.of("foo", "bar")), + new ObjectMetadata(ImmutableMap.of("foo", "baz")) + ); + Assert.assertEquals(new SegmentPublishResult(ImmutableSet.of(defaultSegment2), true), result2); + + Assert.assertArrayEquals( + mapper.writeValueAsString(defaultSegment2).getBytes("UTF-8"), + derbyConnector.lookup( + derbyConnectorRule.metadataTablesConfigSupplier().get().getSegmentsTable(), + "id", + "payload", + defaultSegment2.getIdentifier() + ) + ); + + // Examine metadata. + Assert.assertEquals( + new ObjectMetadata(ImmutableMap.of("foo", "baz")), + failOnceCoordinator.getDataSourceMetadata("fooDataSource") + ); + + // Should be tried twice per call. + Assert.assertEquals(4, metadataUpdateCounter.get()); + } - // Examine metadata. - Assert.assertEquals( - new ObjectMetadata(ImmutableMap.of("foo", "baz")), - coordinator.getDataSourceMetadata("fooDataSource") - ); + @Test + public void testTransactionalAnnounceFailDbNullWantNotNull() throws IOException + { + final SegmentPublishResult result1 = coordinator.announceHistoricalSegments( + ImmutableSet.of(defaultSegment), + new ObjectMetadata(ImmutableMap.of("foo", "bar")), + new ObjectMetadata(ImmutableMap.of("foo", "baz")) + ); + Assert.assertEquals(new SegmentPublishResult(ImmutableSet.of(), false), result1); + + // Should only be tried once. + Assert.assertEquals(1, metadataUpdateCounter.get()); + } - // Should only be tried once per call. - Assert.assertEquals(2, metadataUpdateCounter.get()); - } + @Test + public void testTransactionalAnnounceFailDbNotNullWantNull() throws IOException + { + final SegmentPublishResult result1 = coordinator.announceHistoricalSegments( + ImmutableSet.of(defaultSegment), + new ObjectMetadata(null), + new ObjectMetadata(ImmutableMap.of("foo", "baz")) + ); + Assert.assertEquals(new SegmentPublishResult(ImmutableSet.of(defaultSegment), true), result1); + + final SegmentPublishResult result2 = coordinator.announceHistoricalSegments( + ImmutableSet.of(defaultSegment2), + new ObjectMetadata(null), + new ObjectMetadata(ImmutableMap.of("foo", "baz")) + ); + Assert.assertEquals(new SegmentPublishResult(ImmutableSet.of(), false), result2); + + // Should only be tried once per call. + Assert.assertEquals(2, metadataUpdateCounter.get()); + } - @Test - public void testTransactionalAnnounceRetryAndSuccess() throws IOException - { - final AtomicLong attemptCounter = new AtomicLong(); + @Test + public void testTransactionalAnnounceFailDbNotNullWantDifferent() throws IOException + { + final SegmentPublishResult result1 = coordinator.announceHistoricalSegments( + ImmutableSet.of(defaultSegment), + new ObjectMetadata(null), + new ObjectMetadata(ImmutableMap.of("foo", "baz")) + ); + Assert.assertEquals(new SegmentPublishResult(ImmutableSet.of(defaultSegment), true), result1); + + final SegmentPublishResult result2 = coordinator.announceHistoricalSegments( + ImmutableSet.of(defaultSegment2), + new ObjectMetadata(ImmutableMap.of("foo", "qux")), + new ObjectMetadata(ImmutableMap.of("foo", "baz")) + ); + Assert.assertEquals(new SegmentPublishResult(ImmutableSet.of(), false), result2); + + // Should only be tried once per call. 
+    Assert.assertEquals(2, metadataUpdateCounter.get());
+  }
 
-    final IndexerSQLMetadataStorageCoordinator failOnceCoordinator = new IndexerSQLMetadataStorageCoordinator(
-        mapper,
-        derbyConnectorRule.metadataTablesConfigSupplier().get(),
-        derbyConnector
-    )
+  @Test
+  public void testSimpleUsedList() throws IOException
    {
-      @Override
-      protected DataSourceMetadataUpdateResult updateDataSourceMetadataWithHandle(
-          Handle handle,
-          String dataSource,
-          DataSourceMetadata startMetadata,
-          DataSourceMetadata endMetadata
-      ) throws IOException
-      {
-        metadataUpdateCounter.getAndIncrement();
-        if (attemptCounter.getAndIncrement() == 0) {
-          return DataSourceMetadataUpdateResult.TRY_AGAIN;
-        } else {
-          return super.updateDataSourceMetadataWithHandle(handle, dataSource, startMetadata, endMetadata);
-        }
-      }
-    };
-
-    // Insert first segment.
-    final SegmentPublishResult result1 = failOnceCoordinator.announceHistoricalSegments(
-        ImmutableSet.of(defaultSegment),
-        new ObjectMetadata(null),
-        new ObjectMetadata(ImmutableMap.of("foo", "bar"))
-    );
-    Assert.assertEquals(new SegmentPublishResult(ImmutableSet.of(defaultSegment), true), result1);
-
-    Assert.assertArrayEquals(
-        mapper.writeValueAsString(defaultSegment).getBytes("UTF-8"),
-        derbyConnector.lookup(
-            derbyConnectorRule.metadataTablesConfigSupplier().get().getSegmentsTable(),
-            "id",
-            "payload",
-            defaultSegment.getIdentifier()
-        )
-    );
+    coordinator.announceHistoricalSegments(SEGMENTS);
+    Assert.assertEquals(
+        SEGMENTS,
+        ImmutableSet.copyOf(
+            coordinator.getUsedSegmentsForInterval(
+                defaultSegment.getDataSource(),
+                defaultSegment.getInterval()
+            )
+        )
+    );
+  }
 
-    // Reset attempt counter to induce another failure.
-    attemptCounter.set(0);
+  @Test
+  public void testMultiIntervalUsedList() throws IOException
+  {
+    coordinator.announceHistoricalSegments(SEGMENTS);
+    coordinator.announceHistoricalSegments(ImmutableSet.of(defaultSegment3));
+
+    Assert.assertEquals(
+        SEGMENTS,
+        ImmutableSet.copyOf(
+            coordinator.getUsedSegmentsForIntervals(
+                defaultSegment.getDataSource(),
+                ImmutableList.of(defaultSegment.getInterval())
+            )
+        )
+    );
+
+    Assert.assertEquals(
+        ImmutableSet.of(defaultSegment3),
+        ImmutableSet.copyOf(
+            coordinator.getUsedSegmentsForIntervals(
+                defaultSegment.getDataSource(),
+                ImmutableList.of(defaultSegment3.getInterval())
+            )
+        )
+    );
+
+    Assert.assertEquals(
+        ImmutableSet.of(defaultSegment, defaultSegment2, defaultSegment3),
+        ImmutableSet.copyOf(
+            coordinator.getUsedSegmentsForIntervals(
+                defaultSegment.getDataSource(),
+                ImmutableList.of(defaultSegment.getInterval(), defaultSegment3.getInterval())
+            )
+        )
+    );
+
+    // Verify that a segment is not duplicated in the result when two query intervals overlap that same segment's interval.
+    Assert.assertEquals(
+        ImmutableList.of(defaultSegment3),
+        coordinator.getUsedSegmentsForIntervals(
+            defaultSegment.getDataSource(),
+            ImmutableList.of(
+                Interval.parse("2015-01-03T00Z/2015-01-03T05Z"),
+                Interval.parse("2015-01-03T09Z/2015-01-04T00Z")
+            )
+        )
+    );
+  }
 
-    // Insert second segment.
- final SegmentPublishResult result2 = failOnceCoordinator.announceHistoricalSegments( - ImmutableSet.of(defaultSegment2), - new ObjectMetadata(ImmutableMap.of("foo", "bar")), - new ObjectMetadata(ImmutableMap.of("foo", "baz")) - ); - Assert.assertEquals(new SegmentPublishResult(ImmutableSet.of(defaultSegment2), true), result2); - - Assert.assertArrayEquals( - mapper.writeValueAsString(defaultSegment2).getBytes("UTF-8"), - derbyConnector.lookup( - derbyConnectorRule.metadataTablesConfigSupplier().get().getSegmentsTable(), - "id", - "payload", - defaultSegment2.getIdentifier() - ) - ); + @Test + public void testSimpleUnUsedList() throws IOException + { + coordinator.announceHistoricalSegments(SEGMENTS); + unUseSegment(); + Assert.assertEquals( + SEGMENTS, + ImmutableSet.copyOf( + coordinator.getUnusedSegmentsForInterval( + defaultSegment.getDataSource(), + defaultSegment.getInterval() + ) + ) + ); + } - // Examine metadata. - Assert.assertEquals( - new ObjectMetadata(ImmutableMap.of("foo", "baz")), - failOnceCoordinator.getDataSourceMetadata("fooDataSource") - ); - // Should be tried twice per call. - Assert.assertEquals(4, metadataUpdateCounter.get()); - } - - @Test - public void testTransactionalAnnounceFailDbNullWantNotNull() throws IOException - { - final SegmentPublishResult result1 = coordinator.announceHistoricalSegments( - ImmutableSet.of(defaultSegment), - new ObjectMetadata(ImmutableMap.of("foo", "bar")), - new ObjectMetadata(ImmutableMap.of("foo", "baz")) - ); - Assert.assertEquals(new SegmentPublishResult(ImmutableSet.of(), false), result1); - - // Should only be tried once. - Assert.assertEquals(1, metadataUpdateCounter.get()); - } - - @Test - public void testTransactionalAnnounceFailDbNotNullWantNull() throws IOException - { - final SegmentPublishResult result1 = coordinator.announceHistoricalSegments( - ImmutableSet.of(defaultSegment), - new ObjectMetadata(null), - new ObjectMetadata(ImmutableMap.of("foo", "baz")) - ); - Assert.assertEquals(new SegmentPublishResult(ImmutableSet.of(defaultSegment), true), result1); + @Test + public void testUsedOverlapLow() throws IOException + { + coordinator.announceHistoricalSegments(SEGMENTS); + Set actualSegments = ImmutableSet.copyOf( + coordinator.getUsedSegmentsForInterval( + defaultSegment.getDataSource(), + Interval.parse("2014-12-31T23:59:59.999Z/2015-01-01T00:00:00.001Z") // end is exclusive + ) + ); + Assert.assertEquals( + SEGMENTS, + actualSegments + ); + } - final SegmentPublishResult result2 = coordinator.announceHistoricalSegments( - ImmutableSet.of(defaultSegment2), - new ObjectMetadata(null), - new ObjectMetadata(ImmutableMap.of("foo", "baz")) - ); - Assert.assertEquals(new SegmentPublishResult(ImmutableSet.of(), false), result2); - - // Should only be tried once per call. 
- Assert.assertEquals(2, metadataUpdateCounter.get()); - } - - @Test - public void testTransactionalAnnounceFailDbNotNullWantDifferent() throws IOException - { - final SegmentPublishResult result1 = coordinator.announceHistoricalSegments( - ImmutableSet.of(defaultSegment), - new ObjectMetadata(null), - new ObjectMetadata(ImmutableMap.of("foo", "baz")) - ); - Assert.assertEquals(new SegmentPublishResult(ImmutableSet.of(defaultSegment), true), result1); - final SegmentPublishResult result2 = coordinator.announceHistoricalSegments( - ImmutableSet.of(defaultSegment2), - new ObjectMetadata(ImmutableMap.of("foo", "qux")), - new ObjectMetadata(ImmutableMap.of("foo", "baz")) - ); - Assert.assertEquals(new SegmentPublishResult(ImmutableSet.of(), false), result2); - - // Should only be tried once per call. - Assert.assertEquals(2, metadataUpdateCounter.get()); - } - - @Test - public void testSimpleUsedList() throws IOException - { - coordinator.announceHistoricalSegments(SEGMENTS); - Assert.assertEquals( - SEGMENTS, - ImmutableSet.copyOf( - coordinator.getUsedSegmentsForInterval( - defaultSegment.getDataSource(), - defaultSegment.getInterval() - ) - ) - ); - } - - @Test - public void testMultiIntervalUsedList() throws IOException - { - coordinator.announceHistoricalSegments(SEGMENTS); - coordinator.announceHistoricalSegments(ImmutableSet.of(defaultSegment3)); - - Assert.assertEquals( - SEGMENTS, - ImmutableSet.copyOf( - coordinator.getUsedSegmentsForIntervals( - defaultSegment.getDataSource(), - ImmutableList.of(defaultSegment.getInterval()) - ) - ) - ); + @Test + public void testUsedOverlapHigh() throws IOException + { + coordinator.announceHistoricalSegments(SEGMENTS); + Assert.assertEquals( + SEGMENTS, + ImmutableSet.copyOf( + coordinator.getUsedSegmentsForInterval( + defaultSegment.getDataSource(), + Interval.parse("2015-1-1T23:59:59.999Z/2015-02-01T00Z") + ) + ) + ); + } - Assert.assertEquals( - ImmutableSet.of(defaultSegment3), - ImmutableSet.copyOf( - coordinator.getUsedSegmentsForIntervals( - defaultSegment.getDataSource(), - ImmutableList.of(defaultSegment3.getInterval()) - ) - ) - ); + @Test + public void testUsedOutOfBoundsLow() throws IOException + { + coordinator.announceHistoricalSegments(SEGMENTS); + Assert.assertTrue( + coordinator.getUsedSegmentsForInterval( + defaultSegment.getDataSource(), + new Interval(defaultSegment.getInterval().getStart().minus(1), defaultSegment.getInterval().getStart()) + ).isEmpty() + ); + } - Assert.assertEquals( - ImmutableSet.of(defaultSegment, defaultSegment2, defaultSegment3), - ImmutableSet.copyOf( - coordinator.getUsedSegmentsForIntervals( - defaultSegment.getDataSource(), - ImmutableList.of(defaultSegment.getInterval(), defaultSegment3.getInterval()) - ) - ) - ); - //case to check no duplication if two intervals overlapped with the interval of same segment. 
- Assert.assertEquals( - ImmutableList.of(defaultSegment3), - coordinator.getUsedSegmentsForIntervals( - defaultSegment.getDataSource(), - ImmutableList.of( - Interval.parse("2015-01-03T00Z/2015-01-03T05Z"), - Interval.parse("2015-01-03T09Z/2015-01-04T00Z") - ) - ) - ); - } - - @Test - public void testSimpleUnUsedList() throws IOException - { - coordinator.announceHistoricalSegments(SEGMENTS); - unUseSegment(); - Assert.assertEquals( - SEGMENTS, - ImmutableSet.copyOf( - coordinator.getUnusedSegmentsForInterval( - defaultSegment.getDataSource(), - defaultSegment.getInterval() - ) - ) - ); - } + @Test + public void testUsedOutOfBoundsHigh() throws IOException + { + coordinator.announceHistoricalSegments(SEGMENTS); + Assert.assertTrue( + coordinator.getUsedSegmentsForInterval( + defaultSegment.getDataSource(), + new Interval(defaultSegment.getInterval().getEnd(), defaultSegment.getInterval().getEnd().plusDays(10)) + ).isEmpty() + ); + } + @Test + public void testUsedWithinBoundsEnd() throws IOException + { + coordinator.announceHistoricalSegments(SEGMENTS); + Assert.assertEquals( + SEGMENTS, + ImmutableSet.copyOf( + coordinator.getUsedSegmentsForInterval( + defaultSegment.getDataSource(), + defaultSegment.getInterval().withEnd(defaultSegment.getInterval().getEnd().minusMillis(1)) + ) + ) + ); + } - @Test - public void testUsedOverlapLow() throws IOException - { - coordinator.announceHistoricalSegments(SEGMENTS); - Set actualSegments = ImmutableSet.copyOf( - coordinator.getUsedSegmentsForInterval( - defaultSegment.getDataSource(), - Interval.parse("2014-12-31T23:59:59.999Z/2015-01-01T00:00:00.001Z") // end is exclusive - ) - ); - Assert.assertEquals( - SEGMENTS, - actualSegments - ); - } - - - @Test - public void testUsedOverlapHigh() throws IOException - { - coordinator.announceHistoricalSegments(SEGMENTS); - Assert.assertEquals( - SEGMENTS, - ImmutableSet.copyOf( - coordinator.getUsedSegmentsForInterval( - defaultSegment.getDataSource(), - Interval.parse("2015-1-1T23:59:59.999Z/2015-02-01T00Z") - ) - ) - ); - } - - @Test - public void testUsedOutOfBoundsLow() throws IOException - { - coordinator.announceHistoricalSegments(SEGMENTS); - Assert.assertTrue( - coordinator.getUsedSegmentsForInterval( - defaultSegment.getDataSource(), - new Interval(defaultSegment.getInterval().getStart().minus(1), defaultSegment.getInterval().getStart()) - ).isEmpty() - ); - } - - - @Test - public void testUsedOutOfBoundsHigh() throws IOException - { - coordinator.announceHistoricalSegments(SEGMENTS); - Assert.assertTrue( - coordinator.getUsedSegmentsForInterval( - defaultSegment.getDataSource(), - new Interval(defaultSegment.getInterval().getEnd(), defaultSegment.getInterval().getEnd().plusDays(10)) - ).isEmpty() - ); - } - - @Test - public void testUsedWithinBoundsEnd() throws IOException - { - coordinator.announceHistoricalSegments(SEGMENTS); - Assert.assertEquals( - SEGMENTS, - ImmutableSet.copyOf( - coordinator.getUsedSegmentsForInterval( - defaultSegment.getDataSource(), - defaultSegment.getInterval().withEnd(defaultSegment.getInterval().getEnd().minusMillis(1)) - ) - ) - ); - } - - @Test - public void testUsedOverlapEnd() throws IOException - { - coordinator.announceHistoricalSegments(SEGMENTS); - Assert.assertEquals( - SEGMENTS, - ImmutableSet.copyOf( - coordinator.getUsedSegmentsForInterval( - defaultSegment.getDataSource(), - defaultSegment.getInterval().withEnd(defaultSegment.getInterval().getEnd().plusMillis(1)) - ) - ) - ); - } - - - @Test - public void testUnUsedOverlapLow() throws IOException - { - 
coordinator.announceHistoricalSegments(SEGMENTS); - unUseSegment(); - Assert.assertTrue( - coordinator.getUnusedSegmentsForInterval( - defaultSegment.getDataSource(), - new Interval( - defaultSegment.getInterval().getStart().minus(1), - defaultSegment.getInterval().getStart().plus(1) - ) - ).isEmpty() - ); - } - - @Test - public void testUnUsedUnderlapLow() throws IOException - { - coordinator.announceHistoricalSegments(SEGMENTS); - unUseSegment(); - Assert.assertTrue( - coordinator.getUnusedSegmentsForInterval( - defaultSegment.getDataSource(), - new Interval(defaultSegment.getInterval().getStart().plus(1), defaultSegment.getInterval().getEnd()) - ).isEmpty() - ); - } - - - @Test - public void testUnUsedUnderlapHigh() throws IOException - { - coordinator.announceHistoricalSegments(SEGMENTS); - unUseSegment(); - Assert.assertTrue( - coordinator.getUnusedSegmentsForInterval( - defaultSegment.getDataSource(), - new Interval(defaultSegment.getInterval().getStart(), defaultSegment.getInterval().getEnd().minus(1)) - ).isEmpty() - ); - } - - @Test - public void testUnUsedOverlapHigh() throws IOException - { - coordinator.announceHistoricalSegments(SEGMENTS); - unUseSegment(); - Assert.assertTrue( - coordinator.getUnusedSegmentsForInterval( - defaultSegment.getDataSource(), - defaultSegment.getInterval().withStart(defaultSegment.getInterval().getEnd().minus(1)) - ).isEmpty() - ); - } - - @Test - public void testUnUsedBigOverlap() throws IOException - { - coordinator.announceHistoricalSegments(SEGMENTS); - unUseSegment(); - Assert.assertEquals( - SEGMENTS, - ImmutableSet.copyOf( - coordinator.getUnusedSegmentsForInterval( - defaultSegment.getDataSource(), - Interval.parse("2000/2999") - ) - ) - ); - } - - @Test - public void testUnUsedLowRange() throws IOException - { - coordinator.announceHistoricalSegments(SEGMENTS); - unUseSegment(); - Assert.assertEquals( - SEGMENTS, - ImmutableSet.copyOf( - coordinator.getUnusedSegmentsForInterval( - defaultSegment.getDataSource(), - defaultSegment.getInterval().withStart(defaultSegment.getInterval().getStart().minus(1)) - ) - ) - ); - Assert.assertEquals( - SEGMENTS, - ImmutableSet.copyOf( - coordinator.getUnusedSegmentsForInterval( - defaultSegment.getDataSource(), - defaultSegment.getInterval().withStart(defaultSegment.getInterval().getStart().minusYears(1)) - ) - ) - ); - } - - @Test - public void testUnUsedHighRange() throws IOException - { - coordinator.announceHistoricalSegments(SEGMENTS); - unUseSegment(); - Assert.assertEquals( - SEGMENTS, - ImmutableSet.copyOf( - coordinator.getUnusedSegmentsForInterval( - defaultSegment.getDataSource(), - defaultSegment.getInterval().withEnd(defaultSegment.getInterval().getEnd().plus(1)) - ) - ) - ); - Assert.assertEquals( - SEGMENTS, - ImmutableSet.copyOf( - coordinator.getUnusedSegmentsForInterval( - defaultSegment.getDataSource(), - defaultSegment.getInterval().withEnd(defaultSegment.getInterval().getEnd().plusYears(1)) - ) - ) - ); - } - - @Test - public void testDeleteDataSourceMetadata() throws IOException - { - coordinator.announceHistoricalSegments( - ImmutableSet.of(defaultSegment), - new ObjectMetadata(null), - new ObjectMetadata(ImmutableMap.of("foo", "bar")) - ); + @Test + public void testUsedOverlapEnd() throws IOException + { + coordinator.announceHistoricalSegments(SEGMENTS); + Assert.assertEquals( + SEGMENTS, + ImmutableSet.copyOf( + coordinator.getUsedSegmentsForInterval( + defaultSegment.getDataSource(), + defaultSegment.getInterval().withEnd(defaultSegment.getInterval().getEnd().plusMillis(1)) + 
) + ) + ); + } - Assert.assertEquals( - new ObjectMetadata(ImmutableMap.of("foo", "bar")), - coordinator.getDataSourceMetadata("fooDataSource") - ); - Assert.assertFalse("deleteInvalidDataSourceMetadata", coordinator.deleteDataSourceMetadata("nonExistentDS")); - Assert.assertTrue("deleteValidDataSourceMetadata", coordinator.deleteDataSourceMetadata("fooDataSource")); + @Test + public void testUnUsedOverlapLow() throws IOException + { + coordinator.announceHistoricalSegments(SEGMENTS); + unUseSegment(); + Assert.assertTrue( + coordinator.getUnusedSegmentsForInterval( + defaultSegment.getDataSource(), + new Interval( + defaultSegment.getInterval().getStart().minus(1), + defaultSegment.getInterval().getStart().plus(1) + ) + ).isEmpty() + ); + } + + @Test + public void testUnUsedUnderlapLow() throws IOException + { + coordinator.announceHistoricalSegments(SEGMENTS); + unUseSegment(); + Assert.assertTrue( + coordinator.getUnusedSegmentsForInterval( + defaultSegment.getDataSource(), + new Interval(defaultSegment.getInterval().getStart().plus(1), defaultSegment.getInterval().getEnd()) + ).isEmpty() + ); + } + - Assert.assertNull("getDataSourceMetadataNullAfterDelete", coordinator.getDataSourceMetadata("fooDataSource")); - } + @Test + public void testUnUsedUnderlapHigh() throws IOException + { + coordinator.announceHistoricalSegments(SEGMENTS); + unUseSegment(); + Assert.assertTrue( + coordinator.getUnusedSegmentsForInterval( + defaultSegment.getDataSource(), + new Interval(defaultSegment.getInterval().getStart(), defaultSegment.getInterval().getEnd().minus(1)) + ).isEmpty() + ); + } + + @Test + public void testUnUsedOverlapHigh() throws IOException + { + coordinator.announceHistoricalSegments(SEGMENTS); + unUseSegment(); + Assert.assertTrue( + coordinator.getUnusedSegmentsForInterval( + defaultSegment.getDataSource(), + defaultSegment.getInterval().withStart(defaultSegment.getInterval().getEnd().minus(1)) + ).isEmpty() + ); + } + + @Test + public void testUnUsedBigOverlap() throws IOException + { + coordinator.announceHistoricalSegments(SEGMENTS); + unUseSegment(); + Assert.assertEquals( + SEGMENTS, + ImmutableSet.copyOf( + coordinator.getUnusedSegmentsForInterval( + defaultSegment.getDataSource(), + Interval.parse("2000/2999") + ) + ) + ); + } + + @Test + public void testUnUsedLowRange() throws IOException + { + coordinator.announceHistoricalSegments(SEGMENTS); + unUseSegment(); + Assert.assertEquals( + SEGMENTS, + ImmutableSet.copyOf( + coordinator.getUnusedSegmentsForInterval( + defaultSegment.getDataSource(), + defaultSegment.getInterval().withStart(defaultSegment.getInterval().getStart().minus(1)) + ) + ) + ); + Assert.assertEquals( + SEGMENTS, + ImmutableSet.copyOf( + coordinator.getUnusedSegmentsForInterval( + defaultSegment.getDataSource(), + defaultSegment.getInterval().withStart(defaultSegment.getInterval().getStart().minusYears(1)) + ) + ) + ); + } + + @Test + public void testUnUsedHighRange() throws IOException + { + coordinator.announceHistoricalSegments(SEGMENTS); + unUseSegment(); + Assert.assertEquals( + SEGMENTS, + ImmutableSet.copyOf( + coordinator.getUnusedSegmentsForInterval( + defaultSegment.getDataSource(), + defaultSegment.getInterval().withEnd(defaultSegment.getInterval().getEnd().plus(1)) + ) + ) + ); + Assert.assertEquals( + SEGMENTS, + ImmutableSet.copyOf( + coordinator.getUnusedSegmentsForInterval( + defaultSegment.getDataSource(), + defaultSegment.getInterval().withEnd(defaultSegment.getInterval().getEnd().plusYears(1)) + ) + ) + ); + } + + @Test + public void 
testDeleteDataSourceMetadata() throws IOException
+  {
+    coordinator.announceHistoricalSegments(
+        ImmutableSet.of(defaultSegment),
+        new ObjectMetadata(null),
+        new ObjectMetadata(ImmutableMap.of("foo", "bar"))
+    );
+
+    Assert.assertEquals(
+        new ObjectMetadata(ImmutableMap.of("foo", "bar")),
+        coordinator.getDataSourceMetadata("fooDataSource")
+    );
+
+    Assert.assertFalse("deleteInvalidDataSourceMetadata", coordinator.deleteDataSourceMetadata("nonExistentDS"));
+    Assert.assertTrue("deleteValidDataSourceMetadata", coordinator.deleteDataSourceMetadata("fooDataSource"));
+
+    Assert.assertNull("getDataSourceMetadataNullAfterDelete", coordinator.getDataSourceMetadata("fooDataSource"));
+  }
+
+  @Test
+  public void testDeletePendingSegments() throws IOException
+  {
+    // Pending segments whose tasks are no longer active should be deletable.
+    Assert.assertTrue(
+        "deletePendingSegment",
+        coordinator.deletePendingSegments(
+            coordinator.getNotActiveTask(defaultTaskSegment.getInterval())
+        )
+    );
+  }
 }