From d135fbd2b43f05526fdaf5e1c5cdbd6586c88e36 Mon Sep 17 00:00:00 2001 From: linhaoxiang Date: Wed, 30 Nov 2016 21:25:54 +0800 Subject: [PATCH 1/4] =?UTF-8?q?finished=20the=20interface=20to=20delete=20?= =?UTF-8?q?unused=20pendingSeglments,=20add=20TaskDataSegment.java=20to=20?= =?UTF-8?q?struct=20the=20task=20table=E2=80=99s=20data,=20add=20getNotAct?= =?UTF-8?q?iveTask=20and=20deletePendingSegments=20in=20IndexerSQLMetadata?= =?UTF-8?q?StorageCoordinator.java=20to=20finish=20the=20delete=20logic,?= =?UTF-8?q?=20add=20test=20in=20TestIndexerMetadataStorageCoordinator.java?= =?UTF-8?q?.?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../io/druid/timeline/TaskDataSegment.java | 94 ++ ...TestIndexerMetadataStorageCoordinator.java | 29 +- .../IndexerMetadataStorageCoordinator.java | 30 +- .../IndexerSQLMetadataStorageCoordinator.java | 1006 +++++++++-------- ...exerSQLMetadataStorageCoordinatorTest.java | 505 +++++---- 5 files changed, 946 insertions(+), 718 deletions(-) create mode 100644 api/src/main/java/io/druid/timeline/TaskDataSegment.java diff --git a/api/src/main/java/io/druid/timeline/TaskDataSegment.java b/api/src/main/java/io/druid/timeline/TaskDataSegment.java new file mode 100644 index 000000000000..7b2ca5d8c6d9 --- /dev/null +++ b/api/src/main/java/io/druid/timeline/TaskDataSegment.java @@ -0,0 +1,94 @@ +package io.druid.timeline; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; +import org.joda.time.Interval; + +import java.util.Map; + +/** + * Created by haoxiang on 16/11/11. 
+ */ +public class TaskDataSegment +{ + + private final String type; + private final String id; + private final String groupId; + private final String dataSource; + private final Map ioConfig; + private final Interval interval; + + @JsonCreator + public TaskDataSegment( + @JsonProperty("type") String type, + @JsonProperty("id") String id, + @JsonProperty("groupId") String groupId, + @JsonProperty("dataSource") String dataSource, + @JsonProperty("ioConfig") Map ioConfig, + @JsonProperty("interval") Interval interval + ) + { + this.type = type; + this.id = id; + this.groupId = groupId; + this.dataSource = dataSource; + this.ioConfig = ioConfig; + this.interval = interval; + } + + /** + * Get dataSource + * + * @return the dataSource + */ + + @JsonProperty + public String getType() + { + return type; + } + + @JsonProperty + public String getId() + { + return id; + } + + @JsonProperty + public String getGroupId() + { + return groupId; + } + + @JsonProperty + public String getDataSource() + { + return dataSource; + } + + @JsonProperty + public Map getIoConfig() + { + return ioConfig; + } + + @JsonProperty + public Interval getInterval() + { + return interval; + } + + @Override + public String toString() + { + return "TaskDataSegment{" + + "type=" + type + + ", id=" + id + + ", groupId=" + groupId + + ", dataSource=" + dataSource + + '}'; + } + + +} \ No newline at end of file diff --git a/indexing-service/src/test/java/io/druid/indexing/test/TestIndexerMetadataStorageCoordinator.java b/indexing-service/src/test/java/io/druid/indexing/test/TestIndexerMetadataStorageCoordinator.java index b6c13997f452..21135ac7e2d4 100644 --- a/indexing-service/src/test/java/io/druid/indexing/test/TestIndexerMetadataStorageCoordinator.java +++ b/indexing-service/src/test/java/io/druid/indexing/test/TestIndexerMetadataStorageCoordinator.java @@ -28,6 +28,7 @@ import io.druid.indexing.overlord.SegmentPublishResult; import io.druid.segment.realtime.appenderator.SegmentIdentifier; import 
io.druid.timeline.DataSegment; +import io.druid.timeline.TaskDataSegment; import org.joda.time.Interval; import java.io.IOException; @@ -39,10 +40,12 @@ public class TestIndexerMetadataStorageCoordinator implements IndexerMetadataSto final private Set published = Sets.newConcurrentHashSet(); final private Set nuked = Sets.newConcurrentHashSet(); final private List unusedSegments; + final private List taskDataSegments; public TestIndexerMetadataStorageCoordinator() { unusedSegments = Lists.newArrayList(); + taskDataSegments = Lists.newArrayList(); } @Override @@ -65,7 +68,7 @@ public List getUsedSegmentsForInterval(String dataSource, Interval @Override public List getUsedSegmentsForIntervals( - String dataSource, List intervals + String dataSource, List intervals ) throws IOException { return ImmutableList.of(); @@ -93,9 +96,9 @@ public Set announceHistoricalSegments(Set segments) @Override public SegmentPublishResult announceHistoricalSegments( - Set segments, - DataSourceMetadata oldCommitMetadata, - DataSourceMetadata newCommitMetadata + Set segments, + DataSourceMetadata oldCommitMetadata, + DataSourceMetadata newCommitMetadata ) throws IOException { // Don't actually compare metadata, just do it! 
@@ -104,11 +107,11 @@ public SegmentPublishResult announceHistoricalSegments( @Override public SegmentIdentifier allocatePendingSegment( - String dataSource, - String sequenceName, - String previousSegmentId, - Interval interval, - String maxVersion + String dataSource, + String sequenceName, + String previousSegmentId, + Interval interval, + String maxVersion ) throws IOException { throw new UnsupportedOperationException(); @@ -120,6 +123,12 @@ public void deleteSegments(Set segments) nuked.addAll(segments); } + @Override + public List getNotActiveTask(final Interval interval){ + + return ImmutableList.copyOf(taskDataSegments); + } + @Override public void updateSegmentMetadata(Set segments) throws IOException { @@ -143,4 +152,4 @@ public void setUnusedSegments(List newUnusedSegments) unusedSegments.addAll(newUnusedSegments); } } -} +} \ No newline at end of file diff --git a/server/src/main/java/io/druid/indexing/overlord/IndexerMetadataStorageCoordinator.java b/server/src/main/java/io/druid/indexing/overlord/IndexerMetadataStorageCoordinator.java index 808bd35506d1..5ce2ba5ae17a 100644 --- a/server/src/main/java/io/druid/indexing/overlord/IndexerMetadataStorageCoordinator.java +++ b/server/src/main/java/io/druid/indexing/overlord/IndexerMetadataStorageCoordinator.java @@ -21,6 +21,7 @@ import io.druid.segment.realtime.appenderator.SegmentIdentifier; import io.druid.timeline.DataSegment; +import io.druid.timeline.TaskDataSegment; import org.joda.time.Interval; import java.io.IOException; @@ -42,7 +43,7 @@ public interface IndexerMetadataStorageCoordinator * @throws IOException */ List getUsedSegmentsForInterval(String dataSource, Interval interval) - throws IOException; + throws IOException; /** * Get all segments which may include any data in the interval and are flagged as used. 
@@ -55,7 +56,7 @@ List getUsedSegmentsForInterval(String dataSource, Interval interva * @throws IOException */ List getUsedSegmentsForIntervals(final String dataSource, final List intervals) - throws IOException; + throws IOException; /** * Attempts to insert a set of segments to the metadata storage. Returns the set of segments actually added (segments @@ -86,11 +87,11 @@ List getUsedSegmentsForIntervals(final String dataSource, final Lis * @return the pending segment identifier, or null if it was impossible to allocate a new segment */ SegmentIdentifier allocatePendingSegment( - String dataSource, - String sequenceName, - String previousSegmentId, - Interval interval, - String maxVersion + String dataSource, + String sequenceName, + String previousSegmentId, + Interval interval, + String maxVersion ) throws IOException; /** @@ -113,9 +114,9 @@ SegmentIdentifier allocatePendingSegment( * @throws IllegalArgumentException if startMetadata and endMetadata are not either both null or both non-null */ SegmentPublishResult announceHistoricalSegments( - Set segments, - DataSourceMetadata startMetadata, - DataSourceMetadata endMetadata + Set segments, + DataSourceMetadata startMetadata, + DataSourceMetadata endMetadata ) throws IOException; DataSourceMetadata getDataSourceMetadata(String dataSource); @@ -141,4 +142,11 @@ SegmentPublishResult announceHistoricalSegments( * @return DataSegments which include ONLY data within the requested interval and are not flagged as used. Data segments NOT returned here may include data in the interval */ List getUnusedSegmentsForInterval(String dataSource, Interval interval); -} + + /** + * Get all task which active property is 0, is used to delete the useless segments in pendingSegment table. + * @param interval Filter the tasks to ones that include tasks in this interval exclusively. 
Start is inclusive, end is exclusive + * @return The TaskDataSegment list which used to match the pendingSegment table's payload + */ + List getNotActiveTask(final Interval interval); +} \ No newline at end of file diff --git a/server/src/main/java/io/druid/metadata/IndexerSQLMetadataStorageCoordinator.java b/server/src/main/java/io/druid/metadata/IndexerSQLMetadataStorageCoordinator.java index 2a3dc54a1487..fc5c231eb7fa 100644 --- a/server/src/main/java/io/druid/metadata/IndexerSQLMetadataStorageCoordinator.java +++ b/server/src/main/java/io/druid/metadata/IndexerSQLMetadataStorageCoordinator.java @@ -32,7 +32,6 @@ import com.google.common.hash.Hashing; import com.google.common.io.BaseEncoding; import com.google.inject.Inject; - import io.druid.common.utils.JodaUtils; import io.druid.indexing.overlord.DataSourceMetadata; import io.druid.indexing.overlord.IndexerMetadataStorageCoordinator; @@ -44,6 +43,7 @@ import io.druid.java.util.common.logger.Logger; import io.druid.segment.realtime.appenderator.SegmentIdentifier; import io.druid.timeline.DataSegment; +import io.druid.timeline.TaskDataSegment; import io.druid.timeline.TimelineObjectHolder; import io.druid.timeline.VersionedIntervalTimeline; import io.druid.timeline.partition.LinearShardSpec; @@ -86,9 +86,9 @@ public class IndexerSQLMetadataStorageCoordinator implements IndexerMetadataStor @Inject public IndexerSQLMetadataStorageCoordinator( - ObjectMapper jsonMapper, - MetadataStorageTablesConfig dbTables, - SQLMetadataConnector connector + ObjectMapper jsonMapper, + MetadataStorageTablesConfig dbTables, + SQLMetadataConnector connector ) { this.jsonMapper = jsonMapper; @@ -106,8 +106,8 @@ public void start() @Override public List getUsedSegmentsForInterval( - final String dataSource, - final Interval interval + final String dataSource, + final Interval interval ) throws IOException { return getUsedSegmentsForIntervals(dataSource, ImmutableList.of(interval)); @@ -115,75 +115,75 @@ public List 
getUsedSegmentsForInterval( @Override public List getUsedSegmentsForIntervals( - final String dataSource, final List intervals + final String dataSource, final List intervals ) throws IOException { return connector.retryWithHandle( - new HandleCallback>() - { - @Override - public List withHandle(Handle handle) throws Exception - { - final VersionedIntervalTimeline timeline = getTimelineForIntervalsWithHandle( - handle, - dataSource, - intervals - ); - - Set segments = Sets.newHashSet( - Iterables.concat( - Iterables.transform( + new HandleCallback>() + { + @Override + public List withHandle(Handle handle) throws Exception + { + final VersionedIntervalTimeline timeline = getTimelineForIntervalsWithHandle( + handle, + dataSource, + intervals + ); + + Set segments = Sets.newHashSet( Iterables.concat( - Iterables.transform( - intervals, - new Function>>() - { - @Override - public Iterable> apply(Interval interval) - { - return timeline.lookup(interval); - } - } - ) - ), - new Function, Iterable>() - { - @Override - public Iterable apply(TimelineObjectHolder input) - { - return input.getObject().payloads(); - } - } - ) - ) - ); + Iterables.transform( + Iterables.concat( + Iterables.transform( + intervals, + new Function>>() + { + @Override + public Iterable> apply(Interval interval) + { + return timeline.lookup(interval); + } + } + ) + ), + new Function, Iterable>() + { + @Override + public Iterable apply(TimelineObjectHolder input) + { + return input.getObject().payloads(); + } + } + ) + ) + ); - return new ArrayList<>(segments); - } - } + return new ArrayList<>(segments); + } + } ); } private List getPendingSegmentsForIntervalWithHandle( - final Handle handle, - final String dataSource, - final Interval interval + final Handle handle, + final String dataSource, + final Interval interval ) throws IOException { final List identifiers = Lists.newArrayList(); final ResultIterator dbSegments = - handle.createQuery( - String.format( - "SELECT payload FROM %s WHERE 
dataSource = :dataSource AND start <= :end and \"end\" >= :start", - dbTables.getPendingSegmentsTable() + handle.createQuery( + String.format( + "SELECT payload FROM %s WHERE dataSource = :dataSource AND start <= :end and \"end\" >= :start", + dbTables.getPendingSegmentsTable() + ) ) - ) - .bind("dataSource", dataSource) - .bind("start", interval.getStart().toString()) - .bind("end", interval.getEnd().toString()) - .map(ByteArrayMapper.FIRST) - .iterator(); + .bind("dataSource", dataSource) + .bind("start", interval.getStart().toString()) + .bind("end", interval.getEnd().toString()) + .map(ByteArrayMapper.FIRST) + .iterator(); while (dbSegments.hasNext()) { final byte[] payload = dbSegments.next(); @@ -200,9 +200,9 @@ private List getPendingSegmentsForIntervalWithHandle( } private VersionedIntervalTimeline getTimelineForIntervalsWithHandle( - final Handle handle, - final String dataSource, - final List intervals + final Handle handle, + final String dataSource, + final List intervals ) throws IOException { if (intervals == null || intervals.isEmpty()) { @@ -213,7 +213,7 @@ private VersionedIntervalTimeline getTimelineForIntervalsWi sb.append("SELECT payload FROM %s WHERE used = true AND dataSource = ? AND ("); for (int i = 0; i < intervals.size(); i++) { sb.append( - "(start <= ? AND \"end\" >= ?)" + "(start <= ? 
AND \"end\" >= ?)" ); if (i == intervals.size() - 1) { sb.append(")"); @@ -223,33 +223,33 @@ private VersionedIntervalTimeline getTimelineForIntervalsWi } Query> sql = handle.createQuery( - String.format( - sb.toString(), - dbTables.getSegmentsTable() - ) + String.format( + sb.toString(), + dbTables.getSegmentsTable() + ) ).bind(0, dataSource); for (int i = 0; i < intervals.size(); i++) { Interval interval = intervals.get(i); sql = sql - .bind(2 * i + 1, interval.getEnd().toString()) - .bind(2 * i + 2, interval.getStart().toString()); + .bind(2 * i + 1, interval.getEnd().toString()) + .bind(2 * i + 2, interval.getStart().toString()); } final ResultIterator dbSegments = sql - .map(ByteArrayMapper.FIRST) - .iterator(); + .map(ByteArrayMapper.FIRST) + .iterator(); final VersionedIntervalTimeline timeline = new VersionedIntervalTimeline<>( - Ordering.natural() + Ordering.natural() ); while (dbSegments.hasNext()) { final byte[] payload = dbSegments.next(); DataSegment segment = jsonMapper.readValue( - payload, - DataSegment.class + payload, + DataSegment.class ); timeline.add(segment.getInterval(), segment.getVersion(), segment.getShardSpec().createChunk(segment)); @@ -286,9 +286,9 @@ public Set announceHistoricalSegments(final Set segmen */ @Override public SegmentPublishResult announceHistoricalSegments( - final Set segments, - final DataSourceMetadata startMetadata, - final DataSourceMetadata endMetadata + final Set segments, + final DataSourceMetadata startMetadata, + final DataSourceMetadata endMetadata ) throws IOException { if (segments.isEmpty()) { @@ -309,7 +309,7 @@ public SegmentPublishResult announceHistoricalSegments( // Find which segments are used (i.e. not overshadowed). 
final Set usedSegments = Sets.newHashSet(); for (TimelineObjectHolder holder : VersionedIntervalTimeline.forSegments(segments) - .lookup(JodaUtils.ETERNITY)) { + .lookup(JodaUtils.ETERNITY)) { for (PartitionChunk chunk : holder.getObject()) { usedSegments.add(chunk.getObject()); } @@ -319,42 +319,42 @@ public SegmentPublishResult announceHistoricalSegments( try { return connector.retryTransaction( - new TransactionCallback() - { - @Override - public SegmentPublishResult inTransaction( - final Handle handle, - final TransactionStatus transactionStatus - ) throws Exception - { - final Set inserted = Sets.newHashSet(); - - if (startMetadata != null) { - final boolean success = updateDataSourceMetadataWithHandle( - handle, - dataSource, - startMetadata, - endMetadata - ); + new TransactionCallback() + { + @Override + public SegmentPublishResult inTransaction( + final Handle handle, + final TransactionStatus transactionStatus + ) throws Exception + { + final Set inserted = Sets.newHashSet(); + + if (startMetadata != null) { + final boolean success = updateDataSourceMetadataWithHandle( + handle, + dataSource, + startMetadata, + endMetadata + ); + + if (!success) { + transactionStatus.setRollbackOnly(); + txnFailure.set(true); + throw new RuntimeException("Aborting transaction!"); + } + } - if (!success) { - transactionStatus.setRollbackOnly(); - txnFailure.set(true); - throw new RuntimeException("Aborting transaction!"); - } - } + for (final DataSegment segment : segments) { + if (announceHistoricalSegment(handle, segment, usedSegments.contains(segment))) { + inserted.add(segment); + } + } - for (final DataSegment segment : segments) { - if (announceHistoricalSegment(handle, segment, usedSegments.contains(segment))) { - inserted.add(segment); + return new SegmentPublishResult(ImmutableSet.copyOf(inserted), true); } - } - - return new SegmentPublishResult(ImmutableSet.copyOf(inserted), true); - } - }, - 3, - SQLMetadataConnector.DEFAULT_MAX_TRIES + }, + 3, + 
SQLMetadataConnector.DEFAULT_MAX_TRIES ); } catch (CallbackFailedException e) { @@ -368,11 +368,11 @@ public SegmentPublishResult inTransaction( @Override public SegmentIdentifier allocatePendingSegment( - final String dataSource, - final String sequenceName, - final String previousSegmentId, - final Interval interval, - final String maxVersion + final String dataSource, + final String sequenceName, + final String previousSegmentId, + final Interval interval, + final String maxVersion ) throws IOException { Preconditions.checkNotNull(dataSource, "dataSource"); @@ -383,200 +383,200 @@ public SegmentIdentifier allocatePendingSegment( final String previousSegmentIdNotNull = previousSegmentId == null ? "" : previousSegmentId; return connector.retryTransaction( - new TransactionCallback() - { - @Override - public SegmentIdentifier inTransaction(Handle handle, TransactionStatus transactionStatus) throws Exception - { - final List existingBytes = handle - .createQuery( - String.format( - "SELECT payload FROM %s WHERE " - + "dataSource = :dataSource AND " - + "sequence_name = :sequence_name AND " - + "sequence_prev_id = :sequence_prev_id", - dbTables.getPendingSegmentsTable() - ) - ).bind("dataSource", dataSource) - .bind("sequence_name", sequenceName) - .bind("sequence_prev_id", previousSegmentIdNotNull) - .map(ByteArrayMapper.FIRST) - .list(); - - if (!existingBytes.isEmpty()) { - final SegmentIdentifier existingIdentifier = jsonMapper.readValue( - Iterables.getOnlyElement(existingBytes), - SegmentIdentifier.class - ); - - if (existingIdentifier.getInterval().getStartMillis() == interval.getStartMillis() - && existingIdentifier.getInterval().getEndMillis() == interval.getEndMillis()) { - log.info( - "Found existing pending segment [%s] for sequence[%s] (previous = [%s]) in DB", - existingIdentifier.getIdentifierAsString(), - sequenceName, - previousSegmentIdNotNull - ); - - return existingIdentifier; - } else { - log.warn( - "Cannot use existing pending segment [%s] for 
sequence[%s] (previous = [%s]) in DB, " - + "does not match requested interval[%s]", - existingIdentifier.getIdentifierAsString(), - sequenceName, - previousSegmentIdNotNull, - interval - ); + new TransactionCallback() + { + @Override + public SegmentIdentifier inTransaction(Handle handle, TransactionStatus transactionStatus) throws Exception + { + final List existingBytes = handle + .createQuery( + String.format( + "SELECT payload FROM %s WHERE " + + "dataSource = :dataSource AND " + + "sequence_name = :sequence_name AND " + + "sequence_prev_id = :sequence_prev_id", + dbTables.getPendingSegmentsTable() + ) + ).bind("dataSource", dataSource) + .bind("sequence_name", sequenceName) + .bind("sequence_prev_id", previousSegmentIdNotNull) + .map(ByteArrayMapper.FIRST) + .list(); + + if (!existingBytes.isEmpty()) { + final SegmentIdentifier existingIdentifier = jsonMapper.readValue( + Iterables.getOnlyElement(existingBytes), + SegmentIdentifier.class + ); + + if (existingIdentifier.getInterval().getStartMillis() == interval.getStartMillis() + && existingIdentifier.getInterval().getEndMillis() == interval.getEndMillis()) { + log.info( + "Found existing pending segment [%s] for sequence[%s] (previous = [%s]) in DB", + existingIdentifier.getIdentifierAsString(), + sequenceName, + previousSegmentIdNotNull + ); + + return existingIdentifier; + } else { + log.warn( + "Cannot use existing pending segment [%s] for sequence[%s] (previous = [%s]) in DB, " + + "does not match requested interval[%s]", + existingIdentifier.getIdentifierAsString(), + sequenceName, + previousSegmentIdNotNull, + interval + ); + + return null; + } + } - return null; - } - } + // Make up a pending segment based on existing segments and pending segments in the DB. This works + // assuming that all tasks inserting segments at a particular point in time are going through the + // allocatePendingSegment flow. This should be assured through some other mechanism (like task locks). 
+ + final SegmentIdentifier newIdentifier; + + final List> existingChunks = getTimelineForIntervalsWithHandle( + handle, + dataSource, + ImmutableList.of(interval) + ).lookup(interval); + + if (existingChunks.size() > 1) { + // Not possible to expand more than one chunk with a single segment. + log.warn( + "Cannot allocate new segment for dataSource[%s], interval[%s], maxVersion[%s]: already have [%,d] chunks.", + dataSource, + interval, + maxVersion, + existingChunks.size() + ); + return null; + } else { + SegmentIdentifier max = null; + + if (!existingChunks.isEmpty()) { + TimelineObjectHolder existingHolder = Iterables.getOnlyElement(existingChunks); + for (PartitionChunk existing : existingHolder.getObject()) { + if (max == null || max.getShardSpec().getPartitionNum() < existing.getObject() + .getShardSpec() + .getPartitionNum()) { + max = SegmentIdentifier.fromDataSegment(existing.getObject()); + } + } + } - // Make up a pending segment based on existing segments and pending segments in the DB. This works - // assuming that all tasks inserting segments at a particular point in time are going through the - // allocatePendingSegment flow. This should be assured through some other mechanism (like task locks). - - final SegmentIdentifier newIdentifier; - - final List> existingChunks = getTimelineForIntervalsWithHandle( - handle, - dataSource, - ImmutableList.of(interval) - ).lookup(interval); - - if (existingChunks.size() > 1) { - // Not possible to expand more than one chunk with a single segment. 
- log.warn( - "Cannot allocate new segment for dataSource[%s], interval[%s], maxVersion[%s]: already have [%,d] chunks.", - dataSource, - interval, - maxVersion, - existingChunks.size() - ); - return null; - } else { - SegmentIdentifier max = null; - - if (!existingChunks.isEmpty()) { - TimelineObjectHolder existingHolder = Iterables.getOnlyElement(existingChunks); - for (PartitionChunk existing : existingHolder.getObject()) { - if (max == null || max.getShardSpec().getPartitionNum() < existing.getObject() - .getShardSpec() - .getPartitionNum()) { - max = SegmentIdentifier.fromDataSegment(existing.getObject()); + final List pendings = getPendingSegmentsForIntervalWithHandle( + handle, + dataSource, + interval + ); + + for (SegmentIdentifier pending : pendings) { + if (max == null || + pending.getVersion().compareTo(max.getVersion()) > 0 || + (pending.getVersion().equals(max.getVersion()) + && pending.getShardSpec().getPartitionNum() > max.getShardSpec().getPartitionNum())) { + max = pending; + } } - } - } - final List pendings = getPendingSegmentsForIntervalWithHandle( - handle, - dataSource, - interval - ); - - for (SegmentIdentifier pending : pendings) { - if (max == null || - pending.getVersion().compareTo(max.getVersion()) > 0 || - (pending.getVersion().equals(max.getVersion()) - && pending.getShardSpec().getPartitionNum() > max.getShardSpec().getPartitionNum())) { - max = pending; + if (max == null) { + newIdentifier = new SegmentIdentifier( + dataSource, + interval, + maxVersion, + new NumberedShardSpec(0, 0) + ); + } else if (!max.getInterval().equals(interval) || max.getVersion().compareTo(maxVersion) > 0) { + log.warn( + "Cannot allocate new segment for dataSource[%s], interval[%s], maxVersion[%s]: conflicting segment[%s].", + dataSource, + interval, + maxVersion, + max.getIdentifierAsString() + ); + return null; + } else if (max.getShardSpec() instanceof LinearShardSpec) { + newIdentifier = new SegmentIdentifier( + dataSource, + max.getInterval(), + 
max.getVersion(), + new LinearShardSpec(max.getShardSpec().getPartitionNum() + 1) + ); + } else if (max.getShardSpec() instanceof NumberedShardSpec) { + newIdentifier = new SegmentIdentifier( + dataSource, + max.getInterval(), + max.getVersion(), + new NumberedShardSpec( + max.getShardSpec().getPartitionNum() + 1, + ((NumberedShardSpec) max.getShardSpec()).getPartitions() + ) + ); + } else { + log.warn( + "Cannot allocate new segment for dataSource[%s], interval[%s], maxVersion[%s]: ShardSpec class[%s] used by [%s].", + dataSource, + interval, + maxVersion, + max.getShardSpec().getClass(), + max.getIdentifierAsString() + ); + return null; + } } - } - if (max == null) { - newIdentifier = new SegmentIdentifier( - dataSource, - interval, - maxVersion, - new NumberedShardSpec(0, 0) + // SELECT -> INSERT can fail due to races; callers must be prepared to retry. + // Avoiding ON DUPLICATE KEY since it's not portable. + // Avoiding try/catch since it may cause inadvertent transaction-splitting. + + // UNIQUE key for the row, ensuring sequences do not fork in two directions. 
+ // Using a single column instead of (sequence_name, sequence_prev_id) as some MySQL storage engines + // have difficulty with large unique keys (see https://github.com/druid-io/druid/issues/2319) + final String sequenceNamePrevIdSha1 = BaseEncoding.base16().encode( + Hashing.sha1() + .newHasher() + .putBytes(StringUtils.toUtf8(sequenceName)) + .putByte((byte) 0xff) + .putBytes(StringUtils.toUtf8(previousSegmentIdNotNull)) + .hash() + .asBytes() ); - } else if (!max.getInterval().equals(interval) || max.getVersion().compareTo(maxVersion) > 0) { - log.warn( - "Cannot allocate new segment for dataSource[%s], interval[%s], maxVersion[%s]: conflicting segment[%s].", - dataSource, - interval, - maxVersion, - max.getIdentifierAsString() - ); - return null; - } else if (max.getShardSpec() instanceof LinearShardSpec) { - newIdentifier = new SegmentIdentifier( - dataSource, - max.getInterval(), - max.getVersion(), - new LinearShardSpec(max.getShardSpec().getPartitionNum() + 1) - ); - } else if (max.getShardSpec() instanceof NumberedShardSpec) { - newIdentifier = new SegmentIdentifier( - dataSource, - max.getInterval(), - max.getVersion(), - new NumberedShardSpec( - max.getShardSpec().getPartitionNum() + 1, - ((NumberedShardSpec) max.getShardSpec()).getPartitions() - ) - ); - } else { - log.warn( - "Cannot allocate new segment for dataSource[%s], interval[%s], maxVersion[%s]: ShardSpec class[%s] used by [%s].", - dataSource, - interval, - maxVersion, - max.getShardSpec().getClass(), - max.getIdentifierAsString() - ); - return null; - } - } - // SELECT -> INSERT can fail due to races; callers must be prepared to retry. - // Avoiding ON DUPLICATE KEY since it's not portable. - // Avoiding try/catch since it may cause inadvertent transaction-splitting. - - // UNIQUE key for the row, ensuring sequences do not fork in two directions. 
- // Using a single column instead of (sequence_name, sequence_prev_id) as some MySQL storage engines - // have difficulty with large unique keys (see https://github.com/druid-io/druid/issues/2319) - final String sequenceNamePrevIdSha1 = BaseEncoding.base16().encode( - Hashing.sha1() - .newHasher() - .putBytes(StringUtils.toUtf8(sequenceName)) - .putByte((byte) 0xff) - .putBytes(StringUtils.toUtf8(previousSegmentIdNotNull)) - .hash() - .asBytes() - ); - - handle.createStatement( - String.format( - "INSERT INTO %s (id, dataSource, created_date, start, \"end\", sequence_name, sequence_prev_id, sequence_name_prev_id_sha1, payload) " - + "VALUES (:id, :dataSource, :created_date, :start, :end, :sequence_name, :sequence_prev_id, :sequence_name_prev_id_sha1, :payload)", - dbTables.getPendingSegmentsTable() + handle.createStatement( + String.format( + "INSERT INTO %s (id, dataSource, created_date, start, \"end\", sequence_name, sequence_prev_id, sequence_name_prev_id_sha1, payload) " + + "VALUES (:id, :dataSource, :created_date, :start, :end, :sequence_name, :sequence_prev_id, :sequence_name_prev_id_sha1, :payload)", + dbTables.getPendingSegmentsTable() + ) ) - ) - .bind("id", newIdentifier.getIdentifierAsString()) - .bind("dataSource", dataSource) - .bind("created_date", new DateTime().toString()) - .bind("start", interval.getStart().toString()) - .bind("end", interval.getEnd().toString()) - .bind("sequence_name", sequenceName) - .bind("sequence_prev_id", previousSegmentIdNotNull) - .bind("sequence_name_prev_id_sha1", sequenceNamePrevIdSha1) - .bind("payload", jsonMapper.writeValueAsBytes(newIdentifier)) - .execute(); - - log.info( - "Allocated pending segment [%s] for sequence[%s] (previous = [%s]) in DB", - newIdentifier.getIdentifierAsString(), - sequenceName, - previousSegmentIdNotNull - ); - - return newIdentifier; - } - }, - ALLOCATE_SEGMENT_QUIET_TRIES, - SQLMetadataConnector.DEFAULT_MAX_TRIES + .bind("id", newIdentifier.getIdentifierAsString()) + 
.bind("dataSource", dataSource) + .bind("created_date", new DateTime().toString()) + .bind("start", interval.getStart().toString()) + .bind("end", interval.getEnd().toString()) + .bind("sequence_name", sequenceName) + .bind("sequence_prev_id", previousSegmentIdNotNull) + .bind("sequence_name_prev_id_sha1", sequenceNamePrevIdSha1) + .bind("payload", jsonMapper.writeValueAsBytes(newIdentifier)) + .execute(); + + log.info( + "Allocated pending segment [%s] for sequence[%s] (previous = [%s]) in DB", + newIdentifier.getIdentifierAsString(), + sequenceName, + previousSegmentIdNotNull + ); + + return newIdentifier; + } + }, + ALLOCATE_SEGMENT_QUIET_TRIES, + SQLMetadataConnector.DEFAULT_MAX_TRIES ); } @@ -587,9 +587,9 @@ public SegmentIdentifier inTransaction(Handle handle, TransactionStatus transact * @return true if the segment was added, false if it already existed */ private boolean announceHistoricalSegment( - final Handle handle, - final DataSegment segment, - final boolean used + final Handle handle, + final DataSegment segment, + final boolean used ) throws IOException { try { @@ -602,22 +602,22 @@ private boolean announceHistoricalSegment( // Avoiding ON DUPLICATE KEY since it's not portable. // Avoiding try/catch since it may cause inadvertent transaction-splitting. 
handle.createStatement( - String.format( - "INSERT INTO %s (id, dataSource, created_date, start, \"end\", partitioned, version, used, payload) " - + "VALUES (:id, :dataSource, :created_date, :start, :end, :partitioned, :version, :used, :payload)", - dbTables.getSegmentsTable() - ) + String.format( + "INSERT INTO %s (id, dataSource, created_date, start, \"end\", partitioned, version, used, payload) " + + "VALUES (:id, :dataSource, :created_date, :start, :end, :partitioned, :version, :used, :payload)", + dbTables.getSegmentsTable() + ) ) - .bind("id", segment.getIdentifier()) - .bind("dataSource", segment.getDataSource()) - .bind("created_date", new DateTime().toString()) - .bind("start", segment.getInterval().getStart().toString()) - .bind("end", segment.getInterval().getEnd().toString()) - .bind("partitioned", (segment.getShardSpec() instanceof NoneShardSpec) ? false : true) - .bind("version", segment.getVersion()) - .bind("used", used) - .bind("payload", jsonMapper.writeValueAsBytes(segment)) - .execute(); + .bind("id", segment.getIdentifier()) + .bind("dataSource", segment.getDataSource()) + .bind("created_date", new DateTime().toString()) + .bind("start", segment.getInterval().getStart().toString()) + .bind("end", segment.getInterval().getEnd().toString()) + .bind("partitioned", (segment.getShardSpec() instanceof NoneShardSpec) ? 
false : true) + .bind("version", segment.getVersion()) + .bind("used", used) + .bind("payload", jsonMapper.writeValueAsBytes(segment)) + .execute(); log.info("Published segment [%s] to DB", segment.getIdentifier()); } @@ -632,15 +632,15 @@ private boolean announceHistoricalSegment( private boolean segmentExists(final Handle handle, final DataSegment segment) { return !handle - .createQuery( - String.format( - "SELECT id FROM %s WHERE id = :identifier", - dbTables.getSegmentsTable() - ) - ).bind("identifier", segment.getIdentifier()) - .map(StringMapper.FIRST) - .list() - .isEmpty(); + .createQuery( + String.format( + "SELECT id FROM %s WHERE id = :identifier", + dbTables.getSegmentsTable() + ) + ).bind("identifier", segment.getIdentifier()) + .map(StringMapper.FIRST) + .list() + .isEmpty(); } /** @@ -649,10 +649,10 @@ private boolean segmentExists(final Handle handle, final DataSegment segment) public DataSourceMetadata getDataSourceMetadata(final String dataSource) { final byte[] bytes = connector.lookup( - dbTables.getDataSourceTable(), - "dataSource", - "commit_metadata_payload", - dataSource + dbTables.getDataSourceTable(), + "dataSource", + "commit_metadata_payload", + dataSource ); if (bytes == null) { @@ -671,16 +671,16 @@ public DataSourceMetadata getDataSourceMetadata(final String dataSource) * Read dataSource metadata as bytes, from a specific handle. Returns null if there is no metadata. 
*/ private byte[] getDataSourceMetadataWithHandleAsBytes( - final Handle handle, - final String dataSource + final Handle handle, + final String dataSource ) { return connector.lookupWithHandle( - handle, - dbTables.getDataSourceTable(), - "dataSource", - "commit_metadata_payload", - dataSource + handle, + dbTables.getDataSourceTable(), + "dataSource", + "commit_metadata_payload", + dataSource ); } @@ -699,10 +699,10 @@ private byte[] getDataSourceMetadataWithHandleAsBytes( * @return true if dataSource metadata was updated from matching startMetadata to matching endMetadata */ private boolean updateDataSourceMetadataWithHandle( - final Handle handle, - final String dataSource, - final DataSourceMetadata startMetadata, - final DataSourceMetadata endMetadata + final Handle handle, + final String dataSource, + final DataSourceMetadata startMetadata, + final DataSourceMetadata endMetadata ) throws IOException { Preconditions.checkNotNull(dataSource, "dataSource"); @@ -718,14 +718,14 @@ private boolean updateDataSourceMetadataWithHandle( oldCommitMetadataFromDb = null; } else { oldCommitMetadataSha1FromDb = BaseEncoding.base16().encode( - Hashing.sha1().hashBytes(oldCommitMetadataBytesFromDb).asBytes() + Hashing.sha1().hashBytes(oldCommitMetadataBytesFromDb).asBytes() ); oldCommitMetadataFromDb = jsonMapper.readValue(oldCommitMetadataBytesFromDb, DataSourceMetadata.class); } final boolean startMetadataMatchesExisting = oldCommitMetadataFromDb == null - ? startMetadata.isValidStart() - : startMetadata.matches(oldCommitMetadataFromDb); + ? startMetadata.isValidStart() + : startMetadata.matches(oldCommitMetadataFromDb); if (!startMetadataMatchesExisting) { // Not in the desired start state. @@ -734,46 +734,46 @@ private boolean updateDataSourceMetadataWithHandle( } final DataSourceMetadata newCommitMetadata = oldCommitMetadataFromDb == null - ? endMetadata - : oldCommitMetadataFromDb.plus(endMetadata); + ? 
endMetadata + : oldCommitMetadataFromDb.plus(endMetadata); final byte[] newCommitMetadataBytes = jsonMapper.writeValueAsBytes(newCommitMetadata); final String newCommitMetadataSha1 = BaseEncoding.base16().encode( - Hashing.sha1().hashBytes(newCommitMetadataBytes).asBytes() + Hashing.sha1().hashBytes(newCommitMetadataBytes).asBytes() ); final boolean retVal; if (oldCommitMetadataBytesFromDb == null) { // SELECT -> INSERT can fail due to races; callers must be prepared to retry. final int numRows = handle.createStatement( - String.format( - "INSERT INTO %s (dataSource, created_date, commit_metadata_payload, commit_metadata_sha1) " - + "VALUES (:dataSource, :created_date, :commit_metadata_payload, :commit_metadata_sha1)", - dbTables.getDataSourceTable() - ) + String.format( + "INSERT INTO %s (dataSource, created_date, commit_metadata_payload, commit_metadata_sha1) " + + "VALUES (:dataSource, :created_date, :commit_metadata_payload, :commit_metadata_sha1)", + dbTables.getDataSourceTable() + ) ) - .bind("dataSource", dataSource) - .bind("created_date", new DateTime().toString()) - .bind("commit_metadata_payload", newCommitMetadataBytes) - .bind("commit_metadata_sha1", newCommitMetadataSha1) - .execute(); + .bind("dataSource", dataSource) + .bind("created_date", new DateTime().toString()) + .bind("commit_metadata_payload", newCommitMetadataBytes) + .bind("commit_metadata_sha1", newCommitMetadataSha1) + .execute(); retVal = numRows > 0; } else { // Expecting a particular old metadata; use the SHA1 in a compare-and-swap UPDATE final int numRows = handle.createStatement( - String.format( - "UPDATE %s SET " - + "commit_metadata_payload = :new_commit_metadata_payload, " - + "commit_metadata_sha1 = :new_commit_metadata_sha1 " - + "WHERE dataSource = :dataSource AND commit_metadata_sha1 = :old_commit_metadata_sha1", - dbTables.getDataSourceTable() - ) + String.format( + "UPDATE %s SET " + + "commit_metadata_payload = :new_commit_metadata_payload, " + + "commit_metadata_sha1 = 
:new_commit_metadata_sha1 " + + "WHERE dataSource = :dataSource AND commit_metadata_sha1 = :old_commit_metadata_sha1", + dbTables.getDataSourceTable() + ) ) - .bind("dataSource", dataSource) - .bind("old_commit_metadata_sha1", oldCommitMetadataSha1FromDb) - .bind("new_commit_metadata_payload", newCommitMetadataBytes) - .bind("new_commit_metadata_sha1", newCommitMetadataSha1) - .execute(); + .bind("dataSource", dataSource) + .bind("old_commit_metadata_sha1", oldCommitMetadataSha1FromDb) + .bind("new_commit_metadata_payload", newCommitMetadataBytes) + .bind("new_commit_metadata_sha1", newCommitMetadataSha1) + .execute(); retVal = numRows > 0; } @@ -790,77 +790,77 @@ private boolean updateDataSourceMetadataWithHandle( public boolean deleteDataSourceMetadata(final String dataSource) { return connector.retryWithHandle( - new HandleCallback() - { - @Override - public Boolean withHandle(Handle handle) throws Exception - { - int rows = handle.createStatement( - String.format("DELETE from %s WHERE dataSource = :dataSource", dbTables.getDataSourceTable()) - ) - .bind("dataSource", dataSource) - .execute(); + new HandleCallback() + { + @Override + public Boolean withHandle(Handle handle) throws Exception + { + int rows = handle.createStatement( + String.format("DELETE from %s WHERE dataSource = :dataSource", dbTables.getDataSourceTable()) + ) + .bind("dataSource", dataSource) + .execute(); - return rows > 0; - } - } + return rows > 0; + } + } ); } public void updateSegmentMetadata(final Set segments) throws IOException { connector.getDBI().inTransaction( - new TransactionCallback() - { - @Override - public Void inTransaction(Handle handle, TransactionStatus transactionStatus) throws Exception - { - for (final DataSegment segment : segments) { - updatePayload(handle, segment); - } + new TransactionCallback() + { + @Override + public Void inTransaction(Handle handle, TransactionStatus transactionStatus) throws Exception + { + for (final DataSegment segment : segments) { + 
updatePayload(handle, segment); + } - return null; - } - } + return null; + } + } ); } public void deleteSegments(final Set segments) throws IOException { connector.getDBI().inTransaction( - new TransactionCallback() - { - @Override - public Void inTransaction(Handle handle, TransactionStatus transactionStatus) throws IOException - { - for (final DataSegment segment : segments) { - deleteSegment(handle, segment); - } + new TransactionCallback() + { + @Override + public Void inTransaction(Handle handle, TransactionStatus transactionStatus) throws IOException + { + for (final DataSegment segment : segments) { + deleteSegment(handle, segment); + } - return null; - } - } + return null; + } + } ); } private void deleteSegment(final Handle handle, final DataSegment segment) { handle.createStatement( - String.format("DELETE from %s WHERE id = :id", dbTables.getSegmentsTable()) + String.format("DELETE from %s WHERE id = :id", dbTables.getSegmentsTable()) ) - .bind("id", segment.getIdentifier()) - .execute(); + .bind("id", segment.getIdentifier()) + .execute(); } private void updatePayload(final Handle handle, final DataSegment segment) throws IOException { try { handle.createStatement( - String.format("UPDATE %s SET payload = :payload WHERE id = :id", dbTables.getSegmentsTable()) + String.format("UPDATE %s SET payload = :payload WHERE id = :id", dbTables.getSegmentsTable()) ) - .bind("id", segment.getIdentifier()) - .bind("payload", jsonMapper.writeValueAsBytes(segment)) - .execute(); + .bind("id", segment.getIdentifier()) + .bind("payload", jsonMapper.writeValueAsBytes(segment)) + .execute(); } catch (IOException e) { log.error(e, "Exception inserting into DB"); @@ -868,54 +868,148 @@ private void updatePayload(final Handle handle, final DataSegment segment) throw } } + public boolean deletePendingSegments(final List segments) throws IOException + { + return connector.getDBI().inTransaction( + new TransactionCallback() + { + int res = 0; + @Override + public Boolean 
inTransaction(Handle handle, TransactionStatus transactionStatus) throws IOException + { + for (final TaskDataSegment segment : segments) { + if(deletePendingSegment(segment)) + { + res +=1; + } + } + + return res == segments.size(); + } + } + ); + } + + + public boolean deletePendingSegment(final TaskDataSegment taskDataSegment) + { + return connector.retryWithHandle( + new HandleCallback() + { + @Override + public Boolean withHandle(Handle handle) throws Exception + { + int rows = handle.createStatement( + String.format("DELETE from %s WHERE sequence_name = :sequence_name", dbTables.getPendingSegmentsTable()) + ) + .bind("sequence_name", taskDataSegment.getIoConfig().get("baseSequenceName")) + .execute(); + log.info("HHHHHHHHHHHHHHHHHHHHHHHHHH:"+taskDataSegment.getIoConfig().get("baseSequenceName")); + + return rows > 0; + } + } + ); + } + @Override public List getUnusedSegmentsForInterval(final String dataSource, final Interval interval) { List matchingSegments = connector.inReadOnlyTransaction( - new TransactionCallback>() - { - @Override - public List inTransaction(final Handle handle, final TransactionStatus status) throws Exception - { - return handle - .createQuery( - String.format( - "SELECT payload FROM %s WHERE dataSource = :dataSource and start >= :start and \"end\" <= :end and used = false", - dbTables.getSegmentsTable() - ) - ) - .setFetchSize(connector.getStreamingFetchSize()) - .bind("dataSource", dataSource) - .bind("start", interval.getStart().toString()) - .bind("end", interval.getEnd().toString()) - .map(ByteArrayMapper.FIRST) - .fold( - Lists.newArrayList(), - new Folder3, byte[]>() - { - @Override - public List fold( - List accumulator, - byte[] payload, - FoldController foldController, - StatementContext statementContext - ) throws SQLException - { - try { - accumulator.add(jsonMapper.readValue(payload, DataSegment.class)); - return accumulator; - } - catch (Exception e) { - throw Throwables.propagate(e); - } - } - } - ); - } - } + new 
TransactionCallback>() + { + @Override + public List inTransaction(final Handle handle, final TransactionStatus status) throws Exception + { + return handle + .createQuery( + String.format( + "SELECT payload FROM %s WHERE dataSource = :dataSource and start >= :start and \"end\" <= :end and used = false", + dbTables.getSegmentsTable() + ) + ) + .setFetchSize(connector.getStreamingFetchSize()) + .bind("dataSource", dataSource) + .bind("start", interval.getStart().toString()) + .bind("end", interval.getEnd().toString()) + .map(ByteArrayMapper.FIRST) + .fold( + Lists.newArrayList(), + new Folder3, byte[]>() + { + @Override + public List fold( + List accumulator, + byte[] payload, + FoldController foldController, + StatementContext statementContext + ) throws SQLException + { + try { + accumulator.add(jsonMapper.readValue(payload, DataSegment.class)); + return accumulator; + } + catch (Exception e) { + throw Throwables.propagate(e); + } + } + } + ); + } + } ); log.info("Found %,d segments for %s for interval %s.", matchingSegments.size(), dataSource, interval); return matchingSegments; } -} + + @Override + public List getNotActiveTask(final Interval interval) + { + List notActiveTasks = connector.inReadOnlyTransaction( + new TransactionCallback>() + { + @Override + public List inTransaction(Handle handle, TransactionStatus transactionStatus) + throws Exception + { + return handle.createQuery( + String.format( + "SELECT payload FROM %s WHERE active = :active", + dbTables.getTasksTable() + ) + ) + .bind("active", 0) + .map(ByteArrayMapper.FIRST) + .fold( + Lists.newArrayList(), + new Folder3, byte[]>() + { + @Override + public List fold( + List accumulator, + byte[] payload, + FoldController foldController, + StatementContext statementContext + ) throws SQLException + { + try { + accumulator.add(jsonMapper.readValue(payload, TaskDataSegment.class)); + return accumulator; + } + catch (Exception e) { + throw Throwables.propagate(e); + } + } + } + ); + + } + } + ); + + 
log.info("Found %,d tasks for interval %s.", notActiveTasks.size(), interval); + return notActiveTasks; + } + +} \ No newline at end of file diff --git a/server/src/test/java/io/druid/metadata/IndexerSQLMetadataStorageCoordinatorTest.java b/server/src/test/java/io/druid/metadata/IndexerSQLMetadataStorageCoordinatorTest.java index 5f4b0da759a7..35f2f3468350 100644 --- a/server/src/test/java/io/druid/metadata/IndexerSQLMetadataStorageCoordinatorTest.java +++ b/server/src/test/java/io/druid/metadata/IndexerSQLMetadataStorageCoordinatorTest.java @@ -27,6 +27,7 @@ import io.druid.indexing.overlord.SegmentPublishResult; import io.druid.jackson.DefaultObjectMapper; import io.druid.timeline.DataSegment; +import io.druid.timeline.TaskDataSegment; import io.druid.timeline.partition.LinearShardSpec; import io.druid.timeline.partition.NoneShardSpec; import org.joda.time.Interval; @@ -49,52 +50,61 @@ public class IndexerSQLMetadataStorageCoordinatorTest private final ObjectMapper mapper = new DefaultObjectMapper(); private final DataSegment defaultSegment = new DataSegment( - "fooDataSource", - Interval.parse("2015-01-01T00Z/2015-01-02T00Z"), - "version", - ImmutableMap.of(), - ImmutableList.of("dim1"), - ImmutableList.of("m1"), - new LinearShardSpec(0), - 9, - 100 + "fooDataSource", + Interval.parse("2015-01-01T00Z/2015-01-02T00Z"), + "version", + ImmutableMap.of(), + ImmutableList.of("dim1"), + ImmutableList.of("m1"), + new LinearShardSpec(0), + 9, + 100 ); private final DataSegment defaultSegment2 = new DataSegment( - "fooDataSource", - Interval.parse("2015-01-01T00Z/2015-01-02T00Z"), - "version", - ImmutableMap.of(), - ImmutableList.of("dim1"), - ImmutableList.of("m1"), - new LinearShardSpec(1), - 9, - 100 + "fooDataSource", + Interval.parse("2015-01-01T00Z/2015-01-02T00Z"), + "version", + ImmutableMap.of(), + ImmutableList.of("dim1"), + ImmutableList.of("m1"), + new LinearShardSpec(1), + 9, + 100 + ); + + private final TaskDataSegment defaultTaskSegment = new 
TaskDataSegment( + "kafka", + "12345", + "test", + "datasource_test_1", + ImmutableMap.of("baseSequenceName","test_baseSequenceName"), + Interval.parse("2015-01-01T00Z/2015-01-02T00Z") ); private final DataSegment defaultSegment3 = new DataSegment( - "fooDataSource", - Interval.parse("2015-01-03T00Z/2015-01-04T00Z"), - "version", - ImmutableMap.of(), - ImmutableList.of("dim1"), - ImmutableList.of("m1"), - NoneShardSpec.instance(), - 9, - 100 + "fooDataSource", + Interval.parse("2015-01-03T00Z/2015-01-04T00Z"), + "version", + ImmutableMap.of(), + ImmutableList.of("dim1"), + ImmutableList.of("m1"), + NoneShardSpec.instance(), + 9, + 100 ); // Overshadows defaultSegment, defaultSegment2 private final DataSegment defaultSegment4 = new DataSegment( - "fooDataSource", - Interval.parse("2015-01-01T00Z/2015-01-02T00Z"), - "zversion", - ImmutableMap.of(), - ImmutableList.of("dim1"), - ImmutableList.of("m1"), - new LinearShardSpec(0), - 9, - 100 + "fooDataSource", + Interval.parse("2015-01-01T00Z/2015-01-02T00Z"), + "zversion", + ImmutableMap.of(), + ImmutableList.of("dim1"), + ImmutableList.of("m1"), + new LinearShardSpec(0), + 9, + 100 ); private final Set SEGMENTS = ImmutableSet.of(defaultSegment, defaultSegment2); @@ -109,10 +119,11 @@ public void setUp() derbyConnector.createDataSourceTable(); derbyConnector.createTaskTables(); derbyConnector.createSegmentTable(); + derbyConnector.createPendingSegmentsTable(); coordinator = new IndexerSQLMetadataStorageCoordinator( - mapper, - derbyConnectorRule.metadataTablesConfigSupplier().get(), - derbyConnector + mapper, + derbyConnectorRule.metadataTablesConfigSupplier().get(), + derbyConnector ); } @@ -120,21 +131,21 @@ private void unUseSegment() { for (final DataSegment segment : SEGMENTS) { Assert.assertEquals( - 1, (int) derbyConnector.getDBI().withHandle( - new HandleCallback() - { - @Override - public Integer withHandle(Handle handle) throws Exception - { - return handle.createStatement( - String.format( - "UPDATE %s SET 
used = false WHERE id = :id", - derbyConnectorRule.metadataTablesConfigSupplier().get().getSegmentsTable() - ) - ).bind("id", segment.getIdentifier()).execute(); - } - } - ) + 1, (int) derbyConnector.getDBI().withHandle( + new HandleCallback() + { + @Override + public Integer withHandle(Handle handle) throws Exception + { + return handle.createStatement( + String.format( + "UPDATE %s SET used = false WHERE id = :id", + derbyConnectorRule.metadataTablesConfigSupplier().get().getSegmentsTable() + ) + ).bind("id", segment.getIdentifier()).execute(); + } + } + ) ); } } @@ -143,16 +154,16 @@ private List getUsedIdentifiers() { final String table = derbyConnectorRule.metadataTablesConfigSupplier().get().getSegmentsTable(); return derbyConnector.retryWithHandle( - new HandleCallback>() - { - @Override - public List withHandle(Handle handle) throws Exception - { - return handle.createQuery("SELECT id FROM " + table + " WHERE used = true ORDER BY id") - .map(StringMapper.FIRST) - .list(); - } - } + new HandleCallback>() + { + @Override + public List withHandle(Handle handle) throws Exception + { + return handle.createQuery("SELECT id FROM " + table + " WHERE used = true ORDER BY id") + .map(StringMapper.FIRST) + .list(); + } + } ); } @@ -162,19 +173,19 @@ public void testSimpleAnnounce() throws IOException coordinator.announceHistoricalSegments(SEGMENTS); for (DataSegment segment : SEGMENTS) { Assert.assertArrayEquals( - mapper.writeValueAsString(segment).getBytes("UTF-8"), - derbyConnector.lookup( - derbyConnectorRule.metadataTablesConfigSupplier().get().getSegmentsTable(), - "id", - "payload", - segment.getIdentifier() - ) + mapper.writeValueAsString(segment).getBytes("UTF-8"), + derbyConnector.lookup( + derbyConnectorRule.metadataTablesConfigSupplier().get().getSegmentsTable(), + "id", + "payload", + segment.getIdentifier() + ) ); } Assert.assertEquals( - ImmutableList.of(defaultSegment.getIdentifier(), defaultSegment2.getIdentifier()), - getUsedIdentifiers() + 
ImmutableList.of(defaultSegment.getIdentifier(), defaultSegment2.getIdentifier()), + getUsedIdentifiers() ); } @@ -187,13 +198,13 @@ public void testOvershadowingAnnounce() throws IOException for (DataSegment segment : segments) { Assert.assertArrayEquals( - mapper.writeValueAsString(segment).getBytes("UTF-8"), - derbyConnector.lookup( - derbyConnectorRule.metadataTablesConfigSupplier().get().getSegmentsTable(), - "id", - "payload", - segment.getIdentifier() - ) + mapper.writeValueAsString(segment).getBytes("UTF-8"), + derbyConnector.lookup( + derbyConnectorRule.metadataTablesConfigSupplier().get().getSegmentsTable(), + "id", + "payload", + segment.getIdentifier() + ) ); } @@ -205,44 +216,44 @@ public void testTransactionalAnnounceSuccess() throws IOException { // Insert first segment. final SegmentPublishResult result1 = coordinator.announceHistoricalSegments( - ImmutableSet.of(defaultSegment), - new ObjectMetadata(null), - new ObjectMetadata(ImmutableMap.of("foo", "bar")) + ImmutableSet.of(defaultSegment), + new ObjectMetadata(null), + new ObjectMetadata(ImmutableMap.of("foo", "bar")) ); Assert.assertEquals(new SegmentPublishResult(ImmutableSet.of(defaultSegment), true), result1); Assert.assertArrayEquals( - mapper.writeValueAsString(defaultSegment).getBytes("UTF-8"), - derbyConnector.lookup( - derbyConnectorRule.metadataTablesConfigSupplier().get().getSegmentsTable(), - "id", - "payload", - defaultSegment.getIdentifier() - ) + mapper.writeValueAsString(defaultSegment).getBytes("UTF-8"), + derbyConnector.lookup( + derbyConnectorRule.metadataTablesConfigSupplier().get().getSegmentsTable(), + "id", + "payload", + defaultSegment.getIdentifier() + ) ); // Insert second segment. 
final SegmentPublishResult result2 = coordinator.announceHistoricalSegments( - ImmutableSet.of(defaultSegment2), - new ObjectMetadata(ImmutableMap.of("foo", "bar")), - new ObjectMetadata(ImmutableMap.of("foo", "baz")) + ImmutableSet.of(defaultSegment2), + new ObjectMetadata(ImmutableMap.of("foo", "bar")), + new ObjectMetadata(ImmutableMap.of("foo", "baz")) ); Assert.assertEquals(new SegmentPublishResult(ImmutableSet.of(defaultSegment2), true), result2); Assert.assertArrayEquals( - mapper.writeValueAsString(defaultSegment2).getBytes("UTF-8"), - derbyConnector.lookup( - derbyConnectorRule.metadataTablesConfigSupplier().get().getSegmentsTable(), - "id", - "payload", - defaultSegment2.getIdentifier() - ) + mapper.writeValueAsString(defaultSegment2).getBytes("UTF-8"), + derbyConnector.lookup( + derbyConnectorRule.metadataTablesConfigSupplier().get().getSegmentsTable(), + "id", + "payload", + defaultSegment2.getIdentifier() + ) ); // Examine metadata. Assert.assertEquals( - new ObjectMetadata(ImmutableMap.of("foo", "baz")), - coordinator.getDataSourceMetadata("fooDataSource") + new ObjectMetadata(ImmutableMap.of("foo", "baz")), + coordinator.getDataSourceMetadata("fooDataSource") ); } @@ -250,9 +261,9 @@ public void testTransactionalAnnounceSuccess() throws IOException public void testTransactionalAnnounceFailDbNullWantNotNull() throws IOException { final SegmentPublishResult result1 = coordinator.announceHistoricalSegments( - ImmutableSet.of(defaultSegment), - new ObjectMetadata(ImmutableMap.of("foo", "bar")), - new ObjectMetadata(ImmutableMap.of("foo", "baz")) + ImmutableSet.of(defaultSegment), + new ObjectMetadata(ImmutableMap.of("foo", "bar")), + new ObjectMetadata(ImmutableMap.of("foo", "baz")) ); Assert.assertEquals(new SegmentPublishResult(ImmutableSet.of(), false), result1); } @@ -261,16 +272,16 @@ public void testTransactionalAnnounceFailDbNullWantNotNull() throws IOException public void testTransactionalAnnounceFailDbNotNullWantNull() throws IOException { final 
SegmentPublishResult result1 = coordinator.announceHistoricalSegments( - ImmutableSet.of(defaultSegment), - new ObjectMetadata(null), - new ObjectMetadata(ImmutableMap.of("foo", "baz")) + ImmutableSet.of(defaultSegment), + new ObjectMetadata(null), + new ObjectMetadata(ImmutableMap.of("foo", "baz")) ); Assert.assertEquals(new SegmentPublishResult(ImmutableSet.of(defaultSegment), true), result1); final SegmentPublishResult result2 = coordinator.announceHistoricalSegments( - ImmutableSet.of(defaultSegment2), - new ObjectMetadata(null), - new ObjectMetadata(ImmutableMap.of("foo", "baz")) + ImmutableSet.of(defaultSegment2), + new ObjectMetadata(null), + new ObjectMetadata(ImmutableMap.of("foo", "baz")) ); Assert.assertEquals(new SegmentPublishResult(ImmutableSet.of(), false), result2); } @@ -279,16 +290,16 @@ public void testTransactionalAnnounceFailDbNotNullWantNull() throws IOException public void testTransactionalAnnounceFailDbNotNullWantDifferent() throws IOException { final SegmentPublishResult result1 = coordinator.announceHistoricalSegments( - ImmutableSet.of(defaultSegment), - new ObjectMetadata(null), - new ObjectMetadata(ImmutableMap.of("foo", "baz")) + ImmutableSet.of(defaultSegment), + new ObjectMetadata(null), + new ObjectMetadata(ImmutableMap.of("foo", "baz")) ); Assert.assertEquals(new SegmentPublishResult(ImmutableSet.of(defaultSegment), true), result1); final SegmentPublishResult result2 = coordinator.announceHistoricalSegments( - ImmutableSet.of(defaultSegment2), - new ObjectMetadata(ImmutableMap.of("foo", "qux")), - new ObjectMetadata(ImmutableMap.of("foo", "baz")) + ImmutableSet.of(defaultSegment2), + new ObjectMetadata(ImmutableMap.of("foo", "qux")), + new ObjectMetadata(ImmutableMap.of("foo", "baz")) ); Assert.assertEquals(new SegmentPublishResult(ImmutableSet.of(), false), result2); } @@ -298,13 +309,13 @@ public void testSimpleUsedList() throws IOException { coordinator.announceHistoricalSegments(SEGMENTS); Assert.assertEquals( - SEGMENTS, - 
ImmutableSet.copyOf( - coordinator.getUsedSegmentsForInterval( - defaultSegment.getDataSource(), - defaultSegment.getInterval() + SEGMENTS, + ImmutableSet.copyOf( + coordinator.getUsedSegmentsForInterval( + defaultSegment.getDataSource(), + defaultSegment.getInterval() + ) ) - ) ); } @@ -315,45 +326,45 @@ public void testMultiIntervalUsedList() throws IOException coordinator.announceHistoricalSegments(ImmutableSet.of(defaultSegment3)); Assert.assertEquals( - SEGMENTS, - ImmutableSet.copyOf( - coordinator.getUsedSegmentsForIntervals( - defaultSegment.getDataSource(), - ImmutableList.of(defaultSegment.getInterval()) + SEGMENTS, + ImmutableSet.copyOf( + coordinator.getUsedSegmentsForIntervals( + defaultSegment.getDataSource(), + ImmutableList.of(defaultSegment.getInterval()) + ) ) - ) ); Assert.assertEquals( - ImmutableSet.of(defaultSegment3), - ImmutableSet.copyOf( - coordinator.getUsedSegmentsForIntervals( - defaultSegment.getDataSource(), - ImmutableList.of(defaultSegment3.getInterval()) + ImmutableSet.of(defaultSegment3), + ImmutableSet.copyOf( + coordinator.getUsedSegmentsForIntervals( + defaultSegment.getDataSource(), + ImmutableList.of(defaultSegment3.getInterval()) + ) ) - ) ); Assert.assertEquals( - ImmutableSet.of(defaultSegment, defaultSegment2, defaultSegment3), - ImmutableSet.copyOf( - coordinator.getUsedSegmentsForIntervals( - defaultSegment.getDataSource(), - ImmutableList.of(defaultSegment.getInterval(), defaultSegment3.getInterval()) + ImmutableSet.of(defaultSegment, defaultSegment2, defaultSegment3), + ImmutableSet.copyOf( + coordinator.getUsedSegmentsForIntervals( + defaultSegment.getDataSource(), + ImmutableList.of(defaultSegment.getInterval(), defaultSegment3.getInterval()) + ) ) - ) ); //case to check no duplication if two intervals overlapped with the interval of same segment. 
Assert.assertEquals( - ImmutableList.of(defaultSegment3), - coordinator.getUsedSegmentsForIntervals( - defaultSegment.getDataSource(), - ImmutableList.of( - Interval.parse("2015-01-03T00Z/2015-01-03T05Z"), - Interval.parse("2015-01-03T09Z/2015-01-04T00Z") + ImmutableList.of(defaultSegment3), + coordinator.getUsedSegmentsForIntervals( + defaultSegment.getDataSource(), + ImmutableList.of( + Interval.parse("2015-01-03T00Z/2015-01-03T05Z"), + Interval.parse("2015-01-03T09Z/2015-01-04T00Z") + ) ) - ) ); } @@ -363,13 +374,13 @@ public void testSimpleUnUsedList() throws IOException coordinator.announceHistoricalSegments(SEGMENTS); unUseSegment(); Assert.assertEquals( - SEGMENTS, - ImmutableSet.copyOf( - coordinator.getUnusedSegmentsForInterval( - defaultSegment.getDataSource(), - defaultSegment.getInterval() + SEGMENTS, + ImmutableSet.copyOf( + coordinator.getUnusedSegmentsForInterval( + defaultSegment.getDataSource(), + defaultSegment.getInterval() + ) ) - ) ); } @@ -379,14 +390,14 @@ public void testUsedOverlapLow() throws IOException { coordinator.announceHistoricalSegments(SEGMENTS); Set actualSegments = ImmutableSet.copyOf( - coordinator.getUsedSegmentsForInterval( - defaultSegment.getDataSource(), - Interval.parse("2014-12-31T23:59:59.999Z/2015-01-01T00:00:00.001Z") // end is exclusive - ) + coordinator.getUsedSegmentsForInterval( + defaultSegment.getDataSource(), + Interval.parse("2014-12-31T23:59:59.999Z/2015-01-01T00:00:00.001Z") // end is exclusive + ) ); Assert.assertEquals( - SEGMENTS, - actualSegments + SEGMENTS, + actualSegments ); } @@ -396,13 +407,13 @@ public void testUsedOverlapHigh() throws IOException { coordinator.announceHistoricalSegments(SEGMENTS); Assert.assertEquals( - SEGMENTS, - ImmutableSet.copyOf( - coordinator.getUsedSegmentsForInterval( - defaultSegment.getDataSource(), - Interval.parse("2015-1-1T23:59:59.999Z/2015-02-01T00Z") + SEGMENTS, + ImmutableSet.copyOf( + coordinator.getUsedSegmentsForInterval( + defaultSegment.getDataSource(), + 
Interval.parse("2015-1-1T23:59:59.999Z/2015-02-01T00Z") + ) ) - ) ); } @@ -411,10 +422,10 @@ public void testUsedOutOfBoundsLow() throws IOException { coordinator.announceHistoricalSegments(SEGMENTS); Assert.assertTrue( - coordinator.getUsedSegmentsForInterval( - defaultSegment.getDataSource(), - new Interval(defaultSegment.getInterval().getStart().minus(1), defaultSegment.getInterval().getStart()) - ).isEmpty() + coordinator.getUsedSegmentsForInterval( + defaultSegment.getDataSource(), + new Interval(defaultSegment.getInterval().getStart().minus(1), defaultSegment.getInterval().getStart()) + ).isEmpty() ); } @@ -424,10 +435,10 @@ public void testUsedOutOfBoundsHigh() throws IOException { coordinator.announceHistoricalSegments(SEGMENTS); Assert.assertTrue( - coordinator.getUsedSegmentsForInterval( - defaultSegment.getDataSource(), - new Interval(defaultSegment.getInterval().getEnd(), defaultSegment.getInterval().getEnd().plusDays(10)) - ).isEmpty() + coordinator.getUsedSegmentsForInterval( + defaultSegment.getDataSource(), + new Interval(defaultSegment.getInterval().getEnd(), defaultSegment.getInterval().getEnd().plusDays(10)) + ).isEmpty() ); } @@ -436,13 +447,13 @@ public void testUsedWithinBoundsEnd() throws IOException { coordinator.announceHistoricalSegments(SEGMENTS); Assert.assertEquals( - SEGMENTS, - ImmutableSet.copyOf( - coordinator.getUsedSegmentsForInterval( - defaultSegment.getDataSource(), - defaultSegment.getInterval().withEnd(defaultSegment.getInterval().getEnd().minusMillis(1)) + SEGMENTS, + ImmutableSet.copyOf( + coordinator.getUsedSegmentsForInterval( + defaultSegment.getDataSource(), + defaultSegment.getInterval().withEnd(defaultSegment.getInterval().getEnd().minusMillis(1)) + ) ) - ) ); } @@ -451,13 +462,13 @@ public void testUsedOverlapEnd() throws IOException { coordinator.announceHistoricalSegments(SEGMENTS); Assert.assertEquals( - SEGMENTS, - ImmutableSet.copyOf( - coordinator.getUsedSegmentsForInterval( - defaultSegment.getDataSource(), - 
defaultSegment.getInterval().withEnd(defaultSegment.getInterval().getEnd().plusMillis(1)) + SEGMENTS, + ImmutableSet.copyOf( + coordinator.getUsedSegmentsForInterval( + defaultSegment.getDataSource(), + defaultSegment.getInterval().withEnd(defaultSegment.getInterval().getEnd().plusMillis(1)) + ) ) - ) ); } @@ -468,13 +479,13 @@ public void testUnUsedOverlapLow() throws IOException coordinator.announceHistoricalSegments(SEGMENTS); unUseSegment(); Assert.assertTrue( - coordinator.getUnusedSegmentsForInterval( - defaultSegment.getDataSource(), - new Interval( - defaultSegment.getInterval().getStart().minus(1), - defaultSegment.getInterval().getStart().plus(1) - ) - ).isEmpty() + coordinator.getUnusedSegmentsForInterval( + defaultSegment.getDataSource(), + new Interval( + defaultSegment.getInterval().getStart().minus(1), + defaultSegment.getInterval().getStart().plus(1) + ) + ).isEmpty() ); } @@ -484,10 +495,10 @@ public void testUnUsedUnderlapLow() throws IOException coordinator.announceHistoricalSegments(SEGMENTS); unUseSegment(); Assert.assertTrue( - coordinator.getUnusedSegmentsForInterval( - defaultSegment.getDataSource(), - new Interval(defaultSegment.getInterval().getStart().plus(1), defaultSegment.getInterval().getEnd()) - ).isEmpty() + coordinator.getUnusedSegmentsForInterval( + defaultSegment.getDataSource(), + new Interval(defaultSegment.getInterval().getStart().plus(1), defaultSegment.getInterval().getEnd()) + ).isEmpty() ); } @@ -498,10 +509,10 @@ public void testUnUsedUnderlapHigh() throws IOException coordinator.announceHistoricalSegments(SEGMENTS); unUseSegment(); Assert.assertTrue( - coordinator.getUnusedSegmentsForInterval( - defaultSegment.getDataSource(), - new Interval(defaultSegment.getInterval().getStart(), defaultSegment.getInterval().getEnd().minus(1)) - ).isEmpty() + coordinator.getUnusedSegmentsForInterval( + defaultSegment.getDataSource(), + new Interval(defaultSegment.getInterval().getStart(), defaultSegment.getInterval().getEnd().minus(1)) 
+ ).isEmpty() ); } @@ -511,10 +522,10 @@ public void testUnUsedOverlapHigh() throws IOException coordinator.announceHistoricalSegments(SEGMENTS); unUseSegment(); Assert.assertTrue( - coordinator.getUnusedSegmentsForInterval( - defaultSegment.getDataSource(), - defaultSegment.getInterval().withStart(defaultSegment.getInterval().getEnd().minus(1)) - ).isEmpty() + coordinator.getUnusedSegmentsForInterval( + defaultSegment.getDataSource(), + defaultSegment.getInterval().withStart(defaultSegment.getInterval().getEnd().minus(1)) + ).isEmpty() ); } @@ -524,13 +535,13 @@ public void testUnUsedBigOverlap() throws IOException coordinator.announceHistoricalSegments(SEGMENTS); unUseSegment(); Assert.assertEquals( - SEGMENTS, - ImmutableSet.copyOf( - coordinator.getUnusedSegmentsForInterval( - defaultSegment.getDataSource(), - Interval.parse("2000/2999") + SEGMENTS, + ImmutableSet.copyOf( + coordinator.getUnusedSegmentsForInterval( + defaultSegment.getDataSource(), + Interval.parse("2000/2999") + ) ) - ) ); } @@ -540,22 +551,22 @@ public void testUnUsedLowRange() throws IOException coordinator.announceHistoricalSegments(SEGMENTS); unUseSegment(); Assert.assertEquals( - SEGMENTS, - ImmutableSet.copyOf( - coordinator.getUnusedSegmentsForInterval( - defaultSegment.getDataSource(), - defaultSegment.getInterval().withStart(defaultSegment.getInterval().getStart().minus(1)) + SEGMENTS, + ImmutableSet.copyOf( + coordinator.getUnusedSegmentsForInterval( + defaultSegment.getDataSource(), + defaultSegment.getInterval().withStart(defaultSegment.getInterval().getStart().minus(1)) + ) ) - ) ); Assert.assertEquals( - SEGMENTS, - ImmutableSet.copyOf( - coordinator.getUnusedSegmentsForInterval( - defaultSegment.getDataSource(), - defaultSegment.getInterval().withStart(defaultSegment.getInterval().getStart().minusYears(1)) + SEGMENTS, + ImmutableSet.copyOf( + coordinator.getUnusedSegmentsForInterval( + defaultSegment.getDataSource(), + 
defaultSegment.getInterval().withStart(defaultSegment.getInterval().getStart().minusYears(1)) + ) ) - ) ); } @@ -565,22 +576,22 @@ public void testUnUsedHighRange() throws IOException coordinator.announceHistoricalSegments(SEGMENTS); unUseSegment(); Assert.assertEquals( - SEGMENTS, - ImmutableSet.copyOf( - coordinator.getUnusedSegmentsForInterval( - defaultSegment.getDataSource(), - defaultSegment.getInterval().withEnd(defaultSegment.getInterval().getEnd().plus(1)) + SEGMENTS, + ImmutableSet.copyOf( + coordinator.getUnusedSegmentsForInterval( + defaultSegment.getDataSource(), + defaultSegment.getInterval().withEnd(defaultSegment.getInterval().getEnd().plus(1)) + ) ) - ) ); Assert.assertEquals( - SEGMENTS, - ImmutableSet.copyOf( - coordinator.getUnusedSegmentsForInterval( - defaultSegment.getDataSource(), - defaultSegment.getInterval().withEnd(defaultSegment.getInterval().getEnd().plusYears(1)) + SEGMENTS, + ImmutableSet.copyOf( + coordinator.getUnusedSegmentsForInterval( + defaultSegment.getDataSource(), + defaultSegment.getInterval().withEnd(defaultSegment.getInterval().getEnd().plusYears(1)) + ) ) - ) ); } @@ -588,14 +599,14 @@ public void testUnUsedHighRange() throws IOException public void testDeleteDataSourceMetadata() throws IOException { coordinator.announceHistoricalSegments( - ImmutableSet.of(defaultSegment), - new ObjectMetadata(null), - new ObjectMetadata(ImmutableMap.of("foo", "bar")) + ImmutableSet.of(defaultSegment), + new ObjectMetadata(null), + new ObjectMetadata(ImmutableMap.of("foo", "bar")) ); Assert.assertEquals( - new ObjectMetadata(ImmutableMap.of("foo", "bar")), - coordinator.getDataSourceMetadata("fooDataSource") + new ObjectMetadata(ImmutableMap.of("foo", "bar")), + coordinator.getDataSourceMetadata("fooDataSource") ); Assert.assertFalse("deleteInvalidDataSourceMetadata", coordinator.deleteDataSourceMetadata("nonExistentDS")); @@ -603,4 +614,16 @@ public void testDeleteDataSourceMetadata() throws IOException 
Assert.assertNull("getDataSourceMetadataNullAfterDelete", coordinator.getDataSourceMetadata("fooDataSource")); } -} + + @Test + public void testDeletePendingSegments() throws IOException + { + Assert.assertTrue("deletePendingSegment",coordinator.deletePendingSegments( + coordinator.getNotActiveTask( + defaultTaskSegment.getInterval() + ) + ) + ); + + } +} \ No newline at end of file From 18eacabb7f03e5a7bf1b09c48b33bbe3c2b4669d Mon Sep 17 00:00:00 2001 From: linhaoxiang Date: Sat, 7 Jan 2017 16:28:39 +0800 Subject: [PATCH 2/4] =?UTF-8?q?change=20IndexerSQLMetadataStorageCoordinat?= =?UTF-8?q?or=E2=80=99s=20method=20deletePendingSegment=E2=80=99s=20sql=20?= =?UTF-8?q?->=20=E2=80=9CDELETE=20from=20%1s=20WHERE=20sequence=5Fname=20L?= =?UTF-8?q?IKE=20'%2s%%=E2=80=9D?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit and format other code --- .../main/java/io/druid/timeline/TaskDataSegment.java | 2 +- .../test/TestIndexerMetadataStorageCoordinator.java | 2 +- .../overlord/IndexerMetadataStorageCoordinator.java | 2 +- .../IndexerSQLMetadataStorageCoordinator.java | 11 +++++++---- .../IndexerSQLMetadataStorageCoordinatorTest.java | 2 +- 5 files changed, 11 insertions(+), 8 deletions(-) diff --git a/api/src/main/java/io/druid/timeline/TaskDataSegment.java b/api/src/main/java/io/druid/timeline/TaskDataSegment.java index 7b2ca5d8c6d9..23bee63cdafb 100644 --- a/api/src/main/java/io/druid/timeline/TaskDataSegment.java +++ b/api/src/main/java/io/druid/timeline/TaskDataSegment.java @@ -91,4 +91,4 @@ public String toString() } -} \ No newline at end of file +} diff --git a/indexing-service/src/test/java/io/druid/indexing/test/TestIndexerMetadataStorageCoordinator.java b/indexing-service/src/test/java/io/druid/indexing/test/TestIndexerMetadataStorageCoordinator.java index 21135ac7e2d4..a54e3d596b61 100644 --- a/indexing-service/src/test/java/io/druid/indexing/test/TestIndexerMetadataStorageCoordinator.java +++ 
b/indexing-service/src/test/java/io/druid/indexing/test/TestIndexerMetadataStorageCoordinator.java @@ -152,4 +152,4 @@ public void setUnusedSegments(List newUnusedSegments) unusedSegments.addAll(newUnusedSegments); } } -} \ No newline at end of file +} diff --git a/server/src/main/java/io/druid/indexing/overlord/IndexerMetadataStorageCoordinator.java b/server/src/main/java/io/druid/indexing/overlord/IndexerMetadataStorageCoordinator.java index 5ce2ba5ae17a..4a683b039277 100644 --- a/server/src/main/java/io/druid/indexing/overlord/IndexerMetadataStorageCoordinator.java +++ b/server/src/main/java/io/druid/indexing/overlord/IndexerMetadataStorageCoordinator.java @@ -149,4 +149,4 @@ SegmentPublishResult announceHistoricalSegments( * @return The TaskDataSegment list which used to match the pendingSegment table's payload */ List getNotActiveTask(final Interval interval); -} \ No newline at end of file +} diff --git a/server/src/main/java/io/druid/metadata/IndexerSQLMetadataStorageCoordinator.java b/server/src/main/java/io/druid/metadata/IndexerSQLMetadataStorageCoordinator.java index fc5c231eb7fa..15f5a8e96e78 100644 --- a/server/src/main/java/io/druid/metadata/IndexerSQLMetadataStorageCoordinator.java +++ b/server/src/main/java/io/druid/metadata/IndexerSQLMetadataStorageCoordinator.java @@ -900,11 +900,14 @@ public boolean deletePendingSegment(final TaskDataSegment taskDataSegment) public Boolean withHandle(Handle handle) throws Exception { int rows = handle.createStatement( - String.format("DELETE from %s WHERE sequence_name = :sequence_name", dbTables.getPendingSegmentsTable()) + String.format( + "DELETE from %1$s WHERE sequence_name LIKE '%2$s%%'", + dbTables.getPendingSegmentsTable(), + taskDataSegment.getIoConfig().get("baseSequenceName") + ) ) - .bind("sequence_name", taskDataSegment.getIoConfig().get("baseSequenceName")) .execute(); - log.info("HHHHHHHHHHHHHHHHHHHHHHHHHH:"+taskDataSegment.getIoConfig().get("baseSequenceName")); + log.info("Delete 
sequenceName:"+taskDataSegment.getIoConfig().get("baseSequenceName")); return rows > 0; } @@ -1012,4 +1015,4 @@ public List fold( return notActiveTasks; } -} \ No newline at end of file +} diff --git a/server/src/test/java/io/druid/metadata/IndexerSQLMetadataStorageCoordinatorTest.java b/server/src/test/java/io/druid/metadata/IndexerSQLMetadataStorageCoordinatorTest.java index 35f2f3468350..cb0ccb872486 100644 --- a/server/src/test/java/io/druid/metadata/IndexerSQLMetadataStorageCoordinatorTest.java +++ b/server/src/test/java/io/druid/metadata/IndexerSQLMetadataStorageCoordinatorTest.java @@ -626,4 +626,4 @@ public void testDeletePendingSegments() throws IOException ); } -} \ No newline at end of file +} From e94a9d3ee05756d4f36b9ad3743b11c7af3f1ad0 Mon Sep 17 00:00:00 2001 From: linhaoxiang Date: Wed, 30 Nov 2016 21:25:54 +0800 Subject: [PATCH 3/4] =?UTF-8?q?finished=20the=20interface=20to=20delete=20?= =?UTF-8?q?unused=20pendingSegments,=20add=20TaskDataSegment.java=20to=20?= =?UTF-8?q?struct=20the=20task=20table=E2=80=99s=20data,=20add=20getNotAct?= =?UTF-8?q?iveTask=20and=20deletePendingSegments=20in=20IndexerSQLMetadata?= =?UTF-8?q?StorageCoordinator.java=20to=20finish=20the=20delete=20logic,?= =?UTF-8?q?=20add=20test=20in=20TestIndexerMetadataStorageCoordinator.java?= =?UTF-8?q?.?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../io/druid/timeline/TaskDataSegment.java | 94 ++ ...TestIndexerMetadataStorageCoordinator.java | 29 +- .../IndexerMetadataStorageCoordinator.java | 30 +- .../IndexerSQLMetadataStorageCoordinator.java | 986 ++++++++++-------- ...exerSQLMetadataStorageCoordinatorTest.java | 507 ++++----- 5 files changed, 959 insertions(+), 687 deletions(-) create mode 100644 api/src/main/java/io/druid/timeline/TaskDataSegment.java diff --git a/api/src/main/java/io/druid/timeline/TaskDataSegment.java b/api/src/main/java/io/druid/timeline/TaskDataSegment.java new file mode 100644 index
000000000000..7b2ca5d8c6d9 --- /dev/null +++ b/api/src/main/java/io/druid/timeline/TaskDataSegment.java @@ -0,0 +1,94 @@ +package io.druid.timeline; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; +import org.joda.time.Interval; + +import java.util.Map; + +/** + * Created by haoxiang on 16/11/11. + */ +public class TaskDataSegment +{ + + private final String type; + private final String id; + private final String groupId; + private final String dataSource; + private final Map ioConfig; + private final Interval interval; + + @JsonCreator + public TaskDataSegment( + @JsonProperty("type") String type, + @JsonProperty("id") String id, + @JsonProperty("groupId") String groupId, + @JsonProperty("dataSource") String dataSource, + @JsonProperty("ioConfig") Map ioConfig, + @JsonProperty("interval") Interval interval + ) + { + this.type = type; + this.id = id; + this.groupId = groupId; + this.dataSource = dataSource; + this.ioConfig = ioConfig; + this.interval = interval; + } + + /** + * Get dataSource + * + * @return the dataSource + */ + + @JsonProperty + public String getType() + { + return type; + } + + @JsonProperty + public String getId() + { + return id; + } + + @JsonProperty + public String getGroupId() + { + return groupId; + } + + @JsonProperty + public String getDataSource() + { + return dataSource; + } + + @JsonProperty + public Map getIoConfig() + { + return ioConfig; + } + + @JsonProperty + public Interval getInterval() + { + return interval; + } + + @Override + public String toString() + { + return "TaskDataSegment{" + + "type=" + type + + ", id=" + id + + ", groupId=" + groupId + + ", dataSource=" + dataSource + + '}'; + } + + +} \ No newline at end of file diff --git a/indexing-service/src/test/java/io/druid/indexing/test/TestIndexerMetadataStorageCoordinator.java b/indexing-service/src/test/java/io/druid/indexing/test/TestIndexerMetadataStorageCoordinator.java index b6c13997f452..21135ac7e2d4 
100644 --- a/indexing-service/src/test/java/io/druid/indexing/test/TestIndexerMetadataStorageCoordinator.java +++ b/indexing-service/src/test/java/io/druid/indexing/test/TestIndexerMetadataStorageCoordinator.java @@ -28,6 +28,7 @@ import io.druid.indexing.overlord.SegmentPublishResult; import io.druid.segment.realtime.appenderator.SegmentIdentifier; import io.druid.timeline.DataSegment; +import io.druid.timeline.TaskDataSegment; import org.joda.time.Interval; import java.io.IOException; @@ -39,10 +40,12 @@ public class TestIndexerMetadataStorageCoordinator implements IndexerMetadataSto final private Set published = Sets.newConcurrentHashSet(); final private Set nuked = Sets.newConcurrentHashSet(); final private List unusedSegments; + final private List taskDataSegments; public TestIndexerMetadataStorageCoordinator() { unusedSegments = Lists.newArrayList(); + taskDataSegments = Lists.newArrayList(); } @Override @@ -65,7 +68,7 @@ public List getUsedSegmentsForInterval(String dataSource, Interval @Override public List getUsedSegmentsForIntervals( - String dataSource, List intervals + String dataSource, List intervals ) throws IOException { return ImmutableList.of(); @@ -93,9 +96,9 @@ public Set announceHistoricalSegments(Set segments) @Override public SegmentPublishResult announceHistoricalSegments( - Set segments, - DataSourceMetadata oldCommitMetadata, - DataSourceMetadata newCommitMetadata + Set segments, + DataSourceMetadata oldCommitMetadata, + DataSourceMetadata newCommitMetadata ) throws IOException { // Don't actually compare metadata, just do it! 
@@ -104,11 +107,11 @@ public SegmentPublishResult announceHistoricalSegments( @Override public SegmentIdentifier allocatePendingSegment( - String dataSource, - String sequenceName, - String previousSegmentId, - Interval interval, - String maxVersion + String dataSource, + String sequenceName, + String previousSegmentId, + Interval interval, + String maxVersion ) throws IOException { throw new UnsupportedOperationException(); @@ -120,6 +123,12 @@ public void deleteSegments(Set segments) nuked.addAll(segments); } + @Override + public List getNotActiveTask(final Interval interval){ + + return ImmutableList.copyOf(taskDataSegments); + } + @Override public void updateSegmentMetadata(Set segments) throws IOException { @@ -143,4 +152,4 @@ public void setUnusedSegments(List newUnusedSegments) unusedSegments.addAll(newUnusedSegments); } } -} +} \ No newline at end of file diff --git a/server/src/main/java/io/druid/indexing/overlord/IndexerMetadataStorageCoordinator.java b/server/src/main/java/io/druid/indexing/overlord/IndexerMetadataStorageCoordinator.java index 808bd35506d1..5ce2ba5ae17a 100644 --- a/server/src/main/java/io/druid/indexing/overlord/IndexerMetadataStorageCoordinator.java +++ b/server/src/main/java/io/druid/indexing/overlord/IndexerMetadataStorageCoordinator.java @@ -21,6 +21,7 @@ import io.druid.segment.realtime.appenderator.SegmentIdentifier; import io.druid.timeline.DataSegment; +import io.druid.timeline.TaskDataSegment; import org.joda.time.Interval; import java.io.IOException; @@ -42,7 +43,7 @@ public interface IndexerMetadataStorageCoordinator * @throws IOException */ List getUsedSegmentsForInterval(String dataSource, Interval interval) - throws IOException; + throws IOException; /** * Get all segments which may include any data in the interval and are flagged as used. 
@@ -55,7 +56,7 @@ List getUsedSegmentsForInterval(String dataSource, Interval interva * @throws IOException */ List getUsedSegmentsForIntervals(final String dataSource, final List intervals) - throws IOException; + throws IOException; /** * Attempts to insert a set of segments to the metadata storage. Returns the set of segments actually added (segments @@ -86,11 +87,11 @@ List getUsedSegmentsForIntervals(final String dataSource, final Lis * @return the pending segment identifier, or null if it was impossible to allocate a new segment */ SegmentIdentifier allocatePendingSegment( - String dataSource, - String sequenceName, - String previousSegmentId, - Interval interval, - String maxVersion + String dataSource, + String sequenceName, + String previousSegmentId, + Interval interval, + String maxVersion ) throws IOException; /** @@ -113,9 +114,9 @@ SegmentIdentifier allocatePendingSegment( * @throws IllegalArgumentException if startMetadata and endMetadata are not either both null or both non-null */ SegmentPublishResult announceHistoricalSegments( - Set segments, - DataSourceMetadata startMetadata, - DataSourceMetadata endMetadata + Set segments, + DataSourceMetadata startMetadata, + DataSourceMetadata endMetadata ) throws IOException; DataSourceMetadata getDataSourceMetadata(String dataSource); @@ -141,4 +142,11 @@ SegmentPublishResult announceHistoricalSegments( * @return DataSegments which include ONLY data within the requested interval and are not flagged as used. Data segments NOT returned here may include data in the interval */ List getUnusedSegmentsForInterval(String dataSource, Interval interval); -} + + /** + * Get all task which active property is 0, is used to delete the useless segments in pendingSegment table. + * @param interval Filter the tasks to ones that include tasks in this interval exclusively. 
Start is inclusive, end is exclusive + * @return The TaskDataSegment list which used to match the pendingSegment table's payload + */ + List getNotActiveTask(final Interval interval); +} \ No newline at end of file diff --git a/server/src/main/java/io/druid/metadata/IndexerSQLMetadataStorageCoordinator.java b/server/src/main/java/io/druid/metadata/IndexerSQLMetadataStorageCoordinator.java index 9d8cf4d2fc56..8334ed55005d 100644 --- a/server/src/main/java/io/druid/metadata/IndexerSQLMetadataStorageCoordinator.java +++ b/server/src/main/java/io/druid/metadata/IndexerSQLMetadataStorageCoordinator.java @@ -43,6 +43,7 @@ import io.druid.java.util.common.logger.Logger; import io.druid.segment.realtime.appenderator.SegmentIdentifier; import io.druid.timeline.DataSegment; +import io.druid.timeline.TaskDataSegment; import io.druid.timeline.TimelineObjectHolder; import io.druid.timeline.VersionedIntervalTimeline; import io.druid.timeline.partition.LinearShardSpec; @@ -85,9 +86,9 @@ public class IndexerSQLMetadataStorageCoordinator implements IndexerMetadataStor @Inject public IndexerSQLMetadataStorageCoordinator( - ObjectMapper jsonMapper, - MetadataStorageTablesConfig dbTables, - SQLMetadataConnector connector + ObjectMapper jsonMapper, + MetadataStorageTablesConfig dbTables, + SQLMetadataConnector connector ) { this.jsonMapper = jsonMapper; @@ -112,8 +113,8 @@ public void start() @Override public List getUsedSegmentsForInterval( - final String dataSource, - final Interval interval + final String dataSource, + final Interval interval ) throws IOException { return getUsedSegmentsForIntervals(dataSource, ImmutableList.of(interval)); @@ -121,75 +122,75 @@ public List getUsedSegmentsForInterval( @Override public List getUsedSegmentsForIntervals( - final String dataSource, final List intervals + final String dataSource, final List intervals ) throws IOException { return connector.retryWithHandle( - new HandleCallback>() - { - @Override - public List withHandle(Handle handle) 
throws Exception - { - final VersionedIntervalTimeline timeline = getTimelineForIntervalsWithHandle( - handle, - dataSource, - intervals - ); - - Set segments = Sets.newHashSet( - Iterables.concat( - Iterables.transform( + new HandleCallback>() + { + @Override + public List withHandle(Handle handle) throws Exception + { + final VersionedIntervalTimeline timeline = getTimelineForIntervalsWithHandle( + handle, + dataSource, + intervals + ); + + Set segments = Sets.newHashSet( Iterables.concat( - Iterables.transform( - intervals, - new Function>>() - { - @Override - public Iterable> apply(Interval interval) - { - return timeline.lookup(interval); - } - } - ) - ), - new Function, Iterable>() - { - @Override - public Iterable apply(TimelineObjectHolder input) - { - return input.getObject().payloads(); - } - } - ) - ) - ); + Iterables.transform( + Iterables.concat( + Iterables.transform( + intervals, + new Function>>() + { + @Override + public Iterable> apply(Interval interval) + { + return timeline.lookup(interval); + } + } + ) + ), + new Function, Iterable>() + { + @Override + public Iterable apply(TimelineObjectHolder input) + { + return input.getObject().payloads(); + } + } + ) + ) + ); - return new ArrayList<>(segments); - } - } + return new ArrayList<>(segments); + } + } ); } private List getPendingSegmentsForIntervalWithHandle( - final Handle handle, - final String dataSource, - final Interval interval + final Handle handle, + final String dataSource, + final Interval interval ) throws IOException { final List identifiers = Lists.newArrayList(); final ResultIterator dbSegments = - handle.createQuery( - String.format( - "SELECT payload FROM %s WHERE dataSource = :dataSource AND start <= :end and \"end\" >= :start", - dbTables.getPendingSegmentsTable() + handle.createQuery( + String.format( + "SELECT payload FROM %s WHERE dataSource = :dataSource AND start <= :end and \"end\" >= :start", + dbTables.getPendingSegmentsTable() + ) ) - ) - .bind("dataSource", 
dataSource) - .bind("start", interval.getStart().toString()) - .bind("end", interval.getEnd().toString()) - .map(ByteArrayMapper.FIRST) - .iterator(); + .bind("dataSource", dataSource) + .bind("start", interval.getStart().toString()) + .bind("end", interval.getEnd().toString()) + .map(ByteArrayMapper.FIRST) + .iterator(); while (dbSegments.hasNext()) { final byte[] payload = dbSegments.next(); @@ -206,9 +207,9 @@ private List getPendingSegmentsForIntervalWithHandle( } private VersionedIntervalTimeline getTimelineForIntervalsWithHandle( - final Handle handle, - final String dataSource, - final List intervals + final Handle handle, + final String dataSource, + final List intervals ) throws IOException { if (intervals == null || intervals.isEmpty()) { @@ -219,7 +220,7 @@ private VersionedIntervalTimeline getTimelineForIntervalsWi sb.append("SELECT payload FROM %s WHERE used = true AND dataSource = ? AND ("); for (int i = 0; i < intervals.size(); i++) { sb.append( - "(start <= ? AND \"end\" >= ?)" + "(start <= ? 
AND \"end\" >= ?)" ); if (i == intervals.size() - 1) { sb.append(")"); @@ -229,33 +230,33 @@ private VersionedIntervalTimeline getTimelineForIntervalsWi } Query> sql = handle.createQuery( - String.format( - sb.toString(), - dbTables.getSegmentsTable() - ) + String.format( + sb.toString(), + dbTables.getSegmentsTable() + ) ).bind(0, dataSource); for (int i = 0; i < intervals.size(); i++) { Interval interval = intervals.get(i); sql = sql - .bind(2 * i + 1, interval.getEnd().toString()) - .bind(2 * i + 2, interval.getStart().toString()); + .bind(2 * i + 1, interval.getEnd().toString()) + .bind(2 * i + 2, interval.getStart().toString()); } final ResultIterator dbSegments = sql - .map(ByteArrayMapper.FIRST) - .iterator(); + .map(ByteArrayMapper.FIRST) + .iterator(); final VersionedIntervalTimeline timeline = new VersionedIntervalTimeline<>( - Ordering.natural() + Ordering.natural() ); while (dbSegments.hasNext()) { final byte[] payload = dbSegments.next(); DataSegment segment = jsonMapper.readValue( - payload, - DataSegment.class + payload, + DataSegment.class ); timeline.add(segment.getInterval(), segment.getVersion(), segment.getShardSpec().createChunk(segment)); @@ -292,9 +293,9 @@ public Set announceHistoricalSegments(final Set segmen */ @Override public SegmentPublishResult announceHistoricalSegments( - final Set segments, - final DataSourceMetadata startMetadata, - final DataSourceMetadata endMetadata + final Set segments, + final DataSourceMetadata startMetadata, + final DataSourceMetadata endMetadata ) throws IOException { if (segments.isEmpty()) { @@ -315,7 +316,7 @@ public SegmentPublishResult announceHistoricalSegments( // Find which segments are used (i.e. not overshadowed). 
final Set usedSegments = Sets.newHashSet(); for (TimelineObjectHolder holder : VersionedIntervalTimeline.forSegments(segments) - .lookup(JodaUtils.ETERNITY)) { + .lookup(JodaUtils.ETERNITY)) { for (PartitionChunk chunk : holder.getObject()) { usedSegments.add(chunk.getObject()); } @@ -325,6 +326,7 @@ public SegmentPublishResult announceHistoricalSegments( try { return connector.retryTransaction( +<<<<<<< HEAD new TransactionCallback() { @Override @@ -354,18 +356,44 @@ public SegmentPublishResult inTransaction( } } } +======= + new TransactionCallback() + { + @Override + public SegmentPublishResult inTransaction( + final Handle handle, + final TransactionStatus transactionStatus + ) throws Exception + { + final Set inserted = Sets.newHashSet(); + + if (startMetadata != null) { + final boolean success = updateDataSourceMetadataWithHandle( + handle, + dataSource, + startMetadata, + endMetadata + ); + + if (!success) { + transactionStatus.setRollbackOnly(); + txnFailure.set(true); + throw new RuntimeException("Aborting transaction!"); + } + } - for (final DataSegment segment : segments) { - if (announceHistoricalSegment(handle, segment, usedSegments.contains(segment))) { - inserted.add(segment); - } - } + for (final DataSegment segment : segments) { + if (announceHistoricalSegment(handle, segment, usedSegments.contains(segment))) { + inserted.add(segment); + } + } +>>>>>>> d135fbd... finished the interface to delete unused pendingSeglments, add TaskDataSegment.java to struct the task table’s data, add getNotActiveTask and deletePendingSegments in IndexerSQLMetadataStorageCoordinator.java to finish the delete logic, add test in TestIndexerMetadataStorageCoordinator.java. 
- return new SegmentPublishResult(ImmutableSet.copyOf(inserted), true); - } - }, - 3, - SQLMetadataConnector.DEFAULT_MAX_TRIES + return new SegmentPublishResult(ImmutableSet.copyOf(inserted), true); + } + }, + 3, + SQLMetadataConnector.DEFAULT_MAX_TRIES ); } catch (CallbackFailedException e) { @@ -379,11 +407,11 @@ public SegmentPublishResult inTransaction( @Override public SegmentIdentifier allocatePendingSegment( - final String dataSource, - final String sequenceName, - final String previousSegmentId, - final Interval interval, - final String maxVersion + final String dataSource, + final String sequenceName, + final String previousSegmentId, + final Interval interval, + final String maxVersion ) throws IOException { Preconditions.checkNotNull(dataSource, "dataSource"); @@ -394,200 +422,200 @@ public SegmentIdentifier allocatePendingSegment( final String previousSegmentIdNotNull = previousSegmentId == null ? "" : previousSegmentId; return connector.retryTransaction( - new TransactionCallback() - { - @Override - public SegmentIdentifier inTransaction(Handle handle, TransactionStatus transactionStatus) throws Exception - { - final List existingBytes = handle - .createQuery( - String.format( - "SELECT payload FROM %s WHERE " - + "dataSource = :dataSource AND " - + "sequence_name = :sequence_name AND " - + "sequence_prev_id = :sequence_prev_id", - dbTables.getPendingSegmentsTable() - ) - ).bind("dataSource", dataSource) - .bind("sequence_name", sequenceName) - .bind("sequence_prev_id", previousSegmentIdNotNull) - .map(ByteArrayMapper.FIRST) - .list(); - - if (!existingBytes.isEmpty()) { - final SegmentIdentifier existingIdentifier = jsonMapper.readValue( - Iterables.getOnlyElement(existingBytes), - SegmentIdentifier.class - ); - - if (existingIdentifier.getInterval().getStartMillis() == interval.getStartMillis() - && existingIdentifier.getInterval().getEndMillis() == interval.getEndMillis()) { - log.info( - "Found existing pending segment [%s] for sequence[%s] 
(previous = [%s]) in DB", - existingIdentifier.getIdentifierAsString(), - sequenceName, - previousSegmentIdNotNull - ); - - return existingIdentifier; - } else { - log.warn( - "Cannot use existing pending segment [%s] for sequence[%s] (previous = [%s]) in DB, " - + "does not match requested interval[%s]", - existingIdentifier.getIdentifierAsString(), - sequenceName, - previousSegmentIdNotNull, - interval - ); + new TransactionCallback() + { + @Override + public SegmentIdentifier inTransaction(Handle handle, TransactionStatus transactionStatus) throws Exception + { + final List existingBytes = handle + .createQuery( + String.format( + "SELECT payload FROM %s WHERE " + + "dataSource = :dataSource AND " + + "sequence_name = :sequence_name AND " + + "sequence_prev_id = :sequence_prev_id", + dbTables.getPendingSegmentsTable() + ) + ).bind("dataSource", dataSource) + .bind("sequence_name", sequenceName) + .bind("sequence_prev_id", previousSegmentIdNotNull) + .map(ByteArrayMapper.FIRST) + .list(); + + if (!existingBytes.isEmpty()) { + final SegmentIdentifier existingIdentifier = jsonMapper.readValue( + Iterables.getOnlyElement(existingBytes), + SegmentIdentifier.class + ); + + if (existingIdentifier.getInterval().getStartMillis() == interval.getStartMillis() + && existingIdentifier.getInterval().getEndMillis() == interval.getEndMillis()) { + log.info( + "Found existing pending segment [%s] for sequence[%s] (previous = [%s]) in DB", + existingIdentifier.getIdentifierAsString(), + sequenceName, + previousSegmentIdNotNull + ); + + return existingIdentifier; + } else { + log.warn( + "Cannot use existing pending segment [%s] for sequence[%s] (previous = [%s]) in DB, " + + "does not match requested interval[%s]", + existingIdentifier.getIdentifierAsString(), + sequenceName, + previousSegmentIdNotNull, + interval + ); + + return null; + } + } - return null; - } - } + // Make up a pending segment based on existing segments and pending segments in the DB. 
This works + // assuming that all tasks inserting segments at a particular point in time are going through the + // allocatePendingSegment flow. This should be assured through some other mechanism (like task locks). + + final SegmentIdentifier newIdentifier; + + final List> existingChunks = getTimelineForIntervalsWithHandle( + handle, + dataSource, + ImmutableList.of(interval) + ).lookup(interval); + + if (existingChunks.size() > 1) { + // Not possible to expand more than one chunk with a single segment. + log.warn( + "Cannot allocate new segment for dataSource[%s], interval[%s], maxVersion[%s]: already have [%,d] chunks.", + dataSource, + interval, + maxVersion, + existingChunks.size() + ); + return null; + } else { + SegmentIdentifier max = null; + + if (!existingChunks.isEmpty()) { + TimelineObjectHolder existingHolder = Iterables.getOnlyElement(existingChunks); + for (PartitionChunk existing : existingHolder.getObject()) { + if (max == null || max.getShardSpec().getPartitionNum() < existing.getObject() + .getShardSpec() + .getPartitionNum()) { + max = SegmentIdentifier.fromDataSegment(existing.getObject()); + } + } + } - // Make up a pending segment based on existing segments and pending segments in the DB. This works - // assuming that all tasks inserting segments at a particular point in time are going through the - // allocatePendingSegment flow. This should be assured through some other mechanism (like task locks). - - final SegmentIdentifier newIdentifier; - - final List> existingChunks = getTimelineForIntervalsWithHandle( - handle, - dataSource, - ImmutableList.of(interval) - ).lookup(interval); - - if (existingChunks.size() > 1) { - // Not possible to expand more than one chunk with a single segment. 
- log.warn( - "Cannot allocate new segment for dataSource[%s], interval[%s], maxVersion[%s]: already have [%,d] chunks.", - dataSource, - interval, - maxVersion, - existingChunks.size() - ); - return null; - } else { - SegmentIdentifier max = null; - - if (!existingChunks.isEmpty()) { - TimelineObjectHolder existingHolder = Iterables.getOnlyElement(existingChunks); - for (PartitionChunk existing : existingHolder.getObject()) { - if (max == null || max.getShardSpec().getPartitionNum() < existing.getObject() - .getShardSpec() - .getPartitionNum()) { - max = SegmentIdentifier.fromDataSegment(existing.getObject()); + final List pendings = getPendingSegmentsForIntervalWithHandle( + handle, + dataSource, + interval + ); + + for (SegmentIdentifier pending : pendings) { + if (max == null || + pending.getVersion().compareTo(max.getVersion()) > 0 || + (pending.getVersion().equals(max.getVersion()) + && pending.getShardSpec().getPartitionNum() > max.getShardSpec().getPartitionNum())) { + max = pending; + } } - } - } - final List pendings = getPendingSegmentsForIntervalWithHandle( - handle, - dataSource, - interval - ); - - for (SegmentIdentifier pending : pendings) { - if (max == null || - pending.getVersion().compareTo(max.getVersion()) > 0 || - (pending.getVersion().equals(max.getVersion()) - && pending.getShardSpec().getPartitionNum() > max.getShardSpec().getPartitionNum())) { - max = pending; + if (max == null) { + newIdentifier = new SegmentIdentifier( + dataSource, + interval, + maxVersion, + new NumberedShardSpec(0, 0) + ); + } else if (!max.getInterval().equals(interval) || max.getVersion().compareTo(maxVersion) > 0) { + log.warn( + "Cannot allocate new segment for dataSource[%s], interval[%s], maxVersion[%s]: conflicting segment[%s].", + dataSource, + interval, + maxVersion, + max.getIdentifierAsString() + ); + return null; + } else if (max.getShardSpec() instanceof LinearShardSpec) { + newIdentifier = new SegmentIdentifier( + dataSource, + max.getInterval(), + 
max.getVersion(), + new LinearShardSpec(max.getShardSpec().getPartitionNum() + 1) + ); + } else if (max.getShardSpec() instanceof NumberedShardSpec) { + newIdentifier = new SegmentIdentifier( + dataSource, + max.getInterval(), + max.getVersion(), + new NumberedShardSpec( + max.getShardSpec().getPartitionNum() + 1, + ((NumberedShardSpec) max.getShardSpec()).getPartitions() + ) + ); + } else { + log.warn( + "Cannot allocate new segment for dataSource[%s], interval[%s], maxVersion[%s]: ShardSpec class[%s] used by [%s].", + dataSource, + interval, + maxVersion, + max.getShardSpec().getClass(), + max.getIdentifierAsString() + ); + return null; + } } - } - if (max == null) { - newIdentifier = new SegmentIdentifier( - dataSource, - interval, - maxVersion, - new NumberedShardSpec(0, 0) + // SELECT -> INSERT can fail due to races; callers must be prepared to retry. + // Avoiding ON DUPLICATE KEY since it's not portable. + // Avoiding try/catch since it may cause inadvertent transaction-splitting. + + // UNIQUE key for the row, ensuring sequences do not fork in two directions. 
+ // Using a single column instead of (sequence_name, sequence_prev_id) as some MySQL storage engines + // have difficulty with large unique keys (see https://github.com/druid-io/druid/issues/2319) + final String sequenceNamePrevIdSha1 = BaseEncoding.base16().encode( + Hashing.sha1() + .newHasher() + .putBytes(StringUtils.toUtf8(sequenceName)) + .putByte((byte) 0xff) + .putBytes(StringUtils.toUtf8(previousSegmentIdNotNull)) + .hash() + .asBytes() ); - } else if (!max.getInterval().equals(interval) || max.getVersion().compareTo(maxVersion) > 0) { - log.warn( - "Cannot allocate new segment for dataSource[%s], interval[%s], maxVersion[%s]: conflicting segment[%s].", - dataSource, - interval, - maxVersion, - max.getIdentifierAsString() - ); - return null; - } else if (max.getShardSpec() instanceof LinearShardSpec) { - newIdentifier = new SegmentIdentifier( - dataSource, - max.getInterval(), - max.getVersion(), - new LinearShardSpec(max.getShardSpec().getPartitionNum() + 1) - ); - } else if (max.getShardSpec() instanceof NumberedShardSpec) { - newIdentifier = new SegmentIdentifier( - dataSource, - max.getInterval(), - max.getVersion(), - new NumberedShardSpec( - max.getShardSpec().getPartitionNum() + 1, - ((NumberedShardSpec) max.getShardSpec()).getPartitions() - ) - ); - } else { - log.warn( - "Cannot allocate new segment for dataSource[%s], interval[%s], maxVersion[%s]: ShardSpec class[%s] used by [%s].", - dataSource, - interval, - maxVersion, - max.getShardSpec().getClass(), - max.getIdentifierAsString() - ); - return null; - } - } - // SELECT -> INSERT can fail due to races; callers must be prepared to retry. - // Avoiding ON DUPLICATE KEY since it's not portable. - // Avoiding try/catch since it may cause inadvertent transaction-splitting. - - // UNIQUE key for the row, ensuring sequences do not fork in two directions. 
- // Using a single column instead of (sequence_name, sequence_prev_id) as some MySQL storage engines - // have difficulty with large unique keys (see https://github.com/druid-io/druid/issues/2319) - final String sequenceNamePrevIdSha1 = BaseEncoding.base16().encode( - Hashing.sha1() - .newHasher() - .putBytes(StringUtils.toUtf8(sequenceName)) - .putByte((byte) 0xff) - .putBytes(StringUtils.toUtf8(previousSegmentIdNotNull)) - .hash() - .asBytes() - ); - - handle.createStatement( - String.format( - "INSERT INTO %s (id, dataSource, created_date, start, \"end\", sequence_name, sequence_prev_id, sequence_name_prev_id_sha1, payload) " - + "VALUES (:id, :dataSource, :created_date, :start, :end, :sequence_name, :sequence_prev_id, :sequence_name_prev_id_sha1, :payload)", - dbTables.getPendingSegmentsTable() + handle.createStatement( + String.format( + "INSERT INTO %s (id, dataSource, created_date, start, \"end\", sequence_name, sequence_prev_id, sequence_name_prev_id_sha1, payload) " + + "VALUES (:id, :dataSource, :created_date, :start, :end, :sequence_name, :sequence_prev_id, :sequence_name_prev_id_sha1, :payload)", + dbTables.getPendingSegmentsTable() + ) ) - ) - .bind("id", newIdentifier.getIdentifierAsString()) - .bind("dataSource", dataSource) - .bind("created_date", new DateTime().toString()) - .bind("start", interval.getStart().toString()) - .bind("end", interval.getEnd().toString()) - .bind("sequence_name", sequenceName) - .bind("sequence_prev_id", previousSegmentIdNotNull) - .bind("sequence_name_prev_id_sha1", sequenceNamePrevIdSha1) - .bind("payload", jsonMapper.writeValueAsBytes(newIdentifier)) - .execute(); - - log.info( - "Allocated pending segment [%s] for sequence[%s] (previous = [%s]) in DB", - newIdentifier.getIdentifierAsString(), - sequenceName, - previousSegmentIdNotNull - ); - - return newIdentifier; - } - }, - ALLOCATE_SEGMENT_QUIET_TRIES, - SQLMetadataConnector.DEFAULT_MAX_TRIES + .bind("id", newIdentifier.getIdentifierAsString()) + 
.bind("dataSource", dataSource) + .bind("created_date", new DateTime().toString()) + .bind("start", interval.getStart().toString()) + .bind("end", interval.getEnd().toString()) + .bind("sequence_name", sequenceName) + .bind("sequence_prev_id", previousSegmentIdNotNull) + .bind("sequence_name_prev_id_sha1", sequenceNamePrevIdSha1) + .bind("payload", jsonMapper.writeValueAsBytes(newIdentifier)) + .execute(); + + log.info( + "Allocated pending segment [%s] for sequence[%s] (previous = [%s]) in DB", + newIdentifier.getIdentifierAsString(), + sequenceName, + previousSegmentIdNotNull + ); + + return newIdentifier; + } + }, + ALLOCATE_SEGMENT_QUIET_TRIES, + SQLMetadataConnector.DEFAULT_MAX_TRIES ); } @@ -598,9 +626,9 @@ public SegmentIdentifier inTransaction(Handle handle, TransactionStatus transact * @return true if the segment was added, false if it already existed */ private boolean announceHistoricalSegment( - final Handle handle, - final DataSegment segment, - final boolean used + final Handle handle, + final DataSegment segment, + final boolean used ) throws IOException { try { @@ -613,22 +641,22 @@ private boolean announceHistoricalSegment( // Avoiding ON DUPLICATE KEY since it's not portable. // Avoiding try/catch since it may cause inadvertent transaction-splitting. 
handle.createStatement( - String.format( - "INSERT INTO %s (id, dataSource, created_date, start, \"end\", partitioned, version, used, payload) " - + "VALUES (:id, :dataSource, :created_date, :start, :end, :partitioned, :version, :used, :payload)", - dbTables.getSegmentsTable() - ) + String.format( + "INSERT INTO %s (id, dataSource, created_date, start, \"end\", partitioned, version, used, payload) " + + "VALUES (:id, :dataSource, :created_date, :start, :end, :partitioned, :version, :used, :payload)", + dbTables.getSegmentsTable() + ) ) - .bind("id", segment.getIdentifier()) - .bind("dataSource", segment.getDataSource()) - .bind("created_date", new DateTime().toString()) - .bind("start", segment.getInterval().getStart().toString()) - .bind("end", segment.getInterval().getEnd().toString()) - .bind("partitioned", (segment.getShardSpec() instanceof NoneShardSpec) ? false : true) - .bind("version", segment.getVersion()) - .bind("used", used) - .bind("payload", jsonMapper.writeValueAsBytes(segment)) - .execute(); + .bind("id", segment.getIdentifier()) + .bind("dataSource", segment.getDataSource()) + .bind("created_date", new DateTime().toString()) + .bind("start", segment.getInterval().getStart().toString()) + .bind("end", segment.getInterval().getEnd().toString()) + .bind("partitioned", (segment.getShardSpec() instanceof NoneShardSpec) ? 
false : true) + .bind("version", segment.getVersion()) + .bind("used", used) + .bind("payload", jsonMapper.writeValueAsBytes(segment)) + .execute(); log.info("Published segment [%s] to DB", segment.getIdentifier()); } @@ -643,15 +671,15 @@ private boolean announceHistoricalSegment( private boolean segmentExists(final Handle handle, final DataSegment segment) { return !handle - .createQuery( - String.format( - "SELECT id FROM %s WHERE id = :identifier", - dbTables.getSegmentsTable() - ) - ).bind("identifier", segment.getIdentifier()) - .map(StringMapper.FIRST) - .list() - .isEmpty(); + .createQuery( + String.format( + "SELECT id FROM %s WHERE id = :identifier", + dbTables.getSegmentsTable() + ) + ).bind("identifier", segment.getIdentifier()) + .map(StringMapper.FIRST) + .list() + .isEmpty(); } /** @@ -660,10 +688,10 @@ private boolean segmentExists(final Handle handle, final DataSegment segment) public DataSourceMetadata getDataSourceMetadata(final String dataSource) { final byte[] bytes = connector.lookup( - dbTables.getDataSourceTable(), - "dataSource", - "commit_metadata_payload", - dataSource + dbTables.getDataSourceTable(), + "dataSource", + "commit_metadata_payload", + dataSource ); if (bytes == null) { @@ -682,16 +710,16 @@ public DataSourceMetadata getDataSourceMetadata(final String dataSource) * Read dataSource metadata as bytes, from a specific handle. Returns null if there is no metadata. 
*/ private byte[] getDataSourceMetadataWithHandleAsBytes( - final Handle handle, - final String dataSource + final Handle handle, + final String dataSource ) { return connector.lookupWithHandle( - handle, - dbTables.getDataSourceTable(), - "dataSource", - "commit_metadata_payload", - dataSource + handle, + dbTables.getDataSourceTable(), + "dataSource", + "commit_metadata_payload", + dataSource ); } @@ -709,11 +737,19 @@ private byte[] getDataSourceMetadataWithHandleAsBytes( * * @return true if dataSource metadata was updated from matching startMetadata to matching endMetadata */ +<<<<<<< HEAD protected DataSourceMetadataUpdateResult updateDataSourceMetadataWithHandle( final Handle handle, final String dataSource, final DataSourceMetadata startMetadata, final DataSourceMetadata endMetadata +======= + private boolean updateDataSourceMetadataWithHandle( + final Handle handle, + final String dataSource, + final DataSourceMetadata startMetadata, + final DataSourceMetadata endMetadata +>>>>>>> d135fbd... finished the interface to delete unused pendingSeglments, add TaskDataSegment.java to struct the task table’s data, add getNotActiveTask and deletePendingSegments in IndexerSQLMetadataStorageCoordinator.java to finish the delete logic, add test in TestIndexerMetadataStorageCoordinator.java. ) throws IOException { Preconditions.checkNotNull(dataSource, "dataSource"); @@ -729,14 +765,14 @@ protected DataSourceMetadataUpdateResult updateDataSourceMetadataWithHandle( oldCommitMetadataFromDb = null; } else { oldCommitMetadataSha1FromDb = BaseEncoding.base16().encode( - Hashing.sha1().hashBytes(oldCommitMetadataBytesFromDb).asBytes() + Hashing.sha1().hashBytes(oldCommitMetadataBytesFromDb).asBytes() ); oldCommitMetadataFromDb = jsonMapper.readValue(oldCommitMetadataBytesFromDb, DataSourceMetadata.class); } final boolean startMetadataMatchesExisting = oldCommitMetadataFromDb == null - ? startMetadata.isValidStart() - : startMetadata.matches(oldCommitMetadataFromDb); + ? 
startMetadata.isValidStart() + : startMetadata.matches(oldCommitMetadataFromDb); if (!startMetadataMatchesExisting) { // Not in the desired start state. @@ -745,46 +781,46 @@ protected DataSourceMetadataUpdateResult updateDataSourceMetadataWithHandle( } final DataSourceMetadata newCommitMetadata = oldCommitMetadataFromDb == null - ? endMetadata - : oldCommitMetadataFromDb.plus(endMetadata); + ? endMetadata + : oldCommitMetadataFromDb.plus(endMetadata); final byte[] newCommitMetadataBytes = jsonMapper.writeValueAsBytes(newCommitMetadata); final String newCommitMetadataSha1 = BaseEncoding.base16().encode( - Hashing.sha1().hashBytes(newCommitMetadataBytes).asBytes() + Hashing.sha1().hashBytes(newCommitMetadataBytes).asBytes() ); final DataSourceMetadataUpdateResult retVal; if (oldCommitMetadataBytesFromDb == null) { // SELECT -> INSERT can fail due to races; callers must be prepared to retry. final int numRows = handle.createStatement( - String.format( - "INSERT INTO %s (dataSource, created_date, commit_metadata_payload, commit_metadata_sha1) " - + "VALUES (:dataSource, :created_date, :commit_metadata_payload, :commit_metadata_sha1)", - dbTables.getDataSourceTable() - ) + String.format( + "INSERT INTO %s (dataSource, created_date, commit_metadata_payload, commit_metadata_sha1) " + + "VALUES (:dataSource, :created_date, :commit_metadata_payload, :commit_metadata_sha1)", + dbTables.getDataSourceTable() + ) ) - .bind("dataSource", dataSource) - .bind("created_date", new DateTime().toString()) - .bind("commit_metadata_payload", newCommitMetadataBytes) - .bind("commit_metadata_sha1", newCommitMetadataSha1) - .execute(); + .bind("dataSource", dataSource) + .bind("created_date", new DateTime().toString()) + .bind("commit_metadata_payload", newCommitMetadataBytes) + .bind("commit_metadata_sha1", newCommitMetadataSha1) + .execute(); retVal = numRows == 1 ? 
DataSourceMetadataUpdateResult.SUCCESS : DataSourceMetadataUpdateResult.TRY_AGAIN; } else { // Expecting a particular old metadata; use the SHA1 in a compare-and-swap UPDATE final int numRows = handle.createStatement( - String.format( - "UPDATE %s SET " - + "commit_metadata_payload = :new_commit_metadata_payload, " - + "commit_metadata_sha1 = :new_commit_metadata_sha1 " - + "WHERE dataSource = :dataSource AND commit_metadata_sha1 = :old_commit_metadata_sha1", - dbTables.getDataSourceTable() - ) + String.format( + "UPDATE %s SET " + + "commit_metadata_payload = :new_commit_metadata_payload, " + + "commit_metadata_sha1 = :new_commit_metadata_sha1 " + + "WHERE dataSource = :dataSource AND commit_metadata_sha1 = :old_commit_metadata_sha1", + dbTables.getDataSourceTable() + ) ) - .bind("dataSource", dataSource) - .bind("old_commit_metadata_sha1", oldCommitMetadataSha1FromDb) - .bind("new_commit_metadata_payload", newCommitMetadataBytes) - .bind("new_commit_metadata_sha1", newCommitMetadataSha1) - .execute(); + .bind("dataSource", dataSource) + .bind("old_commit_metadata_sha1", oldCommitMetadataSha1FromDb) + .bind("new_commit_metadata_payload", newCommitMetadataBytes) + .bind("new_commit_metadata_sha1", newCommitMetadataSha1) + .execute(); retVal = numRows == 1 ? 
DataSourceMetadataUpdateResult.SUCCESS : DataSourceMetadataUpdateResult.TRY_AGAIN; } @@ -801,77 +837,77 @@ protected DataSourceMetadataUpdateResult updateDataSourceMetadataWithHandle( public boolean deleteDataSourceMetadata(final String dataSource) { return connector.retryWithHandle( - new HandleCallback() - { - @Override - public Boolean withHandle(Handle handle) throws Exception - { - int rows = handle.createStatement( - String.format("DELETE from %s WHERE dataSource = :dataSource", dbTables.getDataSourceTable()) - ) - .bind("dataSource", dataSource) - .execute(); + new HandleCallback() + { + @Override + public Boolean withHandle(Handle handle) throws Exception + { + int rows = handle.createStatement( + String.format("DELETE from %s WHERE dataSource = :dataSource", dbTables.getDataSourceTable()) + ) + .bind("dataSource", dataSource) + .execute(); - return rows > 0; - } - } + return rows > 0; + } + } ); } public void updateSegmentMetadata(final Set segments) throws IOException { connector.getDBI().inTransaction( - new TransactionCallback() - { - @Override - public Void inTransaction(Handle handle, TransactionStatus transactionStatus) throws Exception - { - for (final DataSegment segment : segments) { - updatePayload(handle, segment); - } + new TransactionCallback() + { + @Override + public Void inTransaction(Handle handle, TransactionStatus transactionStatus) throws Exception + { + for (final DataSegment segment : segments) { + updatePayload(handle, segment); + } - return null; - } - } + return null; + } + } ); } public void deleteSegments(final Set segments) throws IOException { connector.getDBI().inTransaction( - new TransactionCallback() - { - @Override - public Void inTransaction(Handle handle, TransactionStatus transactionStatus) throws IOException - { - for (final DataSegment segment : segments) { - deleteSegment(handle, segment); - } + new TransactionCallback() + { + @Override + public Void inTransaction(Handle handle, TransactionStatus transactionStatus) 
throws IOException + { + for (final DataSegment segment : segments) { + deleteSegment(handle, segment); + } - return null; - } - } + return null; + } + } ); } private void deleteSegment(final Handle handle, final DataSegment segment) { handle.createStatement( - String.format("DELETE from %s WHERE id = :id", dbTables.getSegmentsTable()) + String.format("DELETE from %s WHERE id = :id", dbTables.getSegmentsTable()) ) - .bind("id", segment.getIdentifier()) - .execute(); + .bind("id", segment.getIdentifier()) + .execute(); } private void updatePayload(final Handle handle, final DataSegment segment) throws IOException { try { handle.createStatement( - String.format("UPDATE %s SET payload = :payload WHERE id = :id", dbTables.getSegmentsTable()) + String.format("UPDATE %s SET payload = :payload WHERE id = :id", dbTables.getSegmentsTable()) ) - .bind("id", segment.getIdentifier()) - .bind("payload", jsonMapper.writeValueAsBytes(segment)) - .execute(); + .bind("id", segment.getIdentifier()) + .bind("payload", jsonMapper.writeValueAsBytes(segment)) + .execute(); } catch (IOException e) { log.error(e, "Exception inserting into DB"); @@ -879,54 +915,148 @@ private void updatePayload(final Handle handle, final DataSegment segment) throw } } + public boolean deletePendingSegments(final List segments) throws IOException + { + return connector.getDBI().inTransaction( + new TransactionCallback() + { + int res = 0; + @Override + public Boolean inTransaction(Handle handle, TransactionStatus transactionStatus) throws IOException + { + for (final TaskDataSegment segment : segments) { + if(deletePendingSegment(segment)) + { + res +=1; + } + } + + return res == segments.size(); + } + } + ); + } + + + public boolean deletePendingSegment(final TaskDataSegment taskDataSegment) + { + return connector.retryWithHandle( + new HandleCallback() + { + @Override + public Boolean withHandle(Handle handle) throws Exception + { + int rows = handle.createStatement( + String.format("DELETE from %s WHERE 
sequence_name = :sequence_name", dbTables.getPendingSegmentsTable()) + ) + .bind("sequence_name", taskDataSegment.getIoConfig().get("baseSequenceName")) + .execute(); + log.info("HHHHHHHHHHHHHHHHHHHHHHHHHH:"+taskDataSegment.getIoConfig().get("baseSequenceName")); + + return rows > 0; + } + } + ); + } + @Override public List getUnusedSegmentsForInterval(final String dataSource, final Interval interval) { List matchingSegments = connector.inReadOnlyTransaction( - new TransactionCallback>() - { - @Override - public List inTransaction(final Handle handle, final TransactionStatus status) throws Exception - { - return handle - .createQuery( - String.format( - "SELECT payload FROM %s WHERE dataSource = :dataSource and start >= :start and \"end\" <= :end and used = false", - dbTables.getSegmentsTable() - ) - ) - .setFetchSize(connector.getStreamingFetchSize()) - .bind("dataSource", dataSource) - .bind("start", interval.getStart().toString()) - .bind("end", interval.getEnd().toString()) - .map(ByteArrayMapper.FIRST) - .fold( - Lists.newArrayList(), - new Folder3, byte[]>() - { - @Override - public List fold( - List accumulator, - byte[] payload, - FoldController foldController, - StatementContext statementContext - ) throws SQLException - { - try { - accumulator.add(jsonMapper.readValue(payload, DataSegment.class)); - return accumulator; - } - catch (Exception e) { - throw Throwables.propagate(e); - } - } - } - ); - } - } + new TransactionCallback>() + { + @Override + public List inTransaction(final Handle handle, final TransactionStatus status) throws Exception + { + return handle + .createQuery( + String.format( + "SELECT payload FROM %s WHERE dataSource = :dataSource and start >= :start and \"end\" <= :end and used = false", + dbTables.getSegmentsTable() + ) + ) + .setFetchSize(connector.getStreamingFetchSize()) + .bind("dataSource", dataSource) + .bind("start", interval.getStart().toString()) + .bind("end", interval.getEnd().toString()) + .map(ByteArrayMapper.FIRST) + 
.fold( + Lists.newArrayList(), + new Folder3, byte[]>() + { + @Override + public List fold( + List accumulator, + byte[] payload, + FoldController foldController, + StatementContext statementContext + ) throws SQLException + { + try { + accumulator.add(jsonMapper.readValue(payload, DataSegment.class)); + return accumulator; + } + catch (Exception e) { + throw Throwables.propagate(e); + } + } + } + ); + } + } ); log.info("Found %,d segments for %s for interval %s.", matchingSegments.size(), dataSource, interval); return matchingSegments; } -} + + @Override + public List getNotActiveTask(final Interval interval) + { + List notActiveTasks = connector.inReadOnlyTransaction( + new TransactionCallback>() + { + @Override + public List inTransaction(Handle handle, TransactionStatus transactionStatus) + throws Exception + { + return handle.createQuery( + String.format( + "SELECT payload FROM %s WHERE active = :active", + dbTables.getTasksTable() + ) + ) + .bind("active", 0) + .map(ByteArrayMapper.FIRST) + .fold( + Lists.newArrayList(), + new Folder3, byte[]>() + { + @Override + public List fold( + List accumulator, + byte[] payload, + FoldController foldController, + StatementContext statementContext + ) throws SQLException + { + try { + accumulator.add(jsonMapper.readValue(payload, TaskDataSegment.class)); + return accumulator; + } + catch (Exception e) { + throw Throwables.propagate(e); + } + } + } + ); + + } + } + ); + + log.info("Found %,d tasks for interval %s.", notActiveTasks.size(), interval); + return notActiveTasks; + } + +} \ No newline at end of file diff --git a/server/src/test/java/io/druid/metadata/IndexerSQLMetadataStorageCoordinatorTest.java b/server/src/test/java/io/druid/metadata/IndexerSQLMetadataStorageCoordinatorTest.java index 23db45847c35..be51fbb131b4 100644 --- a/server/src/test/java/io/druid/metadata/IndexerSQLMetadataStorageCoordinatorTest.java +++ b/server/src/test/java/io/druid/metadata/IndexerSQLMetadataStorageCoordinatorTest.java @@ -28,6 
+28,7 @@ import io.druid.indexing.overlord.SegmentPublishResult; import io.druid.jackson.DefaultObjectMapper; import io.druid.timeline.DataSegment; +import io.druid.timeline.TaskDataSegment; import io.druid.timeline.partition.LinearShardSpec; import io.druid.timeline.partition.NoneShardSpec; import org.joda.time.Interval; @@ -51,52 +52,61 @@ public class IndexerSQLMetadataStorageCoordinatorTest private final ObjectMapper mapper = new DefaultObjectMapper(); private final DataSegment defaultSegment = new DataSegment( - "fooDataSource", - Interval.parse("2015-01-01T00Z/2015-01-02T00Z"), - "version", - ImmutableMap.of(), - ImmutableList.of("dim1"), - ImmutableList.of("m1"), - new LinearShardSpec(0), - 9, - 100 + "fooDataSource", + Interval.parse("2015-01-01T00Z/2015-01-02T00Z"), + "version", + ImmutableMap.of(), + ImmutableList.of("dim1"), + ImmutableList.of("m1"), + new LinearShardSpec(0), + 9, + 100 ); private final DataSegment defaultSegment2 = new DataSegment( - "fooDataSource", - Interval.parse("2015-01-01T00Z/2015-01-02T00Z"), - "version", - ImmutableMap.of(), - ImmutableList.of("dim1"), - ImmutableList.of("m1"), - new LinearShardSpec(1), - 9, - 100 + "fooDataSource", + Interval.parse("2015-01-01T00Z/2015-01-02T00Z"), + "version", + ImmutableMap.of(), + ImmutableList.of("dim1"), + ImmutableList.of("m1"), + new LinearShardSpec(1), + 9, + 100 + ); + + private final TaskDataSegment defaultTaskSegment = new TaskDataSegment( + "kafka", + "12345", + "test", + "datasource_test_1", + ImmutableMap.of("baseSequenceName","test_baseSequenceName"), + Interval.parse("2015-01-01T00Z/2015-01-02T00Z") ); private final DataSegment defaultSegment3 = new DataSegment( - "fooDataSource", - Interval.parse("2015-01-03T00Z/2015-01-04T00Z"), - "version", - ImmutableMap.of(), - ImmutableList.of("dim1"), - ImmutableList.of("m1"), - NoneShardSpec.instance(), - 9, - 100 + "fooDataSource", + Interval.parse("2015-01-03T00Z/2015-01-04T00Z"), + "version", + ImmutableMap.of(), + 
ImmutableList.of("dim1"), + ImmutableList.of("m1"), + NoneShardSpec.instance(), + 9, + 100 ); // Overshadows defaultSegment, defaultSegment2 private final DataSegment defaultSegment4 = new DataSegment( - "fooDataSource", - Interval.parse("2015-01-01T00Z/2015-01-02T00Z"), - "zversion", - ImmutableMap.of(), - ImmutableList.of("dim1"), - ImmutableList.of("m1"), - new LinearShardSpec(0), - 9, - 100 + "fooDataSource", + Interval.parse("2015-01-01T00Z/2015-01-02T00Z"), + "zversion", + ImmutableMap.of(), + ImmutableList.of("dim1"), + ImmutableList.of("m1"), + new LinearShardSpec(0), + 9, + 100 ); private final Set SEGMENTS = ImmutableSet.of(defaultSegment, defaultSegment2); @@ -112,6 +122,7 @@ public void setUp() derbyConnector.createDataSourceTable(); derbyConnector.createTaskTables(); derbyConnector.createSegmentTable(); +<<<<<<< HEAD metadataUpdateCounter.set(0); coordinator = new IndexerSQLMetadataStorageCoordinator( mapper, @@ -132,27 +143,35 @@ protected DataSourceMetadataUpdateResult updateDataSourceMetadataWithHandle( return super.updateDataSourceMetadataWithHandle(handle, dataSource, startMetadata, endMetadata); } }; +======= + derbyConnector.createPendingSegmentsTable(); + coordinator = new IndexerSQLMetadataStorageCoordinator( + mapper, + derbyConnectorRule.metadataTablesConfigSupplier().get(), + derbyConnector + ); +>>>>>>> d135fbd... finished the interface to delete unused pendingSeglments, add TaskDataSegment.java to struct the task table’s data, add getNotActiveTask and deletePendingSegments in IndexerSQLMetadataStorageCoordinator.java to finish the delete logic, add test in TestIndexerMetadataStorageCoordinator.java. 
} private void unUseSegment() { for (final DataSegment segment : SEGMENTS) { Assert.assertEquals( - 1, (int) derbyConnector.getDBI().withHandle( - new HandleCallback() - { - @Override - public Integer withHandle(Handle handle) throws Exception - { - return handle.createStatement( - String.format( - "UPDATE %s SET used = false WHERE id = :id", - derbyConnectorRule.metadataTablesConfigSupplier().get().getSegmentsTable() - ) - ).bind("id", segment.getIdentifier()).execute(); - } - } - ) + 1, (int) derbyConnector.getDBI().withHandle( + new HandleCallback() + { + @Override + public Integer withHandle(Handle handle) throws Exception + { + return handle.createStatement( + String.format( + "UPDATE %s SET used = false WHERE id = :id", + derbyConnectorRule.metadataTablesConfigSupplier().get().getSegmentsTable() + ) + ).bind("id", segment.getIdentifier()).execute(); + } + } + ) ); } } @@ -161,16 +180,16 @@ private List getUsedIdentifiers() { final String table = derbyConnectorRule.metadataTablesConfigSupplier().get().getSegmentsTable(); return derbyConnector.retryWithHandle( - new HandleCallback>() - { - @Override - public List withHandle(Handle handle) throws Exception - { - return handle.createQuery("SELECT id FROM " + table + " WHERE used = true ORDER BY id") - .map(StringMapper.FIRST) - .list(); - } - } + new HandleCallback>() + { + @Override + public List withHandle(Handle handle) throws Exception + { + return handle.createQuery("SELECT id FROM " + table + " WHERE used = true ORDER BY id") + .map(StringMapper.FIRST) + .list(); + } + } ); } @@ -180,19 +199,19 @@ public void testSimpleAnnounce() throws IOException coordinator.announceHistoricalSegments(SEGMENTS); for (DataSegment segment : SEGMENTS) { Assert.assertArrayEquals( - mapper.writeValueAsString(segment).getBytes("UTF-8"), - derbyConnector.lookup( - derbyConnectorRule.metadataTablesConfigSupplier().get().getSegmentsTable(), - "id", - "payload", - segment.getIdentifier() - ) + 
mapper.writeValueAsString(segment).getBytes("UTF-8"), + derbyConnector.lookup( + derbyConnectorRule.metadataTablesConfigSupplier().get().getSegmentsTable(), + "id", + "payload", + segment.getIdentifier() + ) ); } Assert.assertEquals( - ImmutableList.of(defaultSegment.getIdentifier(), defaultSegment2.getIdentifier()), - getUsedIdentifiers() + ImmutableList.of(defaultSegment.getIdentifier(), defaultSegment2.getIdentifier()), + getUsedIdentifiers() ); // Should not update dataSource metadata. @@ -208,13 +227,13 @@ public void testOvershadowingAnnounce() throws IOException for (DataSegment segment : segments) { Assert.assertArrayEquals( - mapper.writeValueAsString(segment).getBytes("UTF-8"), - derbyConnector.lookup( - derbyConnectorRule.metadataTablesConfigSupplier().get().getSegmentsTable(), - "id", - "payload", - segment.getIdentifier() - ) + mapper.writeValueAsString(segment).getBytes("UTF-8"), + derbyConnector.lookup( + derbyConnectorRule.metadataTablesConfigSupplier().get().getSegmentsTable(), + "id", + "payload", + segment.getIdentifier() + ) ); } @@ -226,44 +245,44 @@ public void testTransactionalAnnounceSuccess() throws IOException { // Insert first segment. 
final SegmentPublishResult result1 = coordinator.announceHistoricalSegments( - ImmutableSet.of(defaultSegment), - new ObjectMetadata(null), - new ObjectMetadata(ImmutableMap.of("foo", "bar")) + ImmutableSet.of(defaultSegment), + new ObjectMetadata(null), + new ObjectMetadata(ImmutableMap.of("foo", "bar")) ); Assert.assertEquals(new SegmentPublishResult(ImmutableSet.of(defaultSegment), true), result1); Assert.assertArrayEquals( - mapper.writeValueAsString(defaultSegment).getBytes("UTF-8"), - derbyConnector.lookup( - derbyConnectorRule.metadataTablesConfigSupplier().get().getSegmentsTable(), - "id", - "payload", - defaultSegment.getIdentifier() - ) + mapper.writeValueAsString(defaultSegment).getBytes("UTF-8"), + derbyConnector.lookup( + derbyConnectorRule.metadataTablesConfigSupplier().get().getSegmentsTable(), + "id", + "payload", + defaultSegment.getIdentifier() + ) ); // Insert second segment. final SegmentPublishResult result2 = coordinator.announceHistoricalSegments( - ImmutableSet.of(defaultSegment2), - new ObjectMetadata(ImmutableMap.of("foo", "bar")), - new ObjectMetadata(ImmutableMap.of("foo", "baz")) + ImmutableSet.of(defaultSegment2), + new ObjectMetadata(ImmutableMap.of("foo", "bar")), + new ObjectMetadata(ImmutableMap.of("foo", "baz")) ); Assert.assertEquals(new SegmentPublishResult(ImmutableSet.of(defaultSegment2), true), result2); Assert.assertArrayEquals( - mapper.writeValueAsString(defaultSegment2).getBytes("UTF-8"), - derbyConnector.lookup( - derbyConnectorRule.metadataTablesConfigSupplier().get().getSegmentsTable(), - "id", - "payload", - defaultSegment2.getIdentifier() - ) + mapper.writeValueAsString(defaultSegment2).getBytes("UTF-8"), + derbyConnector.lookup( + derbyConnectorRule.metadataTablesConfigSupplier().get().getSegmentsTable(), + "id", + "payload", + defaultSegment2.getIdentifier() + ) ); // Examine metadata. 
Assert.assertEquals( - new ObjectMetadata(ImmutableMap.of("foo", "baz")), - coordinator.getDataSourceMetadata("fooDataSource") + new ObjectMetadata(ImmutableMap.of("foo", "baz")), + coordinator.getDataSourceMetadata("fooDataSource") ); // Should only be tried once per call. @@ -351,9 +370,9 @@ protected DataSourceMetadataUpdateResult updateDataSourceMetadataWithHandle( public void testTransactionalAnnounceFailDbNullWantNotNull() throws IOException { final SegmentPublishResult result1 = coordinator.announceHistoricalSegments( - ImmutableSet.of(defaultSegment), - new ObjectMetadata(ImmutableMap.of("foo", "bar")), - new ObjectMetadata(ImmutableMap.of("foo", "baz")) + ImmutableSet.of(defaultSegment), + new ObjectMetadata(ImmutableMap.of("foo", "bar")), + new ObjectMetadata(ImmutableMap.of("foo", "baz")) ); Assert.assertEquals(new SegmentPublishResult(ImmutableSet.of(), false), result1); @@ -365,16 +384,16 @@ public void testTransactionalAnnounceFailDbNullWantNotNull() throws IOException public void testTransactionalAnnounceFailDbNotNullWantNull() throws IOException { final SegmentPublishResult result1 = coordinator.announceHistoricalSegments( - ImmutableSet.of(defaultSegment), - new ObjectMetadata(null), - new ObjectMetadata(ImmutableMap.of("foo", "baz")) + ImmutableSet.of(defaultSegment), + new ObjectMetadata(null), + new ObjectMetadata(ImmutableMap.of("foo", "baz")) ); Assert.assertEquals(new SegmentPublishResult(ImmutableSet.of(defaultSegment), true), result1); final SegmentPublishResult result2 = coordinator.announceHistoricalSegments( - ImmutableSet.of(defaultSegment2), - new ObjectMetadata(null), - new ObjectMetadata(ImmutableMap.of("foo", "baz")) + ImmutableSet.of(defaultSegment2), + new ObjectMetadata(null), + new ObjectMetadata(ImmutableMap.of("foo", "baz")) ); Assert.assertEquals(new SegmentPublishResult(ImmutableSet.of(), false), result2); @@ -386,16 +405,16 @@ public void testTransactionalAnnounceFailDbNotNullWantNull() throws IOException public void 
testTransactionalAnnounceFailDbNotNullWantDifferent() throws IOException { final SegmentPublishResult result1 = coordinator.announceHistoricalSegments( - ImmutableSet.of(defaultSegment), - new ObjectMetadata(null), - new ObjectMetadata(ImmutableMap.of("foo", "baz")) + ImmutableSet.of(defaultSegment), + new ObjectMetadata(null), + new ObjectMetadata(ImmutableMap.of("foo", "baz")) ); Assert.assertEquals(new SegmentPublishResult(ImmutableSet.of(defaultSegment), true), result1); final SegmentPublishResult result2 = coordinator.announceHistoricalSegments( - ImmutableSet.of(defaultSegment2), - new ObjectMetadata(ImmutableMap.of("foo", "qux")), - new ObjectMetadata(ImmutableMap.of("foo", "baz")) + ImmutableSet.of(defaultSegment2), + new ObjectMetadata(ImmutableMap.of("foo", "qux")), + new ObjectMetadata(ImmutableMap.of("foo", "baz")) ); Assert.assertEquals(new SegmentPublishResult(ImmutableSet.of(), false), result2); @@ -408,13 +427,13 @@ public void testSimpleUsedList() throws IOException { coordinator.announceHistoricalSegments(SEGMENTS); Assert.assertEquals( - SEGMENTS, - ImmutableSet.copyOf( - coordinator.getUsedSegmentsForInterval( - defaultSegment.getDataSource(), - defaultSegment.getInterval() + SEGMENTS, + ImmutableSet.copyOf( + coordinator.getUsedSegmentsForInterval( + defaultSegment.getDataSource(), + defaultSegment.getInterval() + ) ) - ) ); } @@ -425,45 +444,45 @@ public void testMultiIntervalUsedList() throws IOException coordinator.announceHistoricalSegments(ImmutableSet.of(defaultSegment3)); Assert.assertEquals( - SEGMENTS, - ImmutableSet.copyOf( - coordinator.getUsedSegmentsForIntervals( - defaultSegment.getDataSource(), - ImmutableList.of(defaultSegment.getInterval()) + SEGMENTS, + ImmutableSet.copyOf( + coordinator.getUsedSegmentsForIntervals( + defaultSegment.getDataSource(), + ImmutableList.of(defaultSegment.getInterval()) + ) ) - ) ); Assert.assertEquals( - ImmutableSet.of(defaultSegment3), - ImmutableSet.copyOf( - 
coordinator.getUsedSegmentsForIntervals( - defaultSegment.getDataSource(), - ImmutableList.of(defaultSegment3.getInterval()) + ImmutableSet.of(defaultSegment3), + ImmutableSet.copyOf( + coordinator.getUsedSegmentsForIntervals( + defaultSegment.getDataSource(), + ImmutableList.of(defaultSegment3.getInterval()) + ) ) - ) ); Assert.assertEquals( - ImmutableSet.of(defaultSegment, defaultSegment2, defaultSegment3), - ImmutableSet.copyOf( - coordinator.getUsedSegmentsForIntervals( - defaultSegment.getDataSource(), - ImmutableList.of(defaultSegment.getInterval(), defaultSegment3.getInterval()) + ImmutableSet.of(defaultSegment, defaultSegment2, defaultSegment3), + ImmutableSet.copyOf( + coordinator.getUsedSegmentsForIntervals( + defaultSegment.getDataSource(), + ImmutableList.of(defaultSegment.getInterval(), defaultSegment3.getInterval()) + ) ) - ) ); //case to check no duplication if two intervals overlapped with the interval of same segment. Assert.assertEquals( - ImmutableList.of(defaultSegment3), - coordinator.getUsedSegmentsForIntervals( - defaultSegment.getDataSource(), - ImmutableList.of( - Interval.parse("2015-01-03T00Z/2015-01-03T05Z"), - Interval.parse("2015-01-03T09Z/2015-01-04T00Z") + ImmutableList.of(defaultSegment3), + coordinator.getUsedSegmentsForIntervals( + defaultSegment.getDataSource(), + ImmutableList.of( + Interval.parse("2015-01-03T00Z/2015-01-03T05Z"), + Interval.parse("2015-01-03T09Z/2015-01-04T00Z") + ) ) - ) ); } @@ -473,13 +492,13 @@ public void testSimpleUnUsedList() throws IOException coordinator.announceHistoricalSegments(SEGMENTS); unUseSegment(); Assert.assertEquals( - SEGMENTS, - ImmutableSet.copyOf( - coordinator.getUnusedSegmentsForInterval( - defaultSegment.getDataSource(), - defaultSegment.getInterval() + SEGMENTS, + ImmutableSet.copyOf( + coordinator.getUnusedSegmentsForInterval( + defaultSegment.getDataSource(), + defaultSegment.getInterval() + ) ) - ) ); } @@ -489,14 +508,14 @@ public void testUsedOverlapLow() throws IOException { 
coordinator.announceHistoricalSegments(SEGMENTS); Set actualSegments = ImmutableSet.copyOf( - coordinator.getUsedSegmentsForInterval( - defaultSegment.getDataSource(), - Interval.parse("2014-12-31T23:59:59.999Z/2015-01-01T00:00:00.001Z") // end is exclusive - ) + coordinator.getUsedSegmentsForInterval( + defaultSegment.getDataSource(), + Interval.parse("2014-12-31T23:59:59.999Z/2015-01-01T00:00:00.001Z") // end is exclusive + ) ); Assert.assertEquals( - SEGMENTS, - actualSegments + SEGMENTS, + actualSegments ); } @@ -506,13 +525,13 @@ public void testUsedOverlapHigh() throws IOException { coordinator.announceHistoricalSegments(SEGMENTS); Assert.assertEquals( - SEGMENTS, - ImmutableSet.copyOf( - coordinator.getUsedSegmentsForInterval( - defaultSegment.getDataSource(), - Interval.parse("2015-1-1T23:59:59.999Z/2015-02-01T00Z") + SEGMENTS, + ImmutableSet.copyOf( + coordinator.getUsedSegmentsForInterval( + defaultSegment.getDataSource(), + Interval.parse("2015-1-1T23:59:59.999Z/2015-02-01T00Z") + ) ) - ) ); } @@ -521,10 +540,10 @@ public void testUsedOutOfBoundsLow() throws IOException { coordinator.announceHistoricalSegments(SEGMENTS); Assert.assertTrue( - coordinator.getUsedSegmentsForInterval( - defaultSegment.getDataSource(), - new Interval(defaultSegment.getInterval().getStart().minus(1), defaultSegment.getInterval().getStart()) - ).isEmpty() + coordinator.getUsedSegmentsForInterval( + defaultSegment.getDataSource(), + new Interval(defaultSegment.getInterval().getStart().minus(1), defaultSegment.getInterval().getStart()) + ).isEmpty() ); } @@ -534,10 +553,10 @@ public void testUsedOutOfBoundsHigh() throws IOException { coordinator.announceHistoricalSegments(SEGMENTS); Assert.assertTrue( - coordinator.getUsedSegmentsForInterval( - defaultSegment.getDataSource(), - new Interval(defaultSegment.getInterval().getEnd(), defaultSegment.getInterval().getEnd().plusDays(10)) - ).isEmpty() + coordinator.getUsedSegmentsForInterval( + defaultSegment.getDataSource(), + new 
Interval(defaultSegment.getInterval().getEnd(), defaultSegment.getInterval().getEnd().plusDays(10)) + ).isEmpty() ); } @@ -546,13 +565,13 @@ public void testUsedWithinBoundsEnd() throws IOException { coordinator.announceHistoricalSegments(SEGMENTS); Assert.assertEquals( - SEGMENTS, - ImmutableSet.copyOf( - coordinator.getUsedSegmentsForInterval( - defaultSegment.getDataSource(), - defaultSegment.getInterval().withEnd(defaultSegment.getInterval().getEnd().minusMillis(1)) + SEGMENTS, + ImmutableSet.copyOf( + coordinator.getUsedSegmentsForInterval( + defaultSegment.getDataSource(), + defaultSegment.getInterval().withEnd(defaultSegment.getInterval().getEnd().minusMillis(1)) + ) ) - ) ); } @@ -561,13 +580,13 @@ public void testUsedOverlapEnd() throws IOException { coordinator.announceHistoricalSegments(SEGMENTS); Assert.assertEquals( - SEGMENTS, - ImmutableSet.copyOf( - coordinator.getUsedSegmentsForInterval( - defaultSegment.getDataSource(), - defaultSegment.getInterval().withEnd(defaultSegment.getInterval().getEnd().plusMillis(1)) + SEGMENTS, + ImmutableSet.copyOf( + coordinator.getUsedSegmentsForInterval( + defaultSegment.getDataSource(), + defaultSegment.getInterval().withEnd(defaultSegment.getInterval().getEnd().plusMillis(1)) + ) ) - ) ); } @@ -578,13 +597,13 @@ public void testUnUsedOverlapLow() throws IOException coordinator.announceHistoricalSegments(SEGMENTS); unUseSegment(); Assert.assertTrue( - coordinator.getUnusedSegmentsForInterval( - defaultSegment.getDataSource(), - new Interval( - defaultSegment.getInterval().getStart().minus(1), - defaultSegment.getInterval().getStart().plus(1) - ) - ).isEmpty() + coordinator.getUnusedSegmentsForInterval( + defaultSegment.getDataSource(), + new Interval( + defaultSegment.getInterval().getStart().minus(1), + defaultSegment.getInterval().getStart().plus(1) + ) + ).isEmpty() ); } @@ -594,10 +613,10 @@ public void testUnUsedUnderlapLow() throws IOException coordinator.announceHistoricalSegments(SEGMENTS); unUseSegment(); 
Assert.assertTrue( - coordinator.getUnusedSegmentsForInterval( - defaultSegment.getDataSource(), - new Interval(defaultSegment.getInterval().getStart().plus(1), defaultSegment.getInterval().getEnd()) - ).isEmpty() + coordinator.getUnusedSegmentsForInterval( + defaultSegment.getDataSource(), + new Interval(defaultSegment.getInterval().getStart().plus(1), defaultSegment.getInterval().getEnd()) + ).isEmpty() ); } @@ -608,10 +627,10 @@ public void testUnUsedUnderlapHigh() throws IOException coordinator.announceHistoricalSegments(SEGMENTS); unUseSegment(); Assert.assertTrue( - coordinator.getUnusedSegmentsForInterval( - defaultSegment.getDataSource(), - new Interval(defaultSegment.getInterval().getStart(), defaultSegment.getInterval().getEnd().minus(1)) - ).isEmpty() + coordinator.getUnusedSegmentsForInterval( + defaultSegment.getDataSource(), + new Interval(defaultSegment.getInterval().getStart(), defaultSegment.getInterval().getEnd().minus(1)) + ).isEmpty() ); } @@ -621,10 +640,10 @@ public void testUnUsedOverlapHigh() throws IOException coordinator.announceHistoricalSegments(SEGMENTS); unUseSegment(); Assert.assertTrue( - coordinator.getUnusedSegmentsForInterval( - defaultSegment.getDataSource(), - defaultSegment.getInterval().withStart(defaultSegment.getInterval().getEnd().minus(1)) - ).isEmpty() + coordinator.getUnusedSegmentsForInterval( + defaultSegment.getDataSource(), + defaultSegment.getInterval().withStart(defaultSegment.getInterval().getEnd().minus(1)) + ).isEmpty() ); } @@ -634,13 +653,13 @@ public void testUnUsedBigOverlap() throws IOException coordinator.announceHistoricalSegments(SEGMENTS); unUseSegment(); Assert.assertEquals( - SEGMENTS, - ImmutableSet.copyOf( - coordinator.getUnusedSegmentsForInterval( - defaultSegment.getDataSource(), - Interval.parse("2000/2999") + SEGMENTS, + ImmutableSet.copyOf( + coordinator.getUnusedSegmentsForInterval( + defaultSegment.getDataSource(), + Interval.parse("2000/2999") + ) ) - ) ); } @@ -650,22 +669,22 @@ public 
void testUnUsedLowRange() throws IOException coordinator.announceHistoricalSegments(SEGMENTS); unUseSegment(); Assert.assertEquals( - SEGMENTS, - ImmutableSet.copyOf( - coordinator.getUnusedSegmentsForInterval( - defaultSegment.getDataSource(), - defaultSegment.getInterval().withStart(defaultSegment.getInterval().getStart().minus(1)) + SEGMENTS, + ImmutableSet.copyOf( + coordinator.getUnusedSegmentsForInterval( + defaultSegment.getDataSource(), + defaultSegment.getInterval().withStart(defaultSegment.getInterval().getStart().minus(1)) + ) ) - ) ); Assert.assertEquals( - SEGMENTS, - ImmutableSet.copyOf( - coordinator.getUnusedSegmentsForInterval( - defaultSegment.getDataSource(), - defaultSegment.getInterval().withStart(defaultSegment.getInterval().getStart().minusYears(1)) + SEGMENTS, + ImmutableSet.copyOf( + coordinator.getUnusedSegmentsForInterval( + defaultSegment.getDataSource(), + defaultSegment.getInterval().withStart(defaultSegment.getInterval().getStart().minusYears(1)) + ) ) - ) ); } @@ -675,22 +694,22 @@ public void testUnUsedHighRange() throws IOException coordinator.announceHistoricalSegments(SEGMENTS); unUseSegment(); Assert.assertEquals( - SEGMENTS, - ImmutableSet.copyOf( - coordinator.getUnusedSegmentsForInterval( - defaultSegment.getDataSource(), - defaultSegment.getInterval().withEnd(defaultSegment.getInterval().getEnd().plus(1)) + SEGMENTS, + ImmutableSet.copyOf( + coordinator.getUnusedSegmentsForInterval( + defaultSegment.getDataSource(), + defaultSegment.getInterval().withEnd(defaultSegment.getInterval().getEnd().plus(1)) + ) ) - ) ); Assert.assertEquals( - SEGMENTS, - ImmutableSet.copyOf( - coordinator.getUnusedSegmentsForInterval( - defaultSegment.getDataSource(), - defaultSegment.getInterval().withEnd(defaultSegment.getInterval().getEnd().plusYears(1)) + SEGMENTS, + ImmutableSet.copyOf( + coordinator.getUnusedSegmentsForInterval( + defaultSegment.getDataSource(), + 
defaultSegment.getInterval().withEnd(defaultSegment.getInterval().getEnd().plusYears(1)) + ) ) - ) ); } @@ -698,14 +717,14 @@ public void testUnUsedHighRange() throws IOException public void testDeleteDataSourceMetadata() throws IOException { coordinator.announceHistoricalSegments( - ImmutableSet.of(defaultSegment), - new ObjectMetadata(null), - new ObjectMetadata(ImmutableMap.of("foo", "bar")) + ImmutableSet.of(defaultSegment), + new ObjectMetadata(null), + new ObjectMetadata(ImmutableMap.of("foo", "bar")) ); Assert.assertEquals( - new ObjectMetadata(ImmutableMap.of("foo", "bar")), - coordinator.getDataSourceMetadata("fooDataSource") + new ObjectMetadata(ImmutableMap.of("foo", "bar")), + coordinator.getDataSourceMetadata("fooDataSource") ); Assert.assertFalse("deleteInvalidDataSourceMetadata", coordinator.deleteDataSourceMetadata("nonExistentDS")); @@ -713,4 +732,16 @@ public void testDeleteDataSourceMetadata() throws IOException Assert.assertNull("getDataSourceMetadataNullAfterDelete", coordinator.getDataSourceMetadata("fooDataSource")); } -} + + @Test + public void testDeletePendingSegments() throws IOException + { + Assert.assertTrue("deletePendingSegment",coordinator.deletePendingSegments( + coordinator.getNotActiveTask( + defaultTaskSegment.getInterval() + ) + ) + ); + + } +} \ No newline at end of file From acc247fc45afd96176cd1ce57ee4fdae459736f5 Mon Sep 17 00:00:00 2001 From: linhaoxiang Date: Sat, 7 Jan 2017 16:28:39 +0800 Subject: [PATCH 4/4] =?UTF-8?q?change=20IndexerSQLMetadataStorageCoordinat?= =?UTF-8?q?or=E2=80=99s=20method=20deletePendingSegment=E2=80=99s=20sql=20?= =?UTF-8?q?->=20=E2=80=9CDELETE=20from=20%1s=20WHERE=20sequence=5Fname=20L?= =?UTF-8?q?IKE=20'%2s%%=E2=80=9D?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit and format other code --- .../main/java/io/druid/timeline/TaskDataSegment.java | 2 +- .../test/TestIndexerMetadataStorageCoordinator.java | 2 +- 
.../overlord/IndexerMetadataStorageCoordinator.java | 2 +- .../IndexerSQLMetadataStorageCoordinator.java | 11 +++++++---- .../IndexerSQLMetadataStorageCoordinatorTest.java | 2 +- 5 files changed, 11 insertions(+), 8 deletions(-) diff --git a/api/src/main/java/io/druid/timeline/TaskDataSegment.java b/api/src/main/java/io/druid/timeline/TaskDataSegment.java index 7b2ca5d8c6d9..23bee63cdafb 100644 --- a/api/src/main/java/io/druid/timeline/TaskDataSegment.java +++ b/api/src/main/java/io/druid/timeline/TaskDataSegment.java @@ -91,4 +91,4 @@ public String toString() } -} \ No newline at end of file +} diff --git a/indexing-service/src/test/java/io/druid/indexing/test/TestIndexerMetadataStorageCoordinator.java b/indexing-service/src/test/java/io/druid/indexing/test/TestIndexerMetadataStorageCoordinator.java index 21135ac7e2d4..a54e3d596b61 100644 --- a/indexing-service/src/test/java/io/druid/indexing/test/TestIndexerMetadataStorageCoordinator.java +++ b/indexing-service/src/test/java/io/druid/indexing/test/TestIndexerMetadataStorageCoordinator.java @@ -152,4 +152,4 @@ public void setUnusedSegments(List newUnusedSegments) unusedSegments.addAll(newUnusedSegments); } } -} \ No newline at end of file +} diff --git a/server/src/main/java/io/druid/indexing/overlord/IndexerMetadataStorageCoordinator.java b/server/src/main/java/io/druid/indexing/overlord/IndexerMetadataStorageCoordinator.java index 5ce2ba5ae17a..4a683b039277 100644 --- a/server/src/main/java/io/druid/indexing/overlord/IndexerMetadataStorageCoordinator.java +++ b/server/src/main/java/io/druid/indexing/overlord/IndexerMetadataStorageCoordinator.java @@ -149,4 +149,4 @@ SegmentPublishResult announceHistoricalSegments( * @return The TaskDataSegment list which used to match the pendingSegment table's payload */ List getNotActiveTask(final Interval interval); -} \ No newline at end of file +} diff --git a/server/src/main/java/io/druid/metadata/IndexerSQLMetadataStorageCoordinator.java 
b/server/src/main/java/io/druid/metadata/IndexerSQLMetadataStorageCoordinator.java index 8334ed55005d..1e6a7b7dbf61 100644 --- a/server/src/main/java/io/druid/metadata/IndexerSQLMetadataStorageCoordinator.java +++ b/server/src/main/java/io/druid/metadata/IndexerSQLMetadataStorageCoordinator.java @@ -947,11 +947,14 @@ public boolean deletePendingSegment(final TaskDataSegment taskDataSegment) public Boolean withHandle(Handle handle) throws Exception { int rows = handle.createStatement( - String.format("DELETE from %s WHERE sequence_name = :sequence_name", dbTables.getPendingSegmentsTable()) + String.format( + "DELETE from %1s WHERE sequence_name LIKE '%2s%%'", + dbTables.getPendingSegmentsTable(), + taskDataSegment.getIoConfig().get("baseSequenceName") + ) ) - .bind("sequence_name", taskDataSegment.getIoConfig().get("baseSequenceName")) .execute(); - log.info("HHHHHHHHHHHHHHHHHHHHHHHHHH:"+taskDataSegment.getIoConfig().get("baseSequenceName")); + log.info("Delete sequenceName:"+taskDataSegment.getIoConfig().get("baseSequenceName")); return rows > 0; } @@ -1059,4 +1062,4 @@ public List fold( return notActiveTasks; } -} \ No newline at end of file +} diff --git a/server/src/test/java/io/druid/metadata/IndexerSQLMetadataStorageCoordinatorTest.java b/server/src/test/java/io/druid/metadata/IndexerSQLMetadataStorageCoordinatorTest.java index be51fbb131b4..9a52ef235eb1 100644 --- a/server/src/test/java/io/druid/metadata/IndexerSQLMetadataStorageCoordinatorTest.java +++ b/server/src/test/java/io/druid/metadata/IndexerSQLMetadataStorageCoordinatorTest.java @@ -744,4 +744,4 @@ public void testDeletePendingSegments() throws IOException ); } -} \ No newline at end of file +}