diff --git a/core/src/main/java/org/apache/druid/java/util/common/DateTimes.java b/core/src/main/java/org/apache/druid/java/util/common/DateTimes.java index 09ac52279c48..41290949077f 100644 --- a/core/src/main/java/org/apache/druid/java/util/common/DateTimes.java +++ b/core/src/main/java/org/apache/druid/java/util/common/DateTimes.java @@ -24,6 +24,7 @@ import org.joda.time.Chronology; import org.joda.time.DateTime; import org.joda.time.DateTimeZone; +import org.joda.time.Interval; import org.joda.time.Months; import org.joda.time.chrono.ISOChronology; import org.joda.time.format.DateTimeFormatter; @@ -38,6 +39,8 @@ public final class DateTimes public static final DateTime EPOCH = utc(0); public static final DateTime MAX = utc(JodaUtils.MAX_INSTANT); public static final DateTime MIN = utc(JodaUtils.MIN_INSTANT); + public static final DateTime CAN_COMPARE_AS_YEAR_MIN = of("0000-01-01"); + public static final DateTime CAN_COMPARE_AS_YEAR_MAX = of("10000-01-01").minus(1); public static final UtcFormatter ISO_DATE_TIME = wrapFormatter(ISODateTimeFormat.dateTime()); public static final UtcFormatter ISO_DATE_OPTIONAL_TIME = wrapFormatter(ISODateTimeFormat.dateOptionalTimeParser()); @@ -65,8 +68,9 @@ public static DateTimeZone inferTzFromString(String tzId) /** * @return The dateTimezone for the provided {@param tzId}. If {@param fallback} is true, the default timezone * will be returned if the tzId does not match a supported timezone. + * * @throws IllegalArgumentException if {@param fallback} is false and the provided tzId doesn't match a supported - * timezone. + * timezone. 
*/ @SuppressForbidden(reason = "DateTimeZone#forID") public static DateTimeZone inferTzFromString(String tzId, boolean fallback) throws IllegalArgumentException @@ -173,6 +177,22 @@ public static int subMonths(long timestamp1, long timestamp2, DateTimeZone timeZ return Months.monthsBetween(time1, time2).getMonths(); } + /** + * Returns true if the provided DateTime can be compared against other DateTimes using its string representation. + * Useful when generating SQL queries to the metadata store, or any other situation where time comparisons on + * string representations might be useful. + * + * Conditions: the datetime must be between years 0 and 9999 (inclusive) and must be in the ISO UTC chronology. + * + * See also {@link Intervals#canCompareEndpointsAsStrings(Interval)}. + */ + public static boolean canCompareAsString(final DateTime dateTime) + { + return dateTime.getMillis() >= CAN_COMPARE_AS_YEAR_MIN.getMillis() + && dateTime.getMillis() <= CAN_COMPARE_AS_YEAR_MAX.getMillis() + && ISOChronology.getInstanceUTC().equals(dateTime.getChronology()); + } + private DateTimes() { } diff --git a/core/src/main/java/org/apache/druid/java/util/common/Intervals.java b/core/src/main/java/org/apache/druid/java/util/common/Intervals.java index ed152f555751..b7a1f37cf1c3 100644 --- a/core/src/main/java/org/apache/druid/java/util/common/Intervals.java +++ b/core/src/main/java/org/apache/druid/java/util/common/Intervals.java @@ -20,6 +20,7 @@ package org.apache.druid.java.util.common; import com.google.common.collect.ImmutableList; +import org.joda.time.DateTime; import org.joda.time.Interval; import org.joda.time.chrono.ISOChronology; @@ -48,6 +49,25 @@ public static boolean isEmpty(Interval interval) return interval.getStart().equals(interval.getEnd()); } + /** + * Returns true if the provided interval has endpoints that can be compared against other DateTimes using their + * string representations. + * + * See also {@link DateTimes#canCompareAsString(DateTime)}. 
+ */ + public static boolean canCompareEndpointsAsStrings(final Interval interval) + { + return DateTimes.canCompareAsString(interval.getStart()) && DateTimes.canCompareAsString(interval.getEnd()); + } + + /** + * Returns true if the provided interval contains all time. + */ + public static boolean isEternity(final Interval interval) + { + return ETERNITY.equals(interval); + } + private Intervals() { } diff --git a/core/src/test/java/org/apache/druid/java/util/common/DateTimesTest.java b/core/src/test/java/org/apache/druid/java/util/common/DateTimesTest.java index fd3d1ab8c977..bd0341b652ca 100644 --- a/core/src/test/java/org/apache/druid/java/util/common/DateTimesTest.java +++ b/core/src/test/java/org/apache/druid/java/util/common/DateTimesTest.java @@ -100,4 +100,27 @@ public void testStringToDateTimeConverstion_RethrowInitialException() String invalid = "51729200AZ"; DateTimes.of(invalid); } + + @Test + public void testCanCompareAsString() + { + Assert.assertTrue(DateTimes.canCompareAsString(DateTimes.EPOCH)); + Assert.assertTrue(DateTimes.canCompareAsString(DateTimes.of("0000-01-01"))); + + Assert.assertEquals("0000-01-01T00:00:00.000Z", DateTimes.CAN_COMPARE_AS_YEAR_MIN.toString()); + Assert.assertEquals("9999-12-31T23:59:59.999Z", DateTimes.CAN_COMPARE_AS_YEAR_MAX.toString()); + + Assert.assertTrue(DateTimes.canCompareAsString(DateTimes.of("9999"))); + Assert.assertTrue(DateTimes.canCompareAsString(DateTimes.of("2000"))); + + Assert.assertFalse(DateTimes.canCompareAsString(DateTimes.MIN)); + Assert.assertFalse(DateTimes.canCompareAsString(DateTimes.MAX)); + Assert.assertFalse(DateTimes.canCompareAsString(DateTimes.of("-1-01-01T00:00:00"))); + Assert.assertFalse(DateTimes.canCompareAsString(DateTimes.of("10000-01-01"))); + + // Can't compare as string with mixed time zones. 
+ Assert.assertFalse(DateTimes.canCompareAsString( + DateTimes.of("2000").withZone(DateTimes.inferTzFromString("America/Los_Angeles"))) + ); + } } diff --git a/extensions-contrib/materialized-view-maintenance/src/main/java/org/apache/druid/indexing/materializedview/MaterializedViewSupervisor.java b/extensions-contrib/materialized-view-maintenance/src/main/java/org/apache/druid/indexing/materializedview/MaterializedViewSupervisor.java index f456cb611c9b..fd9303c17bc4 100644 --- a/extensions-contrib/materialized-view-maintenance/src/main/java/org/apache/druid/indexing/materializedview/MaterializedViewSupervisor.java +++ b/extensions-contrib/materialized-view-maintenance/src/main/java/org/apache/druid/indexing/materializedview/MaterializedViewSupervisor.java @@ -405,7 +405,7 @@ Pair, Map>> checkSegment // drop derivative segments which interval equals the interval in toDeleteBaseSegments for (Interval interval : toDropInterval.keySet()) { for (DataSegment segment : derivativeSegments.get(interval)) { - sqlSegmentsMetadataManager.markSegmentAsUnused(segment.getId().toString()); + sqlSegmentsMetadataManager.markSegmentAsUnused(segment.getId()); } } // data of the latest interval will be built firstly. 
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/actions/RetrieveSegmentsActionsTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/actions/RetrieveSegmentsActionsTest.java index 86aba77cfd63..6149208fc391 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/actions/RetrieveSegmentsActionsTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/actions/RetrieveSegmentsActionsTest.java @@ -74,7 +74,7 @@ public void setup() throws IOException expectedUsedSegments.forEach(s -> actionTestKit.getTaskLockbox().unlock(task, s.getInterval())); - expectedUnusedSegments.forEach(s -> actionTestKit.getSegmentsMetadataManager().markSegmentAsUnused(s.getId().toString())); + expectedUnusedSegments.forEach(s -> actionTestKit.getSegmentsMetadataManager().markSegmentAsUnused(s.getId())); } private DataSegment createSegment(Interval interval, String version) diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/KillUnusedSegmentsTaskTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/KillUnusedSegmentsTaskTest.java index e796fde24b30..7a00a6afe7bf 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/KillUnusedSegmentsTaskTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/KillUnusedSegmentsTaskTest.java @@ -64,12 +64,12 @@ public void testKill() throws Exception Assert.assertTrue( getSegmentsMetadataManager().markSegmentAsUnused( - newSegment(Intervals.of("2019-02-01/2019-03-01"), version).getId().toString() + newSegment(Intervals.of("2019-02-01/2019-03-01"), version).getId() ) ); Assert.assertTrue( getSegmentsMetadataManager().markSegmentAsUnused( - newSegment(Intervals.of("2019-03-01/2019-04-01"), version).getId().toString() + newSegment(Intervals.of("2019-03-01/2019-04-01"), version).getId() ) ); @@ -114,7 +114,7 @@ public void testKillWithMarkUnused() throws 
Exception Assert.assertTrue( getSegmentsMetadataManager().markSegmentAsUnused( - newSegment(Intervals.of("2019-02-01/2019-03-01"), version).getId().toString() + newSegment(Intervals.of("2019-02-01/2019-03-01"), version).getId() ) ); diff --git a/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java b/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java index 4887c907274a..dfae30fe94d9 100644 --- a/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java +++ b/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java @@ -27,7 +27,6 @@ import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableSet; import com.google.common.collect.Iterables; -import com.google.common.collect.Iterators; import com.google.common.collect.Lists; import com.google.common.hash.Hashing; import com.google.common.io.BaseEncoding; @@ -46,6 +45,7 @@ import org.apache.druid.java.util.common.jackson.JacksonUtils; import org.apache.druid.java.util.common.lifecycle.LifecycleStart; import org.apache.druid.java.util.common.logger.Logger; +import org.apache.druid.java.util.common.parsers.CloseableIterator; import org.apache.druid.segment.SegmentUtils; import org.apache.druid.segment.realtime.appenderator.SegmentIdWithShardSpec; import org.apache.druid.timeline.DataSegment; @@ -60,8 +60,6 @@ import org.joda.time.DateTime; import org.joda.time.Interval; import org.joda.time.chrono.ISOChronology; -import org.skife.jdbi.v2.Batch; -import org.skife.jdbi.v2.Folder3; import org.skife.jdbi.v2.Handle; import org.skife.jdbi.v2.PreparedBatch; import org.skife.jdbi.v2.Query; @@ -90,6 +88,7 @@ import java.util.stream.IntStream; /** + * */ public class IndexerSQLMetadataStorageCoordinator implements IndexerMetadataStorageCoordinator { @@ -193,58 +192,27 @@ public List> retrieveUsedSegmentsAndCreatedDates(Strin @Override public List 
retrieveUnusedSegmentsForInterval(final String dataSource, final Interval interval) { - List matchingSegments = connector.inReadOnlyTransaction( + final List matchingSegments = connector.inReadOnlyTransaction( (handle, status) -> { - // 2 range conditions are used on different columns, but not all SQL databases properly optimize it. - // Some databases can only use an index on one of the columns. An additional condition provides - // explicit knowledge that 'start' cannot be greater than 'end'. - return handle - .createQuery( - StringUtils.format( - "SELECT payload FROM %1$s WHERE dataSource = :dataSource and start >= :start " - + "and start <= :end and %2$send%2$s <= :end and used = false", - dbTables.getSegmentsTable(), - connector.getQuoteString() - ) - ) - .setFetchSize(connector.getStreamingFetchSize()) - .bind("dataSource", dataSource) - .bind("start", interval.getStart().toString()) - .bind("end", interval.getEnd().toString()) - .map(ByteArrayMapper.FIRST) - .fold( - new ArrayList<>(), - (Folder3, byte[]>) (accumulator, payload, foldController, statementContext) -> { - accumulator.add(JacksonUtils.readValue(jsonMapper, payload, DataSegment.class)); - return accumulator; - } - ); + try (final CloseableIterator iterator = + SqlSegmentsMetadataQuery.forHandle(handle, connector, dbTables, jsonMapper) + .retrieveUnusedSegments(dataSource, Collections.singletonList(interval))) { + return ImmutableList.copyOf(iterator); + } } ); - log.info("Found %,d segments for %s for interval %s.", matchingSegments.size(), dataSource, interval); + log.info("Found %,d unused segments for %s for interval %s.", matchingSegments.size(), dataSource, interval); return matchingSegments; } @Override public int markSegmentsAsUnusedWithinInterval(String dataSource, Interval interval) { - int numSegmentsMarkedUnused = connector.retryTransaction( - (handle, status) -> { - return handle - .createStatement( - StringUtils.format( - "UPDATE %s SET used=false WHERE dataSource = :dataSource " - + 
"AND start >= :start AND %2$send%2$s <= :end", - dbTables.getSegmentsTable(), - connector.getQuoteString() - ) - ) - .bind("dataSource", dataSource) - .bind("start", interval.getStart().toString()) - .bind("end", interval.getEnd().toString()) - .execute(); - }, + final Integer numSegmentsMarkedUnused = connector.retryTransaction( + (handle, status) -> + SqlSegmentsMetadataQuery.forHandle(handle, connector, dbTables, jsonMapper) + .markSegmentsUnused(dataSource, interval), 3, SQLMetadataConnector.DEFAULT_MAX_TRIES ); @@ -292,14 +260,12 @@ private VersionedIntervalTimeline getTimelineForIntervalsWi final Handle handle, final String dataSource, final List intervals - ) + ) throws IOException { - Query> sql = createUsedSegmentsSqlQueryForIntervals(handle, dataSource, intervals); - - try (final ResultIterator dbSegments = sql.map(ByteArrayMapper.FIRST).iterator()) { - return VersionedIntervalTimeline.forSegments( - Iterators.transform(dbSegments, payload -> JacksonUtils.readValue(jsonMapper, payload, DataSegment.class)) - ); + try (final CloseableIterator iterator = + SqlSegmentsMetadataQuery.forHandle(handle, connector, dbTables, jsonMapper) + .retrieveUsedSegments(dataSource, intervals)) { + return VersionedIntervalTimeline.forSegments(iterator); } } @@ -307,50 +273,15 @@ private Collection retrieveAllUsedSegmentsForIntervalsWithHandle( final Handle handle, final String dataSource, final List intervals - ) - { - return createUsedSegmentsSqlQueryForIntervals(handle, dataSource, intervals) - .map((index, r, ctx) -> JacksonUtils.readValue(jsonMapper, r.getBytes("payload"), DataSegment.class)) - .list(); - } - - /** - * Creates a query to the metadata store which selects payload from the segments table for all segments which are - * marked as used and whose interval intersects (not just abuts) with any of the intervals given to this method. 
- */ - private Query> createUsedSegmentsSqlQueryForIntervals( - Handle handle, - String dataSource, - List intervals - ) + ) throws IOException { - final StringBuilder sb = new StringBuilder(); - sb.append("SELECT payload FROM %s WHERE used = true AND dataSource = ?"); - if (!intervals.isEmpty()) { - sb.append(" AND ("); - for (int i = 0; i < intervals.size(); i++) { - sb.append( - StringUtils.format("(start < ? AND %1$send%1$s > ?)", connector.getQuoteString()) - ); - if (i == intervals.size() - 1) { - sb.append(")"); - } else { - sb.append(" OR "); - } - } + try (final CloseableIterator iterator = + SqlSegmentsMetadataQuery.forHandle(handle, connector, dbTables, jsonMapper) + .retrieveUsedSegments(dataSource, intervals)) { + final List retVal = new ArrayList<>(); + iterator.forEachRemaining(retVal::add); + return retVal; } - - Query> sql = handle - .createQuery(StringUtils.format(sb.toString(), dbTables.getSegmentsTable())) - .bind(0, dataSource); - - for (int i = 0; i < intervals.size(); i++) { - Interval interval = intervals.get(i); - sql = sql - .bind(2 * i + 1, interval.getEnd().toString()) - .bind(2 * i + 2, interval.getStart().toString()); - } - return sql; } @Override @@ -1269,7 +1200,7 @@ protected DataStoreMetadataUpdateResult updateDataSourceMetadataWithHandle( */ protected DataStoreMetadataUpdateResult dropSegmentsWithHandle( final Handle handle, - final Set segmentsToDrop, + final Collection segmentsToDrop, final String dataSource ) { @@ -1288,18 +1219,13 @@ protected DataStoreMetadataUpdateResult dropSegmentsWithHandle( ); return DataStoreMetadataUpdateResult.FAILURE; } - final List segmentIdList = segmentsToDrop.stream().map(segment -> segment.getId().toString()).collect(Collectors.toList()); - Batch batch = handle.createBatch(); - segmentIdList.forEach(segmentId -> batch.add( - StringUtils.format( - "UPDATE %s SET used=false WHERE datasource = '%s' AND id = '%s'", - dbTables.getSegmentsTable(), - dataSource, - segmentId - ) - )); - final int[] 
segmentChanges = batch.execute(); - int numChangedSegments = SqlSegmentsMetadataManager.computeNumChangedSegments(segmentIdList, segmentChanges); + + final int numChangedSegments = + SqlSegmentsMetadataQuery.forHandle(handle, connector, dbTables, jsonMapper).markSegments( + segmentsToDrop.stream().map(DataSegment::getId).collect(Collectors.toList()), + false + ); + if (numChangedSegments != segmentsToDrop.size()) { log.warn("Failed to drop segments metadata update as numChangedSegments[%s] segmentsToDropSize[%s]", numChangedSegments, diff --git a/server/src/main/java/org/apache/druid/metadata/SegmentsMetadataManager.java b/server/src/main/java/org/apache/druid/metadata/SegmentsMetadataManager.java index 889141a89c15..492f1b5a0227 100644 --- a/server/src/main/java/org/apache/druid/metadata/SegmentsMetadataManager.java +++ b/server/src/main/java/org/apache/druid/metadata/SegmentsMetadataManager.java @@ -73,14 +73,14 @@ int markAsUsedNonOvershadowedSegments(String dataSource, Set segmentIds) int markAsUnusedSegmentsInInterval(String dataSource, Interval interval); - int markSegmentsAsUnused(String dataSource, Set segmentIds); + int markSegmentsAsUnused(Set segmentIds); /** * Returns true if the state of the segment entry is changed in the database as the result of this call (that is, the * segment was marked as unused), false otherwise. If the call results in a database error, an exception is relayed to * the caller. 
*/ - boolean markSegmentAsUnused(String segmentId); + boolean markSegmentAsUnused(SegmentId segmentId); /** * If there are used segments belonging to the given data source this method returns them as an {@link diff --git a/server/src/main/java/org/apache/druid/metadata/SqlSegmentsMetadataManager.java b/server/src/main/java/org/apache/druid/metadata/SqlSegmentsMetadataManager.java index 0ea264dc8912..4ab71bc4647c 100644 --- a/server/src/main/java/org/apache/druid/metadata/SqlSegmentsMetadataManager.java +++ b/server/src/main/java/org/apache/druid/metadata/SqlSegmentsMetadataManager.java @@ -36,14 +36,15 @@ import org.apache.druid.client.ImmutableDruidDataSource; import org.apache.druid.guice.ManageLifecycle; import org.apache.druid.java.util.common.DateTimes; +import org.apache.druid.java.util.common.Intervals; import org.apache.druid.java.util.common.JodaUtils; import org.apache.druid.java.util.common.MapUtils; import org.apache.druid.java.util.common.Pair; import org.apache.druid.java.util.common.StringUtils; import org.apache.druid.java.util.common.concurrent.Execs; -import org.apache.druid.java.util.common.jackson.JacksonUtils; import org.apache.druid.java.util.common.lifecycle.LifecycleStart; import org.apache.druid.java.util.common.lifecycle.LifecycleStop; +import org.apache.druid.java.util.common.parsers.CloseableIterator; import org.apache.druid.java.util.emitter.EmittingLogger; import org.apache.druid.timeline.DataSegment; import org.apache.druid.timeline.Partitions; @@ -54,10 +55,8 @@ import org.joda.time.Duration; import org.joda.time.Interval; import org.skife.jdbi.v2.BaseResultSetMapper; -import org.skife.jdbi.v2.Batch; import org.skife.jdbi.v2.FoldController; import org.skife.jdbi.v2.Handle; -import org.skife.jdbi.v2.Query; import org.skife.jdbi.v2.StatementContext; import org.skife.jdbi.v2.TransactionCallback; import org.skife.jdbi.v2.TransactionStatus; @@ -69,6 +68,7 @@ import java.sql.SQLException; import java.util.ArrayList; import 
java.util.Collection; +import java.util.Collections; import java.util.HashSet; import java.util.Iterator; import java.util.List; @@ -81,7 +81,6 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.ReentrantReadWriteLock; import java.util.stream.Collectors; -import java.util.stream.StreamSupport; /** * @@ -558,57 +557,38 @@ public int markAsUsedNonOvershadowedSegmentsInInterval(final String dataSource, */ private int doMarkAsUsedNonOvershadowedSegments(String dataSourceName, @Nullable Interval interval) { - List usedSegmentsOverlappingInterval = new ArrayList<>(); - List unusedSegmentsInInterval = new ArrayList<>(); + final List unusedSegments = new ArrayList<>(); + final VersionedIntervalTimeline timeline = + VersionedIntervalTimeline.forSegments(Collections.emptyList()); + connector.inReadOnlyTransaction( (handle, status) -> { - String queryString = - StringUtils.format("SELECT used, payload FROM %1$s WHERE dataSource = :dataSource", getSegmentsTable()); - if (interval != null) { - queryString += StringUtils.format(" AND start < :end AND %1$send%1$s > :start", connector.getQuoteString()); + final SqlSegmentsMetadataQuery queryTool = + SqlSegmentsMetadataQuery.forHandle(handle, connector, dbTables.get(), jsonMapper); + + final List intervals = + interval == null ? 
Intervals.ONLY_ETERNITY : Collections.singletonList(interval); + + try (final CloseableIterator iterator = + queryTool.retrieveUsedSegments(dataSourceName, intervals)) { + VersionedIntervalTimeline.addSegments(timeline, iterator); } - Query query = handle - .createQuery(queryString) - .setFetchSize(connector.getStreamingFetchSize()) - .bind("dataSource", dataSourceName); - if (interval != null) { - query = query - .bind("start", interval.getStart().toString()) - .bind("end", interval.getEnd().toString()); + + try (final CloseableIterator iterator = + queryTool.retrieveUnusedSegments(dataSourceName, intervals)) { + while (iterator.hasNext()) { + final DataSegment dataSegment = iterator.next(); + VersionedIntervalTimeline.addSegments(timeline, Iterators.singletonIterator(dataSegment)); + unusedSegments.add(dataSegment); + } } - query = query - .map((int index, ResultSet resultSet, StatementContext context) -> { - DataSegment segment = - JacksonUtils.readValue(jsonMapper, resultSet.getBytes("payload"), DataSegment.class); - if (resultSet.getBoolean("used")) { - usedSegmentsOverlappingInterval.add(segment); - } else { - if (interval == null || interval.contains(segment.getInterval())) { - unusedSegmentsInInterval.add(segment); - } - } - //noinspection ReturnOfNull: intentional, consume() call below doesn't use the results. - return null; - }); - // Consume the query results to ensure usedSegmentsOverlappingInterval and unusedSegmentsInInterval are - // populated. 
- consume(query.iterator()); + + //noinspection ReturnOfNull: This consumer operates by side effects return null; } ); - VersionedIntervalTimeline versionedIntervalTimeline = VersionedIntervalTimeline.forSegments( - Iterators.concat(usedSegmentsOverlappingInterval.iterator(), unusedSegmentsInInterval.iterator()) - ); - - return markNonOvershadowedSegmentsAsUsed(unusedSegmentsInInterval, versionedIntervalTimeline); - } - - private static void consume(Iterator iterator) - { - while (iterator.hasNext()) { - iterator.next(); - } + return markNonOvershadowedSegmentsAsUsed(unusedSegments, timeline); } private int markNonOvershadowedSegmentsAsUsed( @@ -616,12 +596,12 @@ private int markNonOvershadowedSegmentsAsUsed( VersionedIntervalTimeline timeline ) { - List segmentIdsToMarkAsUsed = new ArrayList<>(); + List segmentIdsToMarkAsUsed = new ArrayList<>(); for (DataSegment segment : unusedSegments) { if (timeline.isOvershadowed(segment.getInterval(), segment.getVersion(), segment)) { continue; } - segmentIdsToMarkAsUsed.add(segment.getId().toString()); + segmentIdsToMarkAsUsed.add(segment.getId()); } return markSegmentsAsUsed(segmentIdsToMarkAsUsed); @@ -639,12 +619,13 @@ public int markAsUsedNonOvershadowedSegments(final String dataSource, final Set< List unusedSegmentsIntervals = JodaUtils.condenseIntervals( unusedSegments.stream().map(DataSegment::getInterval).collect(Collectors.toList()) ); - Iterator usedSegmentsOverlappingUnusedSegmentsIntervals = - retrieveUsedSegmentsOverlappingIntervals(dataSource, unusedSegmentsIntervals, handle); - VersionedIntervalTimeline timeline = VersionedIntervalTimeline.forSegments( - Iterators.concat(usedSegmentsOverlappingUnusedSegmentsIntervals, unusedSegments.iterator()) - ); - return new Pair<>(unusedSegments, timeline); + try (CloseableIterator usedSegmentsOverlappingUnusedSegmentsIntervals = + retrieveUsedSegmentsOverlappingIntervals(dataSource, unusedSegmentsIntervals, handle)) { + VersionedIntervalTimeline timeline = 
VersionedIntervalTimeline.forSegments( + Iterators.concat(usedSegmentsOverlappingUnusedSegmentsIntervals, unusedSegments.iterator()) + ); + return new Pair<>(unusedSegments, timeline); + } } ); @@ -720,68 +701,39 @@ private List retrieveUnusedSegments( return segments; } - private Iterator retrieveUsedSegmentsOverlappingIntervals( + private CloseableIterator retrieveUsedSegmentsOverlappingIntervals( final String dataSource, final Collection intervals, final Handle handle ) { - return intervals - .stream() - .flatMap(interval -> { - Iterable segmentResultIterable = () -> handle - .createQuery( - StringUtils.format( - "SELECT payload FROM %1$s " - + "WHERE dataSource = :dataSource AND start < :end AND %2$send%2$s > :start AND used = true", - getSegmentsTable(), - connector.getQuoteString() - ) - ) - .setFetchSize(connector.getStreamingFetchSize()) - .bind("dataSource", dataSource) - .bind("start", interval.getStart().toString()) - .bind("end", interval.getEnd().toString()) - .map((int index, ResultSet resultSet, StatementContext context) -> - JacksonUtils.readValue(jsonMapper, resultSet.getBytes("payload"), DataSegment.class) - ) - .iterator(); - return StreamSupport.stream(segmentResultIterable.spliterator(), false); - }) - .iterator(); + return SqlSegmentsMetadataQuery.forHandle(handle, connector, dbTables.get(), jsonMapper) + .retrieveUsedSegments(dataSource, intervals); } - private int markSegmentsAsUsed(final List segmentIds) + private int markSegmentsAsUsed(final List segmentIds) { if (segmentIds.isEmpty()) { log.info("No segments found to update!"); return 0; } - return connector.getDBI().withHandle(handle -> { - Batch batch = handle.createBatch(); - segmentIds.forEach(segmentId -> batch.add( - StringUtils.format("UPDATE %s SET used=true WHERE id = '%s'", getSegmentsTable(), segmentId) - )); - int[] segmentChanges = batch.execute(); - return computeNumChangedSegments(segmentIds, segmentChanges); - }); + return connector.getDBI().withHandle( + handle -> + 
SqlSegmentsMetadataQuery.forHandle(handle, connector, dbTables.get(), jsonMapper) + .markSegments(segmentIds, true) + ); } @Override public int markAsUnusedAllSegmentsInDataSource(final String dataSource) { try { - final int numUpdatedDatabaseEntries = connector.getDBI().withHandle( - (Handle handle) -> handle - .createStatement( - StringUtils.format("UPDATE %s SET used=false WHERE dataSource = :dataSource", getSegmentsTable()) - ) - .bind("dataSource", dataSource) - .execute() + return connector.getDBI().withHandle( + handle -> + SqlSegmentsMetadataQuery.forHandle(handle, connector, dbTables.get(), jsonMapper) + .markSegmentsUnused(dataSource, Intervals.ETERNITY) ); - - return numUpdatedDatabaseEntries; } catch (RuntimeException e) { log.error(e, "Exception marking all segments as unused in data source [%s]", dataSource); @@ -794,10 +746,16 @@ public int markAsUnusedAllSegmentsInDataSource(final String dataSource) * snapshot update. The update of the segment's state will be reflected after the next {@link DatabasePoll}. 
*/ @Override - public boolean markSegmentAsUnused(final String segmentId) + public boolean markSegmentAsUnused(final SegmentId segmentId) { try { - return markSegmentAsUnusedInDatabase(segmentId); + final int numSegments = connector.getDBI().withHandle( + handle -> + SqlSegmentsMetadataQuery.forHandle(handle, connector, dbTables.get(), jsonMapper) + .markSegments(Collections.singletonList(segmentId), false) + ); + + return numSegments > 0; } catch (RuntimeException e) { log.error(e, "Exception marking segment [%s] as unused", segmentId); @@ -806,109 +764,30 @@ public boolean markSegmentAsUnused(final String segmentId) } @Override - public int markSegmentsAsUnused(String dataSourceName, Set segmentIds) + public int markSegmentsAsUnused(Set segmentIds) { - if (segmentIds.isEmpty()) { - return 0; - } - final List segmentIdList = new ArrayList<>(segmentIds); - try { - return connector.getDBI().withHandle(handle -> { - Batch batch = handle.createBatch(); - segmentIdList.forEach(segmentId -> batch.add( - StringUtils.format( - "UPDATE %s SET used=false WHERE datasource = '%s' AND id = '%s'", - getSegmentsTable(), - dataSourceName, - segmentId - ) - )); - final int[] segmentChanges = batch.execute(); - return computeNumChangedSegments(segmentIdList, segmentChanges); - }); - } - catch (Exception e) { - throw new RuntimeException(e); - } + return connector.getDBI().withHandle( + handle -> + SqlSegmentsMetadataQuery.forHandle(handle, connector, dbTables.get(), jsonMapper) + .markSegments(segmentIds, false) + ); } @Override public int markAsUnusedSegmentsInInterval(String dataSourceName, Interval interval) { try { - Integer numUpdatedDatabaseEntries = connector.getDBI().withHandle( - handle -> handle - .createStatement( - StringUtils - .format( - "UPDATE %s SET used=false WHERE datasource = :datasource " - + "AND start >= :start AND %2$send%2$s <= :end", - getSegmentsTable(), - connector.getQuoteString() - )) - .bind("datasource", dataSourceName) - .bind("start", 
interval.getStart().toString()) - .bind("end", interval.getEnd().toString()) - .execute() + return connector.getDBI().withHandle( + handle -> + SqlSegmentsMetadataQuery.forHandle(handle, connector, dbTables.get(), jsonMapper) + .markSegmentsUnused(dataSourceName, interval) ); - return numUpdatedDatabaseEntries; } catch (Exception e) { throw new RuntimeException(e); } } - private boolean markSegmentAsUnusedInDatabase(String segmentId) - { - final int numUpdatedRows = connector.getDBI().withHandle( - handle -> handle - .createStatement(StringUtils.format("UPDATE %s SET used=false WHERE id = :segmentID", getSegmentsTable())) - .bind("segmentID", segmentId) - .execute() - ); - if (numUpdatedRows < 0) { - log.assertionError( - "Negative number of rows updated for segment id [%s]: %d", - segmentId, - numUpdatedRows - ); - } else if (numUpdatedRows > 1) { - log.error( - "More than one row updated for segment id [%s]: %d, " - + "there may be more than one row for the segment id in the database", - segmentId, - numUpdatedRows - ); - } - return numUpdatedRows > 0; - } - - static int computeNumChangedSegments(List segmentIds, int[] segmentChanges) - { - int numChangedSegments = 0; - for (int i = 0; i < segmentChanges.length; i++) { - int numUpdatedRows = segmentChanges[i]; - if (numUpdatedRows < 0) { - log.assertionError( - "Negative number of rows updated for segment id [%s]: %d", - segmentIds.get(i), - numUpdatedRows - ); - } else if (numUpdatedRows > 1) { - log.error( - "More than one row updated for segment id [%s]: %d, " - + "there may be more than one row for the segment id in the database", - segmentIds.get(i), - numUpdatedRows - ); - } - if (numUpdatedRows > 0) { - numChangedSegments += 1; - } - } - return numChangedSegments; - } - @Override public @Nullable ImmutableDruidDataSource getImmutableDataSourceWithUsedSegments(String dataSourceName) { diff --git a/server/src/main/java/org/apache/druid/metadata/SqlSegmentsMetadataQuery.java 
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */

package org.apache.druid.metadata;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Iterators;
import org.apache.druid.java.util.common.CloseableIterators;
import org.apache.druid.java.util.common.IAE;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.jackson.JacksonUtils;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.java.util.common.parsers.CloseableIterator;
import org.apache.druid.timeline.DataSegment;
import org.apache.druid.timeline.SegmentId;
import org.joda.time.Interval;
import org.skife.jdbi.v2.Handle;
import org.skife.jdbi.v2.PreparedBatch;
import org.skife.jdbi.v2.Query;
import org.skife.jdbi.v2.ResultIterator;

import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

/**
 * An object that helps {@link SqlSegmentsMetadataManager} and {@link IndexerSQLMetadataStorageCoordinator} make
 * queries to the metadata store segments table. Each instance of this class is scoped to a single handle and is meant
 * to be short-lived.
 */
public class SqlSegmentsMetadataQuery
{
  private static final Logger log = new Logger(SqlSegmentsMetadataQuery.class);

  private final Handle handle;
  private final SQLMetadataConnector connector;
  private final MetadataStorageTablesConfig dbTables;
  private final ObjectMapper jsonMapper;

  private SqlSegmentsMetadataQuery(
      final Handle handle,
      final SQLMetadataConnector connector,
      final MetadataStorageTablesConfig dbTables,
      final ObjectMapper jsonMapper
  )
  {
    this.handle = handle;
    this.connector = connector;
    this.dbTables = dbTables;
    this.jsonMapper = jsonMapper;
  }

  /**
   * Create a query object. This instance is scoped to a single handle and is meant to be short-lived. It is okay
   * to use it for more than one query, though.
   */
  public static SqlSegmentsMetadataQuery forHandle(
      final Handle handle,
      final SQLMetadataConnector connector,
      final MetadataStorageTablesConfig dbTables,
      final ObjectMapper jsonMapper
  )
  {
    return new SqlSegmentsMetadataQuery(handle, connector, dbTables, jsonMapper);
  }

  /**
   * Retrieves segments for a given datasource that are marked used (i.e. published) in the metadata store, and that
   * *overlap* any interval in a particular collection of intervals. If the collection of intervals is empty, this
   * method will retrieve all used segments.
   *
   * You cannot assume that segments returned by this call are actually active. Because there is some delay between
   * new segment publishing and the marking-unused of older segments, it is possible that some segments returned
   * by this call are overshadowed by other segments. To check for this, use
   * {@link org.apache.druid.timeline.VersionedIntervalTimeline#forSegments(Iterator)}.
   *
   * This call does not return any information about realtime segments.
   *
   * Returns a closeable iterator. You should close it when you are done.
   */
  public CloseableIterator<DataSegment> retrieveUsedSegments(
      final String dataSource,
      final Collection<Interval> intervals
  )
  {
    return retrieveSegments(dataSource, intervals, IntervalMode.OVERLAPS, true);
  }

  /**
   * Retrieves segments for a given datasource that are marked unused and that are *fully contained by* any interval
   * in a particular collection of intervals. If the collection of intervals is empty, this method will retrieve all
   * unused segments.
   *
   * This call does not return any information about realtime segments.
   *
   * Returns a closeable iterator. You should close it when you are done.
   */
  public CloseableIterator<DataSegment> retrieveUnusedSegments(
      final String dataSource,
      final Collection<Interval> intervals
  )
  {
    return retrieveSegments(dataSource, intervals, IntervalMode.CONTAINS, false);
  }

  /**
   * Marks the provided segments as either used or unused.
   *
   * All provided segments must belong to the same datasource; otherwise an {@link IAE} is thrown.
   *
   * Returns the number of segments actually modified.
   */
  public int markSegments(final Collection<SegmentId> segmentIds, final boolean used)
  {
    final String dataSource;

    if (segmentIds.isEmpty()) {
      return 0;
    } else {
      dataSource = segmentIds.iterator().next().getDataSource();
      if (segmentIds.stream().anyMatch(segment -> !dataSource.equals(segment.getDataSource()))) {
        // Message covers both the used=true and used=false cases; this method is not only for dropping segments.
        throw new IAE("Segments to mark must all be part of the same datasource");
      }
    }

    final PreparedBatch batch =
        handle.prepareBatch(
            StringUtils.format(
                "UPDATE %s SET used = ? WHERE datasource = ? AND id = ?",
                dbTables.getSegmentsTable()
            )
        );

    for (SegmentId segmentId : segmentIds) {
      batch.add(used, dataSource, segmentId.toString());
    }

    final int[] segmentChanges = batch.execute();
    return computeNumChangedSegments(
        segmentIds.stream().map(SegmentId::toString).collect(Collectors.toList()),
        segmentChanges
    );
  }

  /**
   * Marks all segments for a datasource unused that are *fully contained by* a particular interval.
   *
   * Returns the number of segments actually modified.
   */
  public int markSegmentsUnused(final String dataSource, final Interval interval)
  {
    if (Intervals.isEternity(interval)) {
      // Eternity contains every segment, so no interval condition is needed at all.
      return handle
          .createStatement(
              StringUtils.format(
                  "UPDATE %s SET used=:used WHERE dataSource = :dataSource",
                  dbTables.getSegmentsTable()
              )
          )
          .bind("dataSource", dataSource)
          .bind("used", false)
          .execute();
    } else if (Intervals.canCompareEndpointsAsStrings(interval)
               && interval.getStart().getYear() == interval.getEnd().getYear()) {
      // Safe to write a WHERE clause with this interval. Note that it is unsafe if the years are different, because
      // that means extra characters can sneak in. (Consider a query interval like "2000-01-01/2001-01-01" and a
      // segment interval like "20001/20002".)
      return handle
          .createStatement(
              StringUtils.format(
                  "UPDATE %s SET used=:used WHERE dataSource = :dataSource AND %s",
                  dbTables.getSegmentsTable(),
                  IntervalMode.CONTAINS.makeSqlCondition(connector.getQuoteString(), ":start", ":end")
              )
          )
          .bind("dataSource", dataSource)
          .bind("used", false)
          .bind("start", interval.getStart().toString())
          .bind("end", interval.getEnd().toString())
          .execute();
    } else {
      // Retrieve, then drop, since we can't write a WHERE clause directly.
      final List<SegmentId> segments = ImmutableList.copyOf(
          Iterators.transform(
              retrieveSegments(dataSource, Collections.singletonList(interval), IntervalMode.CONTAINS, true),
              DataSegment::getId
          )
      );
      return markSegments(segments, false);
    }
  }

  /**
   * Streams segments from the metadata store that match the given datasource, used flag, and interval match mode.
   * When the intervals cannot be compared as strings (see {@link Intervals#canCompareEndpointsAsStrings}), interval
   * filtering falls back to in-memory filtering of all rows for the datasource.
   */
  private CloseableIterator<DataSegment> retrieveSegments(
      final String dataSource,
      final Collection<Interval> intervals,
      final IntervalMode matchMode,
      final boolean used
  )
  {
    // Check if the intervals all support comparing as strings. If so, bake them into the SQL.
    final boolean compareAsString = intervals.stream().allMatch(Intervals::canCompareEndpointsAsStrings);

    final StringBuilder sb = new StringBuilder();
    sb.append("SELECT payload FROM %s WHERE used = :used AND dataSource = :dataSource");

    if (compareAsString && !intervals.isEmpty()) {
      sb.append(" AND (");
      for (int i = 0; i < intervals.size(); i++) {
        sb.append(
            matchMode.makeSqlCondition(
                connector.getQuoteString(),
                StringUtils.format(":start%d", i),
                StringUtils.format(":end%d", i)
            )
        );

        if (i == intervals.size() - 1) {
          sb.append(")");
        } else {
          sb.append(" OR ");
        }
      }
    }

    final Query<Map<String, Object>> sql = handle
        .createQuery(StringUtils.format(sb.toString(), dbTables.getSegmentsTable()))
        .setFetchSize(connector.getStreamingFetchSize())
        .bind("used", used)
        .bind("dataSource", dataSource);

    if (compareAsString) {
      final Iterator<Interval> iterator = intervals.iterator();
      for (int i = 0; iterator.hasNext(); i++) {
        Interval interval = iterator.next();
        sql.bind(StringUtils.format("start%d", i), interval.getStart().toString())
           .bind(StringUtils.format("end%d", i), interval.getEnd().toString());
      }
    }

    final ResultIterator<DataSegment> resultIterator =
        sql.map((index, r, ctx) -> JacksonUtils.readValue(jsonMapper, r.getBytes(1), DataSegment.class))
           .iterator();

    return CloseableIterators.wrap(
        Iterators.filter(
            resultIterator,
            dataSegment -> {
              if (intervals.isEmpty()) {
                return true;
              } else {
                // Must re-check that the interval matches, even if comparing as string, because the *segment interval*
                // might not be string-comparable. (Consider a query interval like "2000-01-01/3000-01-01" and a
                // segment interval like "20010/20011".)
                for (Interval interval : intervals) {
                  if (matchMode.apply(interval, dataSegment.getInterval())) {
                    return true;
                  }
                }

                return false;
              }
            }
        ),
        resultIterator
    );
  }

  /**
   * Counts how many rows of a batch update changed exactly one row, logging anomalies: a negative update count is an
   * assertion error, and more than one updated row per segment id suggests duplicate rows in the segments table.
   */
  private static int computeNumChangedSegments(List<String> segmentIds, int[] segmentChanges)
  {
    int numChangedSegments = 0;
    for (int i = 0; i < segmentChanges.length; i++) {
      int numUpdatedRows = segmentChanges[i];
      if (numUpdatedRows < 0) {
        log.assertionError(
            "Negative number of rows updated for segment id [%s]: %d",
            segmentIds.get(i),
            numUpdatedRows
        );
      } else if (numUpdatedRows > 1) {
        log.error(
            "More than one row updated for segment id [%s]: %d, "
            + "there may be more than one row for the segment id in the database",
            segmentIds.get(i),
            numUpdatedRows
        );
      }
      if (numUpdatedRows > 0) {
        numChangedSegments += 1;
      }
    }
    return numChangedSegments;
  }

  /**
   * How a query interval is matched against a segment interval, both in SQL (string comparison of endpoints) and
   * in-memory (Joda {@link Interval} comparison).
   */
  enum IntervalMode
  {
    CONTAINS {
      @Override
      public String makeSqlCondition(String quoteString, String startPlaceholder, String endPlaceholder)
      {
        // 2 range conditions are used on different columns, but not all SQL databases properly optimize it.
        // Some databases can only use an index on one of the columns. An additional condition provides
        // explicit knowledge that 'start' cannot be greater than 'end'.
        return StringUtils.format(
            "(start >= %2$s and start <= %3$s and %1$send%1$s <= %3$s)",
            quoteString,
            startPlaceholder,
            endPlaceholder
        );
      }

      @Override
      public boolean apply(Interval a, Interval b)
      {
        return a.contains(b);
      }
    },
    OVERLAPS {
      @Override
      public String makeSqlCondition(String quoteString, String startPlaceholder, String endPlaceholder)
      {
        return StringUtils.format(
            "(start < %3$s AND %1$send%1$s > %2$s)",
            quoteString,
            startPlaceholder,
            endPlaceholder
        );
      }

      @Override
      public boolean apply(Interval a, Interval b)
      {
        return a.overlaps(b);
      }
    };

    public abstract String makeSqlCondition(String quoteString, String startPlaceholder, String endPlaceholder);

    public abstract boolean apply(Interval a, Interval b);
  }
}
segmentsMetadataManager.markAsUnusedSegmentsInInterval(dataSourceName, interval); } else { - final Set segmentIds = payload.getSegmentIds(); - return segmentsMetadataManager.markSegmentsAsUnused(dataSourceName, segmentIds); + final Set segmentIds = + payload.getSegmentIds() + .stream() + .map(idStr -> SegmentId.tryParse(dataSourceName, idStr)) + .filter(Objects::nonNull) + .collect(Collectors.toSet()); + + // Note: segments for the "wrong" datasource are ignored. + return segmentsMetadataManager.markSegmentsAsUnused( + segmentIds.stream() + .filter(segmentId -> segmentId.getDataSource().equals(dataSourceName)) + .collect(Collectors.toSet()) + ); } }; return doMarkSegmentsWithPayload("markSegmentsAsUnused", dataSourceName, payload, markSegments); @@ -633,10 +644,11 @@ public Response getServedSegment( @ResourceFilters(DatasourceResourceFilter.class) public Response markSegmentAsUnused( @PathParam("dataSourceName") String dataSourceName, - @PathParam("segmentId") String segmentId + @PathParam("segmentId") String segmentIdString ) { - boolean segmentStateChanged = segmentsMetadataManager.markSegmentAsUnused(segmentId); + final SegmentId segmentId = SegmentId.tryParse(dataSourceName, segmentIdString); + final boolean segmentStateChanged = segmentId != null && segmentsMetadataManager.markSegmentAsUnused(segmentId); return Response.ok(ImmutableMap.of("segmentStateChanged", segmentStateChanged)).build(); } diff --git a/server/src/test/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinatorTest.java b/server/src/test/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinatorTest.java index 135c8936b6b9..10f52fbf9f66 100644 --- a/server/src/test/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinatorTest.java +++ b/server/src/test/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinatorTest.java @@ -216,6 +216,42 @@ public class IndexerSQLMetadataStorageCoordinatorTest 100 ); + private final DataSegment hugeTimeRangeSegment1 = new 
DataSegment( + "hugeTimeRangeDataSource", + Intervals.of("-9994-01-02T00Z/1994-01-03T00Z"), + "zversion", + ImmutableMap.of(), + ImmutableList.of("dim1"), + ImmutableList.of("m1"), + new NumberedShardSpec(0, 1), + 9, + 100 + ); + + private final DataSegment hugeTimeRangeSegment2 = new DataSegment( + "hugeTimeRangeDataSource", + Intervals.of("2994-01-02T00Z/2994-01-03T00Z"), + "zversion", + ImmutableMap.of(), + ImmutableList.of("dim1"), + ImmutableList.of("m1"), + new NumberedShardSpec(0, 1), + 9, + 100 + ); + + private final DataSegment hugeTimeRangeSegment3 = new DataSegment( + "hugeTimeRangeDataSource", + Intervals.of("29940-01-02T00Z/29940-01-03T00Z"), + "zversion", + ImmutableMap.of(), + ImmutableList.of("dim1"), + ImmutableList.of("m1"), + new NumberedShardSpec(0, 1), + 9, + 100 + ); + private final Set SEGMENTS = ImmutableSet.of(defaultSegment, defaultSegment2); private final AtomicLong metadataUpdateCounter = new AtomicLong(); private final AtomicLong segmentTableDropUpdateCounter = new AtomicLong(); @@ -256,7 +292,7 @@ protected DataStoreMetadataUpdateResult updateDataSourceMetadataWithHandle( @Override protected DataStoreMetadataUpdateResult dropSegmentsWithHandle( final Handle handle, - final Set segmentsToDrop, + final Collection segmentsToDrop, final String dataSource ) { @@ -684,7 +720,8 @@ public void testTransactionalAnnounceFailSegmentDropFailWithRetry() throws IOExc null, null ); - Assert.assertEquals(SegmentPublishResult.fail("org.apache.druid.metadata.RetryTransactionException: Aborting transaction!"), result1); + Assert.assertEquals(SegmentPublishResult.fail( + "org.apache.druid.metadata.RetryTransactionException: Aborting transaction!"), result1); Assert.assertEquals(MAX_SQL_MEATADATA_RETRY_FOR_TEST, segmentTableDropUpdateCounter.get()); @@ -1032,6 +1069,75 @@ public void testUnusedHighRange() throws IOException ); } + @Test + public void testUsedHugeTimeRangeEternityFilter() throws IOException + { + coordinator.announceHistoricalSegments( + 
ImmutableSet.of( + hugeTimeRangeSegment1, + hugeTimeRangeSegment2, + hugeTimeRangeSegment3 + ) + ); + + Assert.assertEquals( + ImmutableSet.of(hugeTimeRangeSegment1, hugeTimeRangeSegment2, hugeTimeRangeSegment3), + ImmutableSet.copyOf( + coordinator.retrieveUsedSegmentsForIntervals( + hugeTimeRangeSegment1.getDataSource(), + Intervals.ONLY_ETERNITY, + Segments.ONLY_VISIBLE + ) + ) + ); + } + + @Test + public void testUsedHugeTimeRangeTrickyFilter1() throws IOException + { + coordinator.announceHistoricalSegments( + ImmutableSet.of( + hugeTimeRangeSegment1, + hugeTimeRangeSegment2, + hugeTimeRangeSegment3 + ) + ); + + Assert.assertEquals( + ImmutableSet.of(hugeTimeRangeSegment2), + ImmutableSet.copyOf( + coordinator.retrieveUsedSegmentsForInterval( + hugeTimeRangeSegment1.getDataSource(), + Intervals.of("2900/10000"), + Segments.ONLY_VISIBLE + ) + ) + ); + } + + @Test + public void testUsedHugeTimeRangeTrickyFilter2() throws IOException + { + coordinator.announceHistoricalSegments( + ImmutableSet.of( + hugeTimeRangeSegment1, + hugeTimeRangeSegment2, + hugeTimeRangeSegment3 + ) + ); + + Assert.assertEquals( + ImmutableSet.of(hugeTimeRangeSegment2), + ImmutableSet.copyOf( + coordinator.retrieveUsedSegmentsForInterval( + hugeTimeRangeSegment1.getDataSource(), + Intervals.of("2993/2995"), + Segments.ONLY_VISIBLE + ) + ) + ); + } + @Test public void testDeleteDataSourceMetadata() throws IOException { @@ -1061,28 +1167,28 @@ public void testDeleteSegmentsInMetaDataStorage() throws IOException // check segments Published Assert.assertEquals( - SEGMENTS, - ImmutableSet.copyOf( - coordinator.retrieveUsedSegmentsForInterval( - defaultSegment.getDataSource(), - defaultSegment.getInterval(), - Segments.ONLY_VISIBLE - ) + SEGMENTS, + ImmutableSet.copyOf( + coordinator.retrieveUsedSegmentsForInterval( + defaultSegment.getDataSource(), + defaultSegment.getInterval(), + Segments.ONLY_VISIBLE ) + ) ); // remove segments in MetaDataStorage coordinator.deleteSegments(SEGMENTS); // 
check segments removed Assert.assertEquals( - 0, - ImmutableSet.copyOf( - coordinator.retrieveUsedSegmentsForInterval( - defaultSegment.getDataSource(), - defaultSegment.getInterval(), - Segments.ONLY_VISIBLE - ) - ).size() + 0, + ImmutableSet.copyOf( + coordinator.retrieveUsedSegmentsForInterval( + defaultSegment.getDataSource(), + defaultSegment.getInterval(), + Segments.ONLY_VISIBLE + ) + ).size() ); } @@ -1241,7 +1347,10 @@ public void testDeletePendingSegment() throws InterruptedException prevSegmentId = identifier.toString(); } - final int numDeleted = coordinator.deletePendingSegmentsCreatedInInterval(dataSource, new Interval(begin, secondBegin)); + final int numDeleted = coordinator.deletePendingSegmentsCreatedInInterval( + dataSource, + new Interval(begin, secondBegin) + ); Assert.assertEquals(10, numDeleted); } @@ -1454,7 +1563,11 @@ public void testDropSegmentsWithHandleForSegmentThatExist() Assert.assertEquals(defaultSegment.getId().toString(), usedSegments.get(0)); // Try drop segment - IndexerSQLMetadataStorageCoordinator.DataStoreMetadataUpdateResult result = coordinator.dropSegmentsWithHandle(handle, ImmutableSet.of(defaultSegment), defaultSegment.getDataSource()); + IndexerSQLMetadataStorageCoordinator.DataStoreMetadataUpdateResult result = coordinator.dropSegmentsWithHandle( + handle, + ImmutableSet.of(defaultSegment), + defaultSegment.getDataSource() + ); Assert.assertEquals(IndexerSQLMetadataStorageCoordinator.DataStoreMetadataUpdateResult.SUCCESS, result); usedSegments = retrieveUsedSegmentIds(); @@ -1467,7 +1580,11 @@ public void testDropSegmentsWithHandleForSegmentThatDoesNotExist() { try (Handle handle = derbyConnector.getDBI().open()) { // Try drop segment - IndexerSQLMetadataStorageCoordinator.DataStoreMetadataUpdateResult result = coordinator.dropSegmentsWithHandle(handle, ImmutableSet.of(defaultSegment), defaultSegment.getDataSource()); + IndexerSQLMetadataStorageCoordinator.DataStoreMetadataUpdateResult result = 
coordinator.dropSegmentsWithHandle( + handle, + ImmutableSet.of(defaultSegment), + defaultSegment.getDataSource() + ); Assert.assertEquals(IndexerSQLMetadataStorageCoordinator.DataStoreMetadataUpdateResult.TRY_AGAIN, result); } } @@ -1488,7 +1605,10 @@ public void testRemoveDataSourceMetadataOlderThanDatasourceActiveShouldNotBeDele ); // Try delete. Datasource should not be deleted as it is in excluded set - int deletedCount = coordinator.removeDataSourceMetadataOlderThan(System.currentTimeMillis(), ImmutableSet.of("fooDataSource")); + int deletedCount = coordinator.removeDataSourceMetadataOlderThan( + System.currentTimeMillis(), + ImmutableSet.of("fooDataSource") + ); // Datasource should not be deleted Assert.assertEquals( @@ -1524,7 +1644,8 @@ public void testRemoveDataSourceMetadataOlderThanDatasourceNotActiveAndOlderThan } @Test - public void testRemoveDataSourceMetadataOlderThanDatasourceNotActiveButNotOlderThanTimeShouldNotBeDeleted() throws Exception + public void testRemoveDataSourceMetadataOlderThanDatasourceNotActiveButNotOlderThanTimeShouldNotBeDeleted() + throws Exception { coordinator.announceHistoricalSegments( ImmutableSet.of(defaultSegment), @@ -1540,7 +1661,10 @@ public void testRemoveDataSourceMetadataOlderThanDatasourceNotActiveButNotOlderT // Do delete. Datasource metadata should not be deleted. 
Datasource is not active but it was created just now so it's // created timestamp will be later than the timestamp 2012-01-01T00:00:00Z - int deletedCount = coordinator.removeDataSourceMetadataOlderThan(DateTimes.of("2012-01-01T00:00:00Z").getMillis(), ImmutableSet.of()); + int deletedCount = coordinator.removeDataSourceMetadataOlderThan( + DateTimes.of("2012-01-01T00:00:00Z").getMillis(), + ImmutableSet.of() + ); // Datasource should not be deleted Assert.assertEquals( @@ -1551,7 +1675,39 @@ public void testRemoveDataSourceMetadataOlderThanDatasourceNotActiveButNotOlderT } @Test - public void testMarkSegmentsAsUnusedWithinInterval() throws IOException + public void testMarkSegmentsAsUnusedWithinIntervalOneYear() throws IOException + { + coordinator.announceHistoricalSegments(ImmutableSet.of(existingSegment1, existingSegment2)); + + // interval covers existingSegment1 and partially overlaps existingSegment2, + // only existingSegment1 will be dropped + coordinator.markSegmentsAsUnusedWithinInterval( + existingSegment1.getDataSource(), + Intervals.of("1994-01-01/1994-01-02T12Z") + ); + + Assert.assertEquals( + ImmutableSet.of(existingSegment1), + ImmutableSet.copyOf( + coordinator.retrieveUnusedSegmentsForInterval( + existingSegment1.getDataSource(), + existingSegment1.getInterval().withEnd(existingSegment1.getInterval().getEnd().plus(1)) + ) + ) + ); + Assert.assertEquals( + ImmutableSet.of(), + ImmutableSet.copyOf( + coordinator.retrieveUnusedSegmentsForInterval( + existingSegment2.getDataSource(), + existingSegment2.getInterval().withEnd(existingSegment2.getInterval().getEnd().plusYears(1)) + ) + ) + ); + } + + @Test + public void testMarkSegmentsAsUnusedWithinIntervalTwoYears() throws IOException { coordinator.announceHistoricalSegments(ImmutableSet.of(existingSegment1, existingSegment2)); diff --git a/server/src/test/java/org/apache/druid/metadata/SqlSegmentsMetadataManagerTest.java 
b/server/src/test/java/org/apache/druid/metadata/SqlSegmentsMetadataManagerTest.java index 57df44073460..2c78a115447e 100644 --- a/server/src/test/java/org/apache/druid/metadata/SqlSegmentsMetadataManagerTest.java +++ b/server/src/test/java/org/apache/druid/metadata/SqlSegmentsMetadataManagerTest.java @@ -35,6 +35,7 @@ import org.apache.druid.segment.TestHelper; import org.apache.druid.server.metrics.NoopServiceEmitter; import org.apache.druid.timeline.DataSegment; +import org.apache.druid.timeline.SegmentId; import org.apache.druid.timeline.partition.NoneShardSpec; import org.joda.time.Interval; import org.joda.time.Period; @@ -443,7 +444,7 @@ public void testMarkSegmentAsUnused() throws IOException, InterruptedException awaitDataSourceAppeared(newDataSource); Assert.assertNotNull(sqlSegmentsMetadataManager.getImmutableDataSourceWithUsedSegments(newDataSource)); - Assert.assertTrue(sqlSegmentsMetadataManager.markSegmentAsUnused(newSegment.getId().toString())); + Assert.assertTrue(sqlSegmentsMetadataManager.markSegmentAsUnused(newSegment.getId())); awaitDataSourceDisappeared(newDataSource); Assert.assertNull(sqlSegmentsMetadataManager.getImmutableDataSourceWithUsedSegments(newDataSource)); } @@ -706,10 +707,10 @@ public void testMarkSegmentsAsUnused() throws IOException publisher.publishSegment(newSegment1); publisher.publishSegment(newSegment2); - final ImmutableSet segmentIds = - ImmutableSet.of(newSegment1.getId().toString(), newSegment1.getId().toString()); + final ImmutableSet segmentIds = + ImmutableSet.of(newSegment1.getId(), newSegment1.getId()); - Assert.assertEquals(segmentIds.size(), sqlSegmentsMetadataManager.markSegmentsAsUnused(newDataSource, segmentIds)); + Assert.assertEquals(segmentIds.size(), sqlSegmentsMetadataManager.markSegmentsAsUnused(segmentIds)); sqlSegmentsMetadataManager.poll(); Assert.assertEquals( ImmutableSet.of(segment1, segment2), @@ -717,31 +718,6 @@ public void testMarkSegmentsAsUnused() throws IOException ); } - @Test - public 
void testMarkSegmentsAsUnusedInvalidDataSource() throws IOException - { - sqlSegmentsMetadataManager.startPollingDatabasePeriodically(); - sqlSegmentsMetadataManager.poll(); - Assert.assertTrue(sqlSegmentsMetadataManager.isPollingDatabasePeriodically()); - - final String newDataSource = "wikipedia2"; - final DataSegment newSegment1 = createNewSegment1(newDataSource); - - final DataSegment newSegment2 = createNewSegment1(newDataSource); - - publisher.publishSegment(newSegment1); - publisher.publishSegment(newSegment2); - final ImmutableSet segmentIds = - ImmutableSet.of(newSegment1.getId().toString(), newSegment2.getId().toString()); - // none of the segments are in data source - Assert.assertEquals(0, sqlSegmentsMetadataManager.markSegmentsAsUnused("wrongDataSource", segmentIds)); - sqlSegmentsMetadataManager.poll(); - Assert.assertEquals( - ImmutableSet.of(segment1, segment2, newSegment1, newSegment2), - ImmutableSet.copyOf(sqlSegmentsMetadataManager.iterateAllUsedSegments()) - ); - } - @Test public void testMarkAsUnusedSegmentsInInterval() throws IOException { diff --git a/server/src/test/java/org/apache/druid/server/http/DataSourcesResourceTest.java b/server/src/test/java/org/apache/druid/server/http/DataSourcesResourceTest.java index 3ca1e0fe6da1..e98aaa0eaddf 100644 --- a/server/src/test/java/org/apache/druid/server/http/DataSourcesResourceTest.java +++ b/server/src/test/java/org/apache/druid/server/http/DataSourcesResourceTest.java @@ -100,7 +100,7 @@ public void setUp() new DataSegment( "datasource1", Intervals.of("2010-01-01/P1D"), - "", + "v0", null, null, null, @@ -113,7 +113,7 @@ public void setUp() new DataSegment( "datasource1", Intervals.of("2010-01-22/P1D"), - "", + "v0", null, null, null, @@ -126,7 +126,7 @@ public void setUp() new DataSegment( "datasource2", Intervals.of("2010-01-01/P1D"), - "", + "v0", null, null, null, @@ -1018,16 +1018,24 @@ public void testSegmentLoadChecksForInterval() public void testMarkSegmentsAsUnused() { final 
DruidDataSource dataSource1 = new DruidDataSource("datasource1", new HashMap<>()); - final Set segmentIds = - dataSegmentList.stream().map(ds -> ds.getId().toString()).collect(Collectors.toSet()); + final Set segmentIds = + dataSegmentList.stream() + .filter(segment -> segment.getDataSource().equals(dataSource1.getName())) + .map(DataSegment::getId) + .collect(Collectors.toSet()); EasyMock.expect(inventoryView.getInventory()).andReturn(ImmutableList.of(server)).once(); EasyMock.expect(server.getDataSource("datasource1")).andReturn(dataSource1).once(); - EasyMock.expect(segmentsMetadataManager.markSegmentsAsUnused("datasource1", segmentIds)).andReturn(1).once(); + EasyMock.expect(segmentsMetadataManager.markSegmentsAsUnused(segmentIds)).andReturn(1).once(); EasyMock.replay(segmentsMetadataManager, inventoryView, server); final DataSourcesResource.MarkDataSourceSegmentsPayload payload = - new DataSourcesResource.MarkDataSourceSegmentsPayload(null, segmentIds); + new DataSourcesResource.MarkDataSourceSegmentsPayload( + null, + segmentIds.stream() + .map(SegmentId::toString) + .collect(Collectors.toSet()) + ); DataSourcesResource dataSourcesResource = new DataSourcesResource(inventoryView, segmentsMetadataManager, null, null, null, null); @@ -1041,16 +1049,24 @@ public void testMarkSegmentsAsUnused() public void testMarkSegmentsAsUnusedNoChanges() { final DruidDataSource dataSource1 = new DruidDataSource("datasource1", new HashMap<>()); - final Set segmentIds = - dataSegmentList.stream().map(ds -> ds.getId().toString()).collect(Collectors.toSet()); + final Set segmentIds = + dataSegmentList.stream() + .filter(segment -> segment.getDataSource().equals(dataSource1.getName())) + .map(DataSegment::getId) + .collect(Collectors.toSet()); EasyMock.expect(inventoryView.getInventory()).andReturn(ImmutableList.of(server)).once(); EasyMock.expect(server.getDataSource("datasource1")).andReturn(dataSource1).once(); - 
EasyMock.expect(segmentsMetadataManager.markSegmentsAsUnused("datasource1", segmentIds)).andReturn(0).once(); + EasyMock.expect(segmentsMetadataManager.markSegmentsAsUnused(segmentIds)).andReturn(0).once(); EasyMock.replay(segmentsMetadataManager, inventoryView, server); final DataSourcesResource.MarkDataSourceSegmentsPayload payload = - new DataSourcesResource.MarkDataSourceSegmentsPayload(null, segmentIds); + new DataSourcesResource.MarkDataSourceSegmentsPayload( + null, + segmentIds.stream() + .map(SegmentId::toString) + .collect(Collectors.toSet()) + ); DataSourcesResource dataSourcesResource = new DataSourcesResource(inventoryView, segmentsMetadataManager, null, null, null, null); @@ -1064,18 +1080,26 @@ public void testMarkSegmentsAsUnusedNoChanges() public void testMarkSegmentsAsUnusedException() { final DruidDataSource dataSource1 = new DruidDataSource("datasource1", new HashMap<>()); - final Set segmentIds = - dataSegmentList.stream().map(ds -> ds.getId().toString()).collect(Collectors.toSet()); + final Set segmentIds = + dataSegmentList.stream() + .filter(segment -> segment.getDataSource().equals(dataSource1.getName())) + .map(DataSegment::getId) + .collect(Collectors.toSet()); EasyMock.expect(inventoryView.getInventory()).andReturn(ImmutableList.of(server)).once(); EasyMock.expect(server.getDataSource("datasource1")).andReturn(dataSource1).once(); - EasyMock.expect(segmentsMetadataManager.markSegmentsAsUnused("datasource1", segmentIds)) + EasyMock.expect(segmentsMetadataManager.markSegmentsAsUnused(segmentIds)) .andThrow(new RuntimeException("Exception occurred")) .once(); EasyMock.replay(segmentsMetadataManager, inventoryView, server); final DataSourcesResource.MarkDataSourceSegmentsPayload payload = - new DataSourcesResource.MarkDataSourceSegmentsPayload(null, segmentIds); + new DataSourcesResource.MarkDataSourceSegmentsPayload( + null, + segmentIds.stream() + .map(SegmentId::toString) + .collect(Collectors.toSet()) + ); DataSourcesResource 
dataSourcesResource = new DataSourcesResource(inventoryView, segmentsMetadataManager, null, null, null, null);