Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.joda.time.Chronology;
import org.joda.time.DateTime;
import org.joda.time.DateTimeZone;
import org.joda.time.Interval;
import org.joda.time.Months;
import org.joda.time.chrono.ISOChronology;
import org.joda.time.format.DateTimeFormatter;
Expand All @@ -38,6 +39,8 @@ public final class DateTimes
public static final DateTime EPOCH = utc(0);
public static final DateTime MAX = utc(JodaUtils.MAX_INSTANT);
public static final DateTime MIN = utc(JodaUtils.MIN_INSTANT);
public static final DateTime CAN_COMPARE_AS_YEAR_MIN = of("0000-01-01");
public static final DateTime CAN_COMPARE_AS_YEAR_MAX = of("10000-01-01").minus(1);

public static final UtcFormatter ISO_DATE_TIME = wrapFormatter(ISODateTimeFormat.dateTime());
public static final UtcFormatter ISO_DATE_OPTIONAL_TIME = wrapFormatter(ISODateTimeFormat.dateOptionalTimeParser());
Expand Down Expand Up @@ -65,8 +68,9 @@ public static DateTimeZone inferTzFromString(String tzId)
/**
* @return The dateTimezone for the provided {@param tzId}. If {@param fallback} is true, the default timezone
* will be returned if the tzId does not match a supported timezone.
*
* @throws IllegalArgumentException if {@param fallback} is false and the provided tzId doesn't match a supported
* timezone.
* timezone.
*/
@SuppressForbidden(reason = "DateTimeZone#forID")
public static DateTimeZone inferTzFromString(String tzId, boolean fallback) throws IllegalArgumentException
Expand Down Expand Up @@ -173,6 +177,22 @@ public static int subMonths(long timestamp1, long timestamp2, DateTimeZone timeZ
return Months.monthsBetween(time1, time2).getMonths();
}

/**
 * Returns true if the provided DateTime can be compared against other DateTimes using its string representation.
 * Useful when generating SQL queries to the metadata store, or any other situation where time comparisons on
 * string representations might be useful.
 *
 * Conditions: the datetime must be between years 0 and 9999 (inclusive) and must be in the ISO UTC chronology.
 *
 * See also {@link Intervals#canCompareEndpointsAsStrings(Interval)}.
 */
public static boolean canCompareAsString(final DateTime dateTime)
{
  // Instants outside [CAN_COMPARE_AS_YEAR_MIN, CAN_COMPARE_AS_YEAR_MAX] are rejected, as is any
  // chronology other than ISO UTC, since those affect how the instant renders as a string.
  final long millis = dateTime.getMillis();
  final boolean withinComparableYears =
      millis >= CAN_COMPARE_AS_YEAR_MIN.getMillis() && millis <= CAN_COMPARE_AS_YEAR_MAX.getMillis();
  return withinComparableYears && ISOChronology.getInstanceUTC().equals(dateTime.getChronology());
}

private DateTimes()
{
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
package org.apache.druid.java.util.common;

import com.google.common.collect.ImmutableList;
import org.joda.time.DateTime;
import org.joda.time.Interval;
import org.joda.time.chrono.ISOChronology;

Expand Down Expand Up @@ -48,6 +49,25 @@ public static boolean isEmpty(Interval interval)
return interval.getStart().equals(interval.getEnd());
}

/**
 * Returns true if the provided interval has endpoints that can be compared against other DateTimes using their
 * string representations.
 *
 * See also {@link DateTimes#canCompareAsString(DateTime)}.
 */
public static boolean canCompareEndpointsAsStrings(final Interval interval)
{
  // Both endpoints must individually be string-comparable; short-circuits on the start endpoint.
  final boolean startComparable = DateTimes.canCompareAsString(interval.getStart());
  return startComparable && DateTimes.canCompareAsString(interval.getEnd());
}

/**
 * Returns true if the provided interval contains all time.
 */
public static boolean isEternity(final Interval interval)
{
  // An interval spans all time exactly when it equals the canonical ETERNITY constant.
  final boolean matchesEternity = ETERNITY.equals(interval);
  return matchesEternity;
}

private Intervals()
{
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,4 +100,27 @@ public void testStringToDateTimeConverstion_RethrowInitialException()
String invalid = "51729200AZ";
DateTimes.of(invalid);
}

