diff --git a/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java b/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java
index cc4d03351b28..c573beaaca83 100644
--- a/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java
+++ b/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java
@@ -27,9 +27,11 @@
 import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.Iterables;
 import com.google.common.collect.Iterators;
+import com.google.common.collect.Lists;
 import com.google.common.hash.Hashing;
 import com.google.common.io.BaseEncoding;
 import com.google.inject.Inject;
+import org.apache.commons.lang.StringEscapeUtils;
 import org.apache.druid.indexing.overlord.DataSourceMetadata;
 import org.apache.druid.indexing.overlord.IndexerMetadataStorageCoordinator;
 import org.apache.druid.indexing.overlord.SegmentPublishResult;
@@ -43,6 +45,7 @@
 import org.apache.druid.java.util.common.jackson.JacksonUtils;
 import org.apache.druid.java.util.common.lifecycle.LifecycleStart;
 import org.apache.druid.java.util.common.logger.Logger;
+import org.apache.druid.segment.SegmentUtils;
 import org.apache.druid.segment.realtime.appenderator.SegmentIdWithShardSpec;
 import org.apache.druid.timeline.DataSegment;
 import org.apache.druid.timeline.Partitions;
@@ -57,6 +60,7 @@
 import org.joda.time.chrono.ISOChronology;
 import org.skife.jdbi.v2.Folder3;
 import org.skife.jdbi.v2.Handle;
+import org.skife.jdbi.v2.PreparedBatch;
 import org.skife.jdbi.v2.Query;
 import org.skife.jdbi.v2.ResultIterator;
 import org.skife.jdbi.v2.StatementContext;
@@ -65,12 +69,12 @@
 import org.skife.jdbi.v2.exceptions.CallbackFailedException;
 import org.skife.jdbi.v2.tweak.HandleCallback;
 import org.skife.jdbi.v2.util.ByteArrayMapper;
-import org.skife.jdbi.v2.util.StringMapper;
 
 import javax.annotation.Nullable;
 import java.io.IOException;
 import java.sql.ResultSet;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashSet;
@@ -78,12 +82,15 @@
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
 
 /**
  */
 public class IndexerSQLMetadataStorageCoordinator implements IndexerMetadataStorageCoordinator
 {
   private static final Logger log = new Logger(IndexerSQLMetadataStorageCoordinator.class);
+  private static final int MAX_NUM_SEGMENTS_TO_ANNOUNCE_AT_ONCE = 100;
 
   private final ObjectMapper jsonMapper;
   private final MetadataStorageTablesConfig dbTables;
@@ -375,8 +382,6 @@ public SegmentPublishResult inTransaction(
           // Set definitelyNotUpdated back to false upon retrying.
           definitelyNotUpdated.set(false);
 
-          final Set<DataSegment> inserted = new HashSet<>();
-
           if (startMetadata != null) {
             final DataSourceMetadataUpdateResult result = updateDataSourceMetadataWithHandle(
                 handle,
@@ -398,11 +403,7 @@
             }
           }
 
-          for (final DataSegment segment : segments) {
-            if (announceHistoricalSegment(handle, segment, usedSegments.contains(segment))) {
-              inserted.add(segment);
-            }
-          }
+          final Set<DataSegment> inserted = announceHistoricalSegmentBatch(handle, segments, usedSegments);
 
           return SegmentPublishResult.ok(ImmutableSet.copyOf(inserted));
         }
@@ -946,83 +947,93 @@ public int deletePendingSegments(String dataSource)
-   * Attempts to insert a single segment to the database. If the segment already exists, will do nothing; although,
-   * this checking is imperfect and callers must be prepared to retry their entire transaction on exceptions.
+   * Attempts to insert a set of segments to the database. Segments that already exist are skipped; this check is
+   * imperfect, and callers must be prepared to retry their entire transaction on exceptions.
    *
-   * @return true if the segment was added, false if it already existed
+   * @return the set of segments actually inserted
    */
-  private boolean announceHistoricalSegment(
+  private Set<DataSegment> announceHistoricalSegmentBatch(
       final Handle handle,
-      final DataSegment segment,
-      final boolean used
+      final Set<DataSegment> segments,
+      final Set<DataSegment> usedSegments
   ) throws IOException
   {
+    final Set<DataSegment> toInsertSegments = new HashSet<>();
     try {
-      if (segmentExists(handle, segment)) {
-        log.info("Found [%s] in DB, not updating DB", segment.getId());
-        return false;
+      Set<String> existedSegments = segmentExistsBatch(handle, segments);
+      log.info("Found these segments already exist in DB: %s", existedSegments);
+      for (DataSegment segment : segments) {
+        if (!existedSegments.contains(segment.getId().toString())) {
+          toInsertSegments.add(segment);
+        }
       }
 
       // SELECT -> INSERT can fail due to races; callers must be prepared to retry.
       // Avoiding ON DUPLICATE KEY since it's not portable.
      // Avoiding try/catch since it may cause inadvertent transaction-splitting.
-      final int numRowsInserted = handle.createStatement(
+      final List<List<DataSegment>> partitionedSegments = Lists.partition(
+          new ArrayList<>(toInsertSegments),
+          MAX_NUM_SEGMENTS_TO_ANNOUNCE_AT_ONCE
+      );
+
+      PreparedBatch preparedBatch = handle.prepareBatch(
           StringUtils.format(
-              "INSERT INTO %1$s (id, dataSource, created_date, start, %2$send%2$s, partitioned, version, used, "
-              + "payload) "
-              + "VALUES (:id, :dataSource, :created_date, :start, :end, :partitioned, :version, :used, :payload)",
+              "INSERT INTO %1$s (id, dataSource, created_date, start, %2$send%2$s, partitioned, version, used, payload) "
+              + "VALUES (:id, :dataSource, :created_date, :start, :end, :partitioned, :version, :used, :payload)",
              dbTables.getSegmentsTable(),
              connector.getQuoteString()
          )
-      )
-          .bind("id", segment.getId().toString())
-          .bind("dataSource", segment.getDataSource())
-          .bind("created_date", DateTimes.nowUtc().toString())
-          .bind("start", segment.getInterval().getStart().toString())
-          .bind("end", segment.getInterval().getEnd().toString())
-          .bind("partitioned", (segment.getShardSpec() instanceof NoneShardSpec) ? false : true)
-          .bind("version", segment.getVersion())
-          .bind("used", used)
-          .bind("payload", jsonMapper.writeValueAsBytes(segment))
-          .execute();
-
-      if (numRowsInserted == 1) {
-        log.info(
-            "Published segment [%s] to DB with used flag [%s], json[%s]",
-            segment.getId(),
-            used,
-            jsonMapper.writeValueAsString(segment)
-        );
-      } else if (numRowsInserted == 0) {
-        throw new ISE(
-            "Failed to publish segment[%s] to DB with used flag[%s], json[%s]",
-            segment.getId(),
-            used,
-            jsonMapper.writeValueAsString(segment)
-        );
-      } else {
-        throw new ISE(
-            "numRowsInserted[%s] is larger than 1 after inserting segment[%s] with used flag[%s], json[%s]",
-            numRowsInserted,
-            segment.getId(),
-            used,
-            jsonMapper.writeValueAsString(segment)
-        );
+      );
+
+      for (List<DataSegment> partition : partitionedSegments) {
+        for (DataSegment segment : partition) {
+          preparedBatch.add()
+                       .bind("id", segment.getId().toString())
+                       .bind("dataSource", segment.getDataSource())
+                       .bind("created_date", DateTimes.nowUtc().toString())
+                       .bind("start", segment.getInterval().getStart().toString())
+                       .bind("end", segment.getInterval().getEnd().toString())
+                       .bind("partitioned", !(segment.getShardSpec() instanceof NoneShardSpec))
+                       .bind("version", segment.getVersion())
+                       .bind("used", usedSegments.contains(segment))
+                       .bind("payload", jsonMapper.writeValueAsBytes(segment));
+        }
+        final int[] affectedRows = preparedBatch.execute();
+        final boolean succeeded = Arrays.stream(affectedRows).allMatch(eachAffectedRows -> eachAffectedRows == 1);
+        if (succeeded) {
+          log.infoSegments(partition, "Published segments to DB");
+        } else {
+          final List<DataSegment> failedToPublish = IntStream.range(0, partition.size())
+                                                             .filter(i -> affectedRows[i] != 1)
+                                                             .mapToObj(partition::get)
+                                                             .collect(Collectors.toList());
+          throw new ISE(
+              "Failed to publish segments to DB: %s",
+              SegmentUtils.commaSeparatedIdentifiers(failedToPublish)
+          );
+        }
+      }
     }
     catch (Exception e) {
-      log.error(e, "Exception inserting segment [%s] with used flag [%s] into DB", segment.getId(), used);
+      log.errorSegments(segments, "Exception inserting segments");
       throw e;
     }
 
-    return true;
+    return toInsertSegments;
   }
 
-  private boolean segmentExists(final Handle handle, final DataSegment segment)
+  private Set<String> segmentExistsBatch(final Handle handle, final Set<DataSegment> segments)
   {
-    return !handle
-        .createQuery(StringUtils.format("SELECT id FROM %s WHERE id = :identifier", dbTables.getSegmentsTable()))
-        .bind("identifier", segment.getId().toString())
-        .map(StringMapper.FIRST)
-        .list()
-        .isEmpty();
+    Set<String> existedSegments = new HashSet<>();
+
+    List<List<DataSegment>> segmentsLists = Lists.partition(new ArrayList<>(segments), MAX_NUM_SEGMENTS_TO_ANNOUNCE_AT_ONCE);
+    for (List<DataSegment> segmentList : segmentsLists) {
+      String segmentIds = segmentList.stream()
+          .map(segment -> "'" + StringEscapeUtils.escapeSql(segment.getId().toString()) + "'")
+          .collect(Collectors.joining(","));
+      List<String> existIds = handle
+          .createQuery(StringUtils.format("SELECT id FROM %s WHERE id in (%s)", dbTables.getSegmentsTable(), segmentIds))
+          .mapTo(String.class)
+          .list();
+      existedSegments.addAll(existIds);
+    }
+    return existedSegments;
   }
 
   /**
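The existence check above splices quoted ids directly into an IN clause because JDBI v2 has no
portable way to bind a collection of values, and relies on StringEscapeUtils.escapeSql from
commons-lang 2.x to double any embedded single quotes. A minimal standalone sketch of that
id-list construction, using hypothetical segment ids rather than anything from the patch:

import org.apache.commons.lang.StringEscapeUtils;

import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

public class InClauseSketch
{
  // Builds the quoted, comma-separated id list that segmentExistsBatch splices
  // into "SELECT id FROM %s WHERE id in (%s)".
  static String toInClause(List<String> ids)
  {
    return ids.stream()
              .map(id -> "'" + StringEscapeUtils.escapeSql(id) + "'")
              .collect(Collectors.joining(","));
  }

  public static void main(String[] args)
  {
    // The second id shows the quote doubling: o'brien becomes 'o''brien'.
    System.out.println(toInClause(Arrays.asList("seg_2015-01-01_v1_0", "o'brien")));
  }
}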
diff --git a/server/src/test/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinatorTest.java b/server/src/test/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinatorTest.java
index 8e7219614cd7..ebe82a0b140e 100644
--- a/server/src/test/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinatorTest.java
+++ b/server/src/test/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinatorTest.java
@@ -61,6 +61,7 @@
 import java.nio.charset.StandardCharsets;
 import java.util.Collection;
 import java.util.Collections;
+import java.util.Comparator;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
@@ -287,6 +288,50 @@ public void testSimpleAnnounce() throws IOException
     Assert.assertEquals(0, metadataUpdateCounter.get());
   }
 
+  @Test
+  public void testAnnounceHistoricalSegments() throws IOException
+  {
+    Set<DataSegment> segments = new HashSet<>();
+    for (int i = 0; i < 105; i++) {
+      segments.add(
+          new DataSegment(
+              "fooDataSource",
+              Intervals.of("2015-01-01T00Z/2015-01-02T00Z"),
+              "version",
+              ImmutableMap.of(),
+              ImmutableList.of("dim1"),
+              ImmutableList.of("m1"),
+              new LinearShardSpec(i),
+              9,
+              100
+          )
+      );
+    }
+
+    coordinator.announceHistoricalSegments(segments);
+    for (DataSegment segment : segments) {
+      Assert.assertArrayEquals(
+          mapper.writeValueAsString(segment).getBytes(StandardCharsets.UTF_8),
+          derbyConnector.lookup(
+              derbyConnectorRule.metadataTablesConfigSupplier().get().getSegmentsTable(),
+              "id",
+              "payload",
+              segment.getId().toString()
+          )
+      );
+    }
+
+    List<String> segmentIds = segments.stream()
+                                      .map(segment -> segment.getId().toString())
+                                      .collect(Collectors.toList());
+    segmentIds.sort(Comparator.naturalOrder());
+    Assert.assertEquals(segmentIds, retrieveUsedSegmentIds());
+
+    // Should not update dataSource metadata.
+    Assert.assertEquals(0, metadataUpdateCounter.get());
+  }
+
   @Test
   public void testOvershadowingAnnounce() throws IOException
   {
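Taken together, the pattern the patch introduces and the test exercises (partition the rows,
bind each row into a JDBI PreparedBatch, execute, and verify every affected-row count is 1) can
be sketched standalone as below. This is a minimal illustration assuming an in-memory H2
database and a simplified two-column table, not the coordinator's actual schema or connector
wiring:

import com.google.common.collect.Lists;
import org.skife.jdbi.v2.DBI;
import org.skife.jdbi.v2.Handle;
import org.skife.jdbi.v2.PreparedBatch;

import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

public class BatchInsertSketch
{
  private static final int BATCH_SIZE = 100; // mirrors MAX_NUM_SEGMENTS_TO_ANNOUNCE_AT_ONCE

  public static void main(String[] args)
  {
    // Illustrative in-memory H2 database; the coordinator obtains its Handle
    // from the metadata storage connector instead.
    DBI dbi = new DBI("jdbc:h2:mem:sketch;DB_CLOSE_DELAY=-1");
    try (Handle handle = dbi.open()) {
      handle.execute("CREATE TABLE segments (id VARCHAR(255) PRIMARY KEY, payload VARCHAR(255))");

      List<String> ids = IntStream.range(0, 250)
                                  .mapToObj(i -> "segment-" + i)
                                  .collect(Collectors.toList());

      // Partition so no single batch grows unboundedly. The patch reuses one
      // PreparedBatch across chunks; a fresh batch per chunk behaves the same.
      for (List<String> chunk : Lists.partition(ids, BATCH_SIZE)) {
        PreparedBatch batch = handle.prepareBatch(
            "INSERT INTO segments (id, payload) VALUES (:id, :payload)"
        );
        for (String id : chunk) {
          batch.add().bind("id", id).bind("payload", "payload-" + id);
        }
        // execute() returns one affected-row count per added statement; the
        // patch treats anything other than all-ones as a failed publish.
        int[] affectedRows = batch.execute();
        if (!IntStream.of(affectedRows).allMatch(n -> n == 1)) {
          throw new IllegalStateException("some rows failed to insert");
        }
      }
    }
  }
}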