IndexerSQLMetadataStorageCoordinator.java
@@ -27,9 +27,11 @@
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Iterables;
import com.google.common.collect.Iterators;
import com.google.common.collect.Lists;
import com.google.common.hash.Hashing;
import com.google.common.io.BaseEncoding;
import com.google.inject.Inject;
import org.apache.commons.lang.StringEscapeUtils;
import org.apache.druid.indexing.overlord.DataSourceMetadata;
import org.apache.druid.indexing.overlord.IndexerMetadataStorageCoordinator;
import org.apache.druid.indexing.overlord.SegmentPublishResult;
@@ -43,6 +45,7 @@
import org.apache.druid.java.util.common.jackson.JacksonUtils;
import org.apache.druid.java.util.common.lifecycle.LifecycleStart;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.segment.SegmentUtils;
import org.apache.druid.segment.realtime.appenderator.SegmentIdWithShardSpec;
import org.apache.druid.timeline.DataSegment;
import org.apache.druid.timeline.Partitions;
@@ -57,6 +60,7 @@
import org.joda.time.chrono.ISOChronology;
import org.skife.jdbi.v2.Folder3;
import org.skife.jdbi.v2.Handle;
import org.skife.jdbi.v2.PreparedBatch;
import org.skife.jdbi.v2.Query;
import org.skife.jdbi.v2.ResultIterator;
import org.skife.jdbi.v2.StatementContext;
@@ -65,25 +69,28 @@
import org.skife.jdbi.v2.exceptions.CallbackFailedException;
import org.skife.jdbi.v2.tweak.HandleCallback;
import org.skife.jdbi.v2.util.ByteArrayMapper;
import org.skife.jdbi.v2.util.StringMapper;

import javax.annotation.Nullable;
import java.io.IOException;
import java.sql.ResultSet;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

/**
*/
public class IndexerSQLMetadataStorageCoordinator implements IndexerMetadataStorageCoordinator
{
private static final Logger log = new Logger(IndexerSQLMetadataStorageCoordinator.class);
private static final int MAX_NUM_SEGMENTS_TO_ANNOUNCE_AT_ONCE = 100;

private final ObjectMapper jsonMapper;
private final MetadataStorageTablesConfig dbTables;
@@ -375,8 +382,6 @@ public SegmentPublishResult inTransaction(
// Set definitelyNotUpdated back to false upon retrying.
definitelyNotUpdated.set(false);

final Set<DataSegment> inserted = new HashSet<>();

if (startMetadata != null) {
final DataSourceMetadataUpdateResult result = updateDataSourceMetadataWithHandle(
handle,
@@ -398,11 +403,7 @@ public SegmentPublishResult inTransaction(
}
}

for (final DataSegment segment : segments) {
if (announceHistoricalSegment(handle, segment, usedSegments.contains(segment))) {
inserted.add(segment);
}
}
final Set<DataSegment> inserted = announceHistoricalSegmentBatch(handle, segments, usedSegments);

return SegmentPublishResult.ok(ImmutableSet.copyOf(inserted));
}
@@ -946,83 +947,93 @@ public int deletePendingSegments(String dataSource)
* Attempts to insert a batch of segments into the database. Segments that already exist are skipped; this
* check is imperfect, however, and callers must be prepared to retry their entire transaction on exceptions.
*
* @return true if the segment was added, false if it already existed
* @return the set of segments actually inserted
*/
private boolean announceHistoricalSegment(
private Set<DataSegment> announceHistoricalSegmentBatch(
final Handle handle,
final DataSegment segment,
final boolean used
final Set<DataSegment> segments,
final Set<DataSegment> usedSegments
) throws IOException
{
final Set<DataSegment> toInsertSegments = new HashSet<>();
try {
if (segmentExists(handle, segment)) {
log.info("Found [%s] in DB, not updating DB", segment.getId());
return false;
Set<String> existedSegments = segmentExistsBatch(handle, segments);
log.info("Found these segments already exist in DB: %s", existedSegments);
for (DataSegment segment : segments) {
if (!existedSegments.contains(segment.getId().toString())) {
toInsertSegments.add(segment);
}
}

// SELECT -> INSERT can fail due to races; callers must be prepared to retry.
// Avoiding ON DUPLICATE KEY since it's not portable.
// Avoiding try/catch since it may cause inadvertent transaction-splitting.
final int numRowsInserted = handle.createStatement(
final List<List<DataSegment>> partitionedSegments = Lists.partition(
new ArrayList<>(toInsertSegments),
MAX_NUM_SEGMENTS_TO_ANNOUNCE_AT_ONCE
);

PreparedBatch preparedBatch = handle.prepareBatch(
StringUtils.format(
"INSERT INTO %1$s (id, dataSource, created_date, start, %2$send%2$s, partitioned, version, used, "
+ "payload) "
+ "VALUES (:id, :dataSource, :created_date, :start, :end, :partitioned, :version, :used, :payload)",
"INSERT INTO %1$s (id, dataSource, created_date, start, %2$send%2$s, partitioned, version, used, payload) "
+ "VALUES (:id, :dataSource, :created_date, :start, :end, :partitioned, :version, :used, :payload)",
dbTables.getSegmentsTable(),
connector.getQuoteString()
)
)
.bind("id", segment.getId().toString())
.bind("dataSource", segment.getDataSource())
.bind("created_date", DateTimes.nowUtc().toString())
.bind("start", segment.getInterval().getStart().toString())
.bind("end", segment.getInterval().getEnd().toString())
.bind("partitioned", (segment.getShardSpec() instanceof NoneShardSpec) ? false : true)
.bind("version", segment.getVersion())
.bind("used", used)
.bind("payload", jsonMapper.writeValueAsBytes(segment))
.execute();

if (numRowsInserted == 1) {
log.info(
"Published segment [%s] to DB with used flag [%s], json[%s]",
segment.getId(),
used,
jsonMapper.writeValueAsString(segment)
);
} else if (numRowsInserted == 0) {
throw new ISE(
"Failed to publish segment[%s] to DB with used flag[%s], json[%s]",
segment.getId(),
used,
jsonMapper.writeValueAsString(segment)
);
} else {
throw new ISE(
"numRowsInserted[%s] is larger than 1 after inserting segment[%s] with used flag[%s], json[%s]",
numRowsInserted,
segment.getId(),
used,
jsonMapper.writeValueAsString(segment)
);
);

for (List<DataSegment> partition : partitionedSegments) {
for (DataSegment segment : partition) {
preparedBatch.add()
.bind("id", segment.getId().toString())
.bind("dataSource", segment.getDataSource())
.bind("created_date", DateTimes.nowUtc().toString())
.bind("start", segment.getInterval().getStart().toString())
.bind("end", segment.getInterval().getEnd().toString())
.bind("partitioned", (segment.getShardSpec() instanceof NoneShardSpec) ? false : true)
.bind("version", segment.getVersion())
.bind("used", usedSegments.contains(segment))
.bind("payload", jsonMapper.writeValueAsBytes(segment));
}
final int[] affectedRows = preparedBatch.execute();
final boolean succeeded = Arrays.stream(affectedRows).allMatch(eachAffectedRows -> eachAffectedRows == 1);
if (succeeded) {
log.infoSegments(partition, "Published segments to DB");
} else {
final List<DataSegment> failedToPublish = IntStream.range(0, partition.size())
.filter(i -> affectedRows[i] != 1)
.mapToObj(partition::get)
.collect(Collectors.toList());
throw new ISE(
"Failed to publish segments to DB: %s",
SegmentUtils.commaSeparatedIdentifiers(failedToPublish)
);
}
}
}
catch (Exception e) {
log.error(e, "Exception inserting segment [%s] with used flag [%s] into DB", segment.getId(), used);
log.errorSegments(segments, "Exception inserting segments");
throw e;
}

return true;
return toInsertSegments;
}
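For context: the insert above uses JDBI v2's PreparedBatch, which binds many parameter sets against a single parsed statement and sends them to the database together. A minimal sketch of the pattern, assuming a hypothetical example_segments table and in-scope handle, jsonMapper, and segmentsToInsert variables (not the real Druid schema):

```java
// Minimal sketch of the JDBI v2 PreparedBatch pattern; the table and columns
// are hypothetical, and `handle`, `jsonMapper`, and `segmentsToInsert` are
// assumed to be in scope.
PreparedBatch batch = handle.prepareBatch(
    "INSERT INTO example_segments (id, payload) VALUES (:id, :payload)"
);
for (DataSegment segment : segmentsToInsert) {
  batch.add()
       .bind("id", segment.getId().toString())
       .bind("payload", jsonMapper.writeValueAsBytes(segment));
}
// execute() returns one affected-row count per added part; the patch checks
// that every count is 1 in order to detect rows that failed to insert.
int[] rowCounts = batch.execute();
```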

private boolean segmentExists(final Handle handle, final DataSegment segment)
private Set<String> segmentExistsBatch(final Handle handle, final Set<DataSegment> segments)
{
return !handle
.createQuery(StringUtils.format("SELECT id FROM %s WHERE id = :identifier", dbTables.getSegmentsTable()))
.bind("identifier", segment.getId().toString())
.map(StringMapper.FIRST)
.list()
.isEmpty();
Set<String> existedSegments = new HashSet<>();

List<List<DataSegment>> segmentsLists = Lists.partition(new ArrayList<>(segments), MAX_NUM_SEGMENTS_TO_ANNOUNCE_AT_ONCE);
for (List<DataSegment> segmentList : segmentsLists) {
String segmentIds = segmentList.stream()
.map(segment -> "'" + StringEscapeUtils.escapeSql(segment.getId().toString()) + "'")
.collect(Collectors.joining(","));
List<String> existIds = handle.createQuery(StringUtils.format("SELECT id FROM %s WHERE id in (%s)", dbTables.getSegmentsTable(), segmentIds))
.mapTo(String.class)
.list();
existedSegments.addAll(existIds);
}
return existedSegments;
}
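Note on the query above: it inlines SQL-escaped segment-id literals into the IN list, with Lists.partition capping each statement at MAX_NUM_SEGMENTS_TO_ANNOUNCE_AT_ONCE ids. Since segment ids are generated by Druid rather than taken from user input, this is a reasonable simplicity trade-off. For comparison, a hypothetical parameter-bound variant of one chunk's query (not what this patch does) could look like:

```java
// Hypothetical parameter-bound variant of the IN query. JDBI v2 does not
// expand collection arguments natively, so placeholders are generated and
// bound one by one; `chunk` is one List<DataSegment> of at most 100 segments.
String placeholders = IntStream.range(0, chunk.size())
    .mapToObj(i -> ":id" + i)
    .collect(Collectors.joining(","));
Query<Map<String, Object>> query = handle.createQuery(
    StringUtils.format("SELECT id FROM %s WHERE id IN (%s)", dbTables.getSegmentsTable(), placeholders)
);
for (int i = 0; i < chunk.size(); i++) {
  query.bind("id" + i, chunk.get(i).getId().toString());
}
List<String> existing = query.mapTo(String.class).list();
```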

/**
IndexerSQLMetadataStorageCoordinatorTest.java
@@ -61,6 +61,7 @@
import java.nio.charset.StandardCharsets;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
@@ -287,6 +288,50 @@ public void testSimpleAnnounce() throws IOException
Assert.assertEquals(0, metadataUpdateCounter.get());
}

@Test
public void testAnnounceHistoricalSegments() throws IOException
{
Set<DataSegment> segments = new HashSet<>();
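// 105 segments: more than MAX_NUM_SEGMENTS_TO_ANNOUNCE_AT_ONCE (100), so this announce exercises two insert batches.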
for (int i = 0; i < 105; i++) {
segments.add(
new DataSegment(
"fooDataSource",
Intervals.of("2015-01-01T00Z/2015-01-02T00Z"),
"version",
ImmutableMap.of(),
ImmutableList.of("dim1"),
ImmutableList.of("m1"),
new LinearShardSpec(i),
9,
100
)
);
}

coordinator.announceHistoricalSegments(segments);
for (DataSegment segment : segments) {
Assert.assertArrayEquals(
mapper.writeValueAsString(segment).getBytes(StandardCharsets.UTF_8),
derbyConnector.lookup(
derbyConnectorRule.metadataTablesConfigSupplier().get().getSegmentsTable(),
"id",
"payload",
segment.getId().toString()
)
);
}

List<String> segmentIds = segments.stream().map(segment -> segment.getId().toString()).collect(Collectors.toList());
segmentIds.sort(Comparator.naturalOrder());
Assert.assertEquals(
segmentIds,
retrieveUsedSegmentIds()
);

// Should not update dataSource metadata.
Assert.assertEquals(0, metadataUpdateCounter.get());
}

@Test
public void testOvershadowingAnnounce() throws IOException
{