@Test
public void testCanCompareAsString()
{
  // The boundary constants render exactly at the edges of the string-comparable range.
  Assert.assertEquals("0000-01-01T00:00:00.000Z", DateTimes.CAN_COMPARE_AS_YEAR_MIN.toString());
  Assert.assertEquals("9999-12-31T23:59:59.999Z", DateTimes.CAN_COMPARE_AS_YEAR_MAX.toString());

  // In-range UTC instants are comparable as strings.
  Assert.assertTrue(DateTimes.canCompareAsString(DateTimes.EPOCH));
  Assert.assertTrue(DateTimes.canCompareAsString(DateTimes.of("0000-01-01")));
  Assert.assertTrue(DateTimes.canCompareAsString(DateTimes.of("9999")));
  Assert.assertTrue(DateTimes.canCompareAsString(DateTimes.of("2000")));

  // Out-of-range instants are not comparable as strings.
  Assert.assertFalse(DateTimes.canCompareAsString(DateTimes.MIN));
  Assert.assertFalse(DateTimes.canCompareAsString(DateTimes.MAX));
  Assert.assertFalse(DateTimes.canCompareAsString(DateTimes.of("-1-01-01T00:00:00")));
  Assert.assertFalse(DateTimes.canCompareAsString(DateTimes.of("10000-01-01")));

  // Can't compare as string with mixed time zones.
  Assert.assertFalse(
      DateTimes.canCompareAsString(DateTimes.of("2000").withZone(DateTimes.inferTzFromString("America/Los_Angeles")))
  );
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -405,7 +405,7 @@ Pair<SortedMap<Interval, String>, Map<Interval, List<DataSegment>>> checkSegment
// drop derivative segments which interval equals the interval in toDeleteBaseSegments
for (Interval interval : toDropInterval.keySet()) {
for (DataSegment segment : derivativeSegments.get(interval)) {
sqlSegmentsMetadataManager.markSegmentAsUnused(segment.getId().toString());
sqlSegmentsMetadataManager.markSegmentAsUnused(segment.getId());
}
}
// data of the latest interval will be built firstly.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ public void setup() throws IOException

expectedUsedSegments.forEach(s -> actionTestKit.getTaskLockbox().unlock(task, s.getInterval()));

expectedUnusedSegments.forEach(s -> actionTestKit.getSegmentsMetadataManager().markSegmentAsUnused(s.getId().toString()));
expectedUnusedSegments.forEach(s -> actionTestKit.getSegmentsMetadataManager().markSegmentAsUnused(s.getId()));
}

private DataSegment createSegment(Interval interval, String version)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,12 +64,12 @@ public void testKill() throws Exception

Assert.assertTrue(
getSegmentsMetadataManager().markSegmentAsUnused(
newSegment(Intervals.of("2019-02-01/2019-03-01"), version).getId().toString()
newSegment(Intervals.of("2019-02-01/2019-03-01"), version).getId()
)
);
Assert.assertTrue(
getSegmentsMetadataManager().markSegmentAsUnused(
newSegment(Intervals.of("2019-03-01/2019-04-01"), version).getId().toString()
newSegment(Intervals.of("2019-03-01/2019-04-01"), version).getId()
)
);

Expand Down Expand Up @@ -114,7 +114,7 @@ public void testKillWithMarkUnused() throws Exception

Assert.assertTrue(
getSegmentsMetadataManager().markSegmentAsUnused(
newSegment(Intervals.of("2019-02-01/2019-03-01"), version).getId().toString()
newSegment(Intervals.of("2019-02-01/2019-03-01"), version).getId()
)
);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Iterables;
import com.google.common.collect.Iterators;
import com.google.common.collect.Lists;
import com.google.common.hash.Hashing;
import com.google.common.io.BaseEncoding;
Expand All @@ -46,6 +45,7 @@
import org.apache.druid.java.util.common.jackson.JacksonUtils;
import org.apache.druid.java.util.common.lifecycle.LifecycleStart;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.java.util.common.parsers.CloseableIterator;
import org.apache.druid.segment.SegmentUtils;
import org.apache.druid.segment.realtime.appenderator.SegmentIdWithShardSpec;
import org.apache.druid.timeline.DataSegment;
Expand All @@ -60,8 +60,6 @@
import org.joda.time.DateTime;
import org.joda.time.Interval;
import org.joda.time.chrono.ISOChronology;
import org.skife.jdbi.v2.Batch;
import org.skife.jdbi.v2.Folder3;
import org.skife.jdbi.v2.Handle;
import org.skife.jdbi.v2.PreparedBatch;
import org.skife.jdbi.v2.Query;
Expand Down Expand Up @@ -90,6 +88,7 @@
import java.util.stream.IntStream;

/**
*
*/
public class IndexerSQLMetadataStorageCoordinator implements IndexerMetadataStorageCoordinator
{
Expand Down Expand Up @@ -193,58 +192,27 @@ public List<Pair<DataSegment, String>> retrieveUsedSegmentsAndCreatedDates(Strin
@Override
public List<DataSegment> retrieveUnusedSegmentsForInterval(final String dataSource, final Interval interval)
{
List<DataSegment> matchingSegments = connector.inReadOnlyTransaction(
final List<DataSegment> matchingSegments = connector.inReadOnlyTransaction(
(handle, status) -> {
// 2 range conditions are used on different columns, but not all SQL databases properly optimize it.
// Some databases can only use an index on one of the columns. An additional condition provides
// explicit knowledge that 'start' cannot be greater than 'end'.
return handle
.createQuery(
StringUtils.format(
"SELECT payload FROM %1$s WHERE dataSource = :dataSource and start >= :start "
+ "and start <= :end and %2$send%2$s <= :end and used = false",
dbTables.getSegmentsTable(),
connector.getQuoteString()
)
)
.setFetchSize(connector.getStreamingFetchSize())
.bind("dataSource", dataSource)
.bind("start", interval.getStart().toString())
.bind("end", interval.getEnd().toString())
.map(ByteArrayMapper.FIRST)
.fold(
new ArrayList<>(),
(Folder3<List<DataSegment>, byte[]>) (accumulator, payload, foldController, statementContext) -> {
accumulator.add(JacksonUtils.readValue(jsonMapper, payload, DataSegment.class));
return accumulator;
}
);
try (final CloseableIterator<DataSegment> iterator =
SqlSegmentsMetadataQuery.forHandle(handle, connector, dbTables, jsonMapper)
.retrieveUnusedSegments(dataSource, Collections.singletonList(interval))) {
return ImmutableList.copyOf(iterator);
}
}
);

log.info("Found %,d segments for %s for interval %s.", matchingSegments.size(), dataSource, interval);
log.info("Found %,d unused segments for %s for interval %s.", matchingSegments.size(), dataSource, interval);
return matchingSegments;
}

@Override
public int markSegmentsAsUnusedWithinInterval(String dataSource, Interval interval)
{
int numSegmentsMarkedUnused = connector.retryTransaction(
(handle, status) -> {
return handle
.createStatement(
StringUtils.format(
"UPDATE %s SET used=false WHERE dataSource = :dataSource "
+ "AND start >= :start AND %2$send%2$s <= :end",
dbTables.getSegmentsTable(),
connector.getQuoteString()
)
)
.bind("dataSource", dataSource)
.bind("start", interval.getStart().toString())
.bind("end", interval.getEnd().toString())
.execute();
},
final Integer numSegmentsMarkedUnused = connector.retryTransaction(
(handle, status) ->
SqlSegmentsMetadataQuery.forHandle(handle, connector, dbTables, jsonMapper)
.markSegmentsUnused(dataSource, interval),
3,
SQLMetadataConnector.DEFAULT_MAX_TRIES
);
Expand Down Expand Up @@ -292,65 +260,28 @@ private VersionedIntervalTimeline<String, DataSegment> getTimelineForIntervalsWi
final Handle handle,
final String dataSource,
final List<Interval> intervals
)
) throws IOException
{
Query<Map<String, Object>> sql = createUsedSegmentsSqlQueryForIntervals(handle, dataSource, intervals);

try (final ResultIterator<byte[]> dbSegments = sql.map(ByteArrayMapper.FIRST).iterator()) {
return VersionedIntervalTimeline.forSegments(
Iterators.transform(dbSegments, payload -> JacksonUtils.readValue(jsonMapper, payload, DataSegment.class))
);
try (final CloseableIterator<DataSegment> iterator =
SqlSegmentsMetadataQuery.forHandle(handle, connector, dbTables, jsonMapper)
.retrieveUsedSegments(dataSource, intervals)) {
return VersionedIntervalTimeline.forSegments(iterator);
}
}

private Collection<DataSegment> retrieveAllUsedSegmentsForIntervalsWithHandle(
final Handle handle,
final String dataSource,
final List<Interval> intervals
)
{
return createUsedSegmentsSqlQueryForIntervals(handle, dataSource, intervals)
.map((index, r, ctx) -> JacksonUtils.readValue(jsonMapper, r.getBytes("payload"), DataSegment.class))
.list();
}

/**
* Creates a query to the metadata store which selects payload from the segments table for all segments which are
* marked as used and whose interval intersects (not just abuts) with any of the intervals given to this method.
*/
private Query<Map<String, Object>> createUsedSegmentsSqlQueryForIntervals(
Handle handle,
String dataSource,
List<Interval> intervals
)
) throws IOException
{
final StringBuilder sb = new StringBuilder();
sb.append("SELECT payload FROM %s WHERE used = true AND dataSource = ?");
if (!intervals.isEmpty()) {
sb.append(" AND (");
for (int i = 0; i < intervals.size(); i++) {
sb.append(
StringUtils.format("(start < ? AND %1$send%1$s > ?)", connector.getQuoteString())
);
if (i == intervals.size() - 1) {
sb.append(")");
} else {
sb.append(" OR ");
}
}
try (final CloseableIterator<DataSegment> iterator =
SqlSegmentsMetadataQuery.forHandle(handle, connector, dbTables, jsonMapper)
.retrieveUsedSegments(dataSource, intervals)) {
final List<DataSegment> retVal = new ArrayList<>();
iterator.forEachRemaining(retVal::add);
return retVal;
}

Query<Map<String, Object>> sql = handle
.createQuery(StringUtils.format(sb.toString(), dbTables.getSegmentsTable()))
.bind(0, dataSource);

for (int i = 0; i < intervals.size(); i++) {
Interval interval = intervals.get(i);
sql = sql
.bind(2 * i + 1, interval.getEnd().toString())
.bind(2 * i + 2, interval.getStart().toString());
}
return sql;
}

@Override
Expand Down Expand Up @@ -1269,7 +1200,7 @@ protected DataStoreMetadataUpdateResult updateDataSourceMetadataWithHandle(
*/
protected DataStoreMetadataUpdateResult dropSegmentsWithHandle(
final Handle handle,
final Set<DataSegment> segmentsToDrop,
final Collection<DataSegment> segmentsToDrop,
final String dataSource
)
{
Expand All @@ -1288,18 +1219,13 @@ protected DataStoreMetadataUpdateResult dropSegmentsWithHandle(
);
return DataStoreMetadataUpdateResult.FAILURE;
}
final List<String> segmentIdList = segmentsToDrop.stream().map(segment -> segment.getId().toString()).collect(Collectors.toList());
Batch batch = handle.createBatch();
segmentIdList.forEach(segmentId -> batch.add(
StringUtils.format(
"UPDATE %s SET used=false WHERE datasource = '%s' AND id = '%s'",
dbTables.getSegmentsTable(),
dataSource,
segmentId
)
));
final int[] segmentChanges = batch.execute();
int numChangedSegments = SqlSegmentsMetadataManager.computeNumChangedSegments(segmentIdList, segmentChanges);

final int numChangedSegments =
SqlSegmentsMetadataQuery.forHandle(handle, connector, dbTables, jsonMapper).markSegments(
segmentsToDrop.stream().map(DataSegment::getId).collect(Collectors.toList()),
false
);

if (numChangedSegments != segmentsToDrop.size()) {
log.warn("Failed to drop segments metadata update as numChangedSegments[%s] segmentsToDropSize[%s]",
numChangedSegments,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,14 +73,14 @@ int markAsUsedNonOvershadowedSegments(String dataSource, Set<String> segmentIds)

int markAsUnusedSegmentsInInterval(String dataSource, Interval interval);

int markSegmentsAsUnused(String dataSource, Set<String> segmentIds);
int markSegmentsAsUnused(Set<SegmentId> segmentIds);

/**
* Returns true if the state of the segment entry is changed in the database as the result of this call (that is, the
* segment was marked as unused), false otherwise. If the call results in a database error, an exception is relayed to
* the caller.
*/
boolean markSegmentAsUnused(String segmentId);
boolean markSegmentAsUnused(SegmentId segmentId);

/**
* If there are used segments belonging to the given data source this method returns them as an {@link
Expand Down
Loading