From 110b9a346632248d0053298d2683f7fcc27ffeb6 Mon Sep 17 00:00:00 2001
From: xiangqiao
Date: Wed, 27 May 2020 12:37:07 +0800
Subject: [PATCH 1/5] optimize announceHistoricalSegment

---
 .../SegmentTransactionalInsertAction.java     |  41 +++---
 .../druid/indexing/overlord/TaskLockbox.java  |  14 +++
 .../IndexerSQLMetadataStorageCoordinator.java | 118 ++++++++++--------
 3 files changed, 102 insertions(+), 71 deletions(-)

diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentTransactionalInsertAction.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentTransactionalInsertAction.java
index aad16bfb6fee..d7b1e69bf998 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentTransactionalInsertAction.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentTransactionalInsertAction.java
@@ -191,25 +191,34 @@ public SegmentPublishResult perform(Task task, TaskActionToolbox toolbox)
     }
 
     try {
-      retVal = toolbox.getTaskLockbox().doInCriticalSection(
-          task,
-          allSegments.stream().map(DataSegment::getInterval).collect(Collectors.toList()),
-          CriticalAction.<SegmentPublishResult>builder()
-              .onValidLocks(
-                  () -> toolbox.getIndexerMetadataStorageCoordinator().announceHistoricalSegments(
-                      segments,
-                      startMetadata,
-                      endMetadata
-                  )
-              )
-              .onInvalidLocks(
-                  () -> SegmentPublishResult.fail(
-                      "Invalid task locks. Maybe they are revoked by a higher priority task."
-                      + " Please check the overlord log for details."
-                  )
-              )
-              .build()
-      );
+      CriticalAction<SegmentPublishResult> criticalAction = CriticalAction.<SegmentPublishResult>builder()
+          .onValidLocks(
+              () -> toolbox.getIndexerMetadataStorageCoordinator().announceHistoricalSegments(
+                  segments,
+                  startMetadata,
+                  endMetadata
+              )
+          )
+          .onInvalidLocks(
+              () -> SegmentPublishResult.fail(
+                  "Invalid task locks. Maybe they are revoked by a higher priority task."
+                  + " Please check the overlord log for details."
+              )
+          )
+          .build();
+      if (startMetadata == null && endMetadata == null) {
+        retVal = toolbox.getTaskLockbox().doOfflineSegmentTransactionalInsertAction(
+            task,
+            allSegments.stream().map(DataSegment::getInterval).collect(Collectors.toList()),
+            criticalAction
+        );
+      } else {
+        retVal = toolbox.getTaskLockbox().doInCriticalSection(
+            task,
+            allSegments.stream().map(DataSegment::getInterval).collect(Collectors.toList()),
+            criticalAction
+        );
+      }
     }
     catch (Exception e) {
       throw new RuntimeException(e);
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskLockbox.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskLockbox.java
index 6a56169a3686..8d082b4dba9f 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskLockbox.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskLockbox.java
@@ -89,6 +89,7 @@ public class TaskLockbox
   private final IndexerMetadataStorageCoordinator metadataStorageCoordinator;
   private final ReentrantLock giant = new ReentrantLock(true);
   private final Condition lockReleaseCondition = giant.newCondition();
+  private final ReentrantLock offlineSegmentTransactionalInsertLock = new ReentrantLock(true);
 
   private static final EmittingLogger log = new EmittingLogger(TaskLockbox.class);
 
@@ -563,6 +564,19 @@ public <T> T doInCriticalSection(Task task, List<Interval> intervals, CriticalAction<T> action)
     }
   }
 
+  public <T> T doOfflineSegmentTransactionalInsertAction(Task task, List<Interval> intervals, CriticalAction<T> action) throws Exception
+  {
+    offlineSegmentTransactionalInsertLock.lock();
+
+    try {
+      return action.perform(isTaskLocksValid(task, intervals));
+    }
+    finally {
+      offlineSegmentTransactionalInsertLock.unlock();
+    }
+  }
+
+
   /**
    * Check all locks task acquired are still valid.
    * It doesn't check other semantics like acquired locks are enough to overwrite existing segments.
diff --git a/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java b/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java
index 5c48598880ba..b5e4bafba237 100644
--- a/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java
+++ b/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java
@@ -30,6 +30,7 @@
 import com.google.common.hash.Hashing;
 import com.google.common.io.BaseEncoding;
 import com.google.inject.Inject;
+import org.apache.commons.lang.StringEscapeUtils;
 import org.apache.druid.indexing.overlord.DataSourceMetadata;
 import org.apache.druid.indexing.overlord.IndexerMetadataStorageCoordinator;
 import org.apache.druid.indexing.overlord.SegmentPublishResult;
@@ -56,6 +57,7 @@
 import org.joda.time.chrono.ISOChronology;
 import org.skife.jdbi.v2.Folder3;
 import org.skife.jdbi.v2.Handle;
+import org.skife.jdbi.v2.PreparedBatch;
 import org.skife.jdbi.v2.Query;
 import org.skife.jdbi.v2.ResultIterator;
 import org.skife.jdbi.v2.StatementContext;
@@ -64,7 +66,6 @@
 import org.skife.jdbi.v2.exceptions.CallbackFailedException;
 import org.skife.jdbi.v2.tweak.HandleCallback;
 import org.skife.jdbi.v2.util.ByteArrayMapper;
-import org.skife.jdbi.v2.util.StringMapper;
 
 import javax.annotation.Nullable;
 import java.io.IOException;
@@ -77,12 +78,14 @@
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.stream.Collectors;
 
 /**
  */
 public class IndexerSQLMetadataStorageCoordinator implements IndexerMetadataStorageCoordinator
 {
   private static final Logger log = new Logger(IndexerSQLMetadataStorageCoordinator.class);
+  private static final int ANNOUNCE_HISTORICAL_SEGMENG_BATCH = 100;
 
   private final ObjectMapper jsonMapper;
   private final MetadataStorageTablesConfig dbTables;
@@ -374,8 +377,6 @@ public SegmentPublishResult inTransaction(
           // Set definitelyNotUpdated back to false upon retrying.
           definitelyNotUpdated.set(false);
 
-          final Set<DataSegment> inserted = new HashSet<>();
-
           if (startMetadata != null) {
             final DataSourceMetadataUpdateResult result = updateDataSourceMetadataWithHandle(
                 handle,
@@ -397,11 +398,7 @@
             }
           }
 
-          for (final DataSegment segment : segments) {
-            if (announceHistoricalSegment(handle, segment, usedSegments.contains(segment))) {
-              inserted.add(segment);
-            }
-          }
+          final Set<DataSegment> inserted = announceHistoricalSegmentBatch(handle, segments, usedSegments);
 
           return SegmentPublishResult.ok(ImmutableSet.copyOf(inserted));
         }
@@ -932,32 +929,42 @@ public int deletePendingSegments(String dataSource)
    * Attempts to insert a single segment to the database. If the segment already exists, will do nothing; although,
    * this checking is imperfect and callers must be prepared to retry their entire transaction on exceptions.
    *
-   * @return true if the segment was added, false if it already existed
+   * @return the set of segments actually inserted
    */
-  private boolean announceHistoricalSegment(
+  private Set<DataSegment> announceHistoricalSegmentBatch(
       final Handle handle,
-      final DataSegment segment,
-      final boolean used
+      final Set<DataSegment> segments,
+      final Set<DataSegment> usedSegments
   ) throws IOException
   {
+    final Set<DataSegment> toInsertSegments = new HashSet<>();
     try {
-      if (segmentExists(handle, segment)) {
-        log.info("Found [%s] in DB, not updating DB", segment.getId());
-        return false;
+      List<String> existedSegments = segmentExistsBatch(handle, segments);
+      for (DataSegment segment : segments) {
+        if (existedSegments.contains(segment.getId().toString())) {
+          log.info("Found [%s] in DB, not updating DB", segment.getId());
+        } else {
+          toInsertSegments.add(segment);
+        }
       }
 
       // SELECT -> INSERT can fail due to races; callers must be prepared to retry.
       // Avoiding ON DUPLICATE KEY since it's not portable.
       // Avoiding try/catch since it may cause inadvertent transaction-splitting.
-      final int numRowsInserted = handle.createStatement(
+      final List<DataSegment> segmentList = new ArrayList<>(toInsertSegments);
+
+      PreparedBatch preparedBatch = handle.prepareBatch(
           StringUtils.format(
-              "INSERT INTO %1$s (id, dataSource, created_date, start, %2$send%2$s, partitioned, version, used, "
-              + "payload) "
-              + "VALUES (:id, :dataSource, :created_date, :start, :end, :partitioned, :version, :used, :payload)",
+              "INSERT INTO %1$s (id, dataSource, created_date, start, %2$send%2$s, partitioned, version, used, payload) "
+              + "VALUES (:id, :dataSource, :created_date, :start, :end, :partitioned, :version, :used, :payload)",
              dbTables.getSegmentsTable(),
              connector.getQuoteString()
          )
-      )
+      );
+
+      for (int i = 0; i < segmentList.size(); i++) {
+        DataSegment segment = segmentList.get(i);
+        preparedBatch.add()
             .bind("id", segment.getId().toString())
             .bind("dataSource", segment.getDataSource())
             .bind("created_date", DateTimes.nowUtc().toString())
@@ -965,50 +972,51 @@ private boolean announceHistoricalSegment(
             .bind("end", segment.getInterval().getEnd().toString())
             .bind("partitioned", (segment.getShardSpec() instanceof NoneShardSpec) ? false : true)
             .bind("version", segment.getVersion())
-            .bind("used", used)
-            .bind("payload", jsonMapper.writeValueAsBytes(segment))
-            .execute();
-
-      if (numRowsInserted == 1) {
-        log.info(
-            "Published segment [%s] to DB with used flag [%s], json[%s]",
-            segment.getId(),
-            used,
-            jsonMapper.writeValueAsString(segment)
-        );
-      } else if (numRowsInserted == 0) {
-        throw new ISE(
-            "Failed to publish segment[%s] to DB with used flag[%s], json[%s]",
-            segment.getId(),
-            used,
-            jsonMapper.writeValueAsString(segment)
-        );
-      } else {
-        throw new ISE(
-            "numRowsInserted[%s] is larger than 1 after inserting segment[%s] with used flag[%s], json[%s]",
-            numRowsInserted,
-            segment.getId(),
-            used,
-            jsonMapper.writeValueAsString(segment)
-        );
+            .bind("used", usedSegments.contains(segment))
+            .bind("payload", jsonMapper.writeValueAsBytes(segment));
+
+        if ((i + 1) % ANNOUNCE_HISTORICAL_SEGMENG_BATCH == 0 || i == segmentList.size() - 1) {
+          int[] affectedRows = preparedBatch.execute();
+          for (int j = 0; j < affectedRows.length; j++) {
+            DataSegment insertSegment = segmentList.get(i / ANNOUNCE_HISTORICAL_SEGMENG_BATCH * ANNOUNCE_HISTORICAL_SEGMENG_BATCH + j);
+            if (affectedRows[j] == 1) {
+              log.info(
+                  "Published segment [%s] to DB with used flag [%s], json[%s]",
+                  insertSegment.getId(),
+                  usedSegments.contains(insertSegment),
+                  jsonMapper.writeValueAsString(insertSegment)
+              );
+            } else {
+              throw new ISE(
+                  "Failed to publish segment[%s] to DB with used flag[%s], json[%s]",
+                  insertSegment.getId(),
+                  usedSegments.contains(insertSegment),
+                  jsonMapper.writeValueAsString(insertSegment)
+              );
+            }
+          }
+        }
       }
     }
     catch (Exception e) {
-      log.error(e, "Exception inserting segment [%s] with used flag [%s] into DB", segment.getId(), used);
+      for (DataSegment segment : segments) {
+        log.error(e, "Exception inserting segment [%s] with used flag [%s] into DB", segment.getId(), usedSegments.contains(segment));
+      }
       throw e;
     }
 
-    return true;
+    return toInsertSegments;
   }
 
-  private boolean segmentExists(final Handle handle, final DataSegment segment)
+  private List<String> segmentExistsBatch(final Handle handle, final Set<DataSegment> segments)
   {
-    return !handle
-        .createQuery(StringUtils.format("SELECT id FROM %s WHERE id = :identifier", dbTables.getSegmentsTable()))
-        .bind("identifier", segment.getId().toString())
-        .map(StringMapper.FIRST)
-        .list()
-        .isEmpty();
+    String segmentIds = segments.stream()
+                                .map(segment -> "'" + StringEscapeUtils.escapeSql(segment.getId().toString()) + "'")
+                                .collect(Collectors.joining(","));
+    return handle
+        .createQuery(StringUtils.format("SELECT id FROM %s WHERE id in (%s)", dbTables.getSegmentsTable(), segmentIds))
+        .map(String.class)
+        .list();
   }
 
   /**
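Note on PATCH 1: instead of one SELECT plus one INSERT per segment, the publish path now issues a single batched existence query and flushes INSERTs through a JDBI PreparedBatch every 100 statements, so publishing N segments costs roughly N/100 round trips rather than 2N. Below is a minimal, self-contained sketch of that chunked PreparedBatch pattern against JDBI v2, outside of Druid; the DBI wiring, the example_ids table, and the method names are illustrative assumptions, not code from this patch.

    import org.skife.jdbi.v2.DBI;
    import org.skife.jdbi.v2.Handle;
    import org.skife.jdbi.v2.PreparedBatch;

    import java.util.List;

    public class BatchInsertSketch
    {
      private static final int BATCH_SIZE = 100;  // mirrors the patch's batch size of 100

      // Inserts all ids, flushing the PreparedBatch once per BATCH_SIZE chunk
      // so no single round trip carries an unbounded number of statements.
      public static int insertAll(final DBI dbi, final List<String> ids)
      {
        final Handle handle = dbi.open();
        try {
          int inserted = 0;
          for (int from = 0; from < ids.size(); from += BATCH_SIZE) {
            final int to = Math.min(from + BATCH_SIZE, ids.size());
            final PreparedBatch batch = handle.prepareBatch(
                "INSERT INTO example_ids (id) VALUES (:id)"  // hypothetical table
            );
            for (final String id : ids.subList(from, to)) {
              batch.add().bind("id", id);
            }
            // execute() returns one affected-row count per added statement.
            for (final int rows : batch.execute()) {
              inserted += rows;
            }
          }
          return inserted;
        }
        finally {
          handle.close();
        }
      }
    }

Executing at fixed chunk boundaries keeps each round trip bounded while still amortizing statement preparation and network latency over many rows.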
From e7061f281f275b752e4d770cd667f345f2504476 Mon Sep 17 00:00:00 2001
From: xiangqiao
Date: Wed, 27 May 2020 17:14:07 +0800
Subject: [PATCH 3/5] revert: offline SegmentTransactionalInsertAction no
 longer uses a separate lock

---
 .../SegmentTransactionalInsertAction.java     | 41 ++++++-----
 .../druid/indexing/overlord/TaskLockbox.java  | 14 -------
 2 files changed, 16 insertions(+), 39 deletions(-)

diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentTransactionalInsertAction.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentTransactionalInsertAction.java
index d7b1e69bf998..aad16bfb6fee 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentTransactionalInsertAction.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentTransactionalInsertAction.java
@@ -191,34 +191,25 @@ public SegmentPublishResult perform(Task task, TaskActionToolbox toolbox)
     }
 
     try {
-      CriticalAction<SegmentPublishResult> criticalAction = CriticalAction.<SegmentPublishResult>builder()
-          .onValidLocks(
-              () -> toolbox.getIndexerMetadataStorageCoordinator().announceHistoricalSegments(
-                  segments,
-                  startMetadata,
-                  endMetadata
-              )
-          )
-          .onInvalidLocks(
-              () -> SegmentPublishResult.fail(
-                  "Invalid task locks. Maybe they are revoked by a higher priority task."
-                  + " Please check the overlord log for details."
-              )
-          )
-          .build();
-      if (startMetadata == null && endMetadata == null) {
-        retVal = toolbox.getTaskLockbox().doOfflineSegmentTransactionalInsertAction(
-            task,
-            allSegments.stream().map(DataSegment::getInterval).collect(Collectors.toList()),
-            criticalAction
-        );
-      } else {
-        retVal = toolbox.getTaskLockbox().doInCriticalSection(
-            task,
-            allSegments.stream().map(DataSegment::getInterval).collect(Collectors.toList()),
-            criticalAction
-        );
-      }
+      retVal = toolbox.getTaskLockbox().doInCriticalSection(
+          task,
+          allSegments.stream().map(DataSegment::getInterval).collect(Collectors.toList()),
+          CriticalAction.<SegmentPublishResult>builder()
+              .onValidLocks(
+                  () -> toolbox.getIndexerMetadataStorageCoordinator().announceHistoricalSegments(
+                      segments,
+                      startMetadata,
+                      endMetadata
+                  )
+              )
+              .onInvalidLocks(
+                  () -> SegmentPublishResult.fail(
+                      "Invalid task locks. Maybe they are revoked by a higher priority task."
+                      + " Please check the overlord log for details."
+                  )
+              )
+              .build()
+      );
     }
     catch (Exception e) {
       throw new RuntimeException(e);
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskLockbox.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskLockbox.java
index 8d082b4dba9f..6a56169a3686 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskLockbox.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskLockbox.java
@@ -89,7 +89,6 @@ public class TaskLockbox
   private final IndexerMetadataStorageCoordinator metadataStorageCoordinator;
   private final ReentrantLock giant = new ReentrantLock(true);
   private final Condition lockReleaseCondition = giant.newCondition();
-  private final ReentrantLock offlineSegmentTransactionalInsertLock = new ReentrantLock(true);
 
   private static final EmittingLogger log = new EmittingLogger(TaskLockbox.class);
 
@@ -564,19 +563,6 @@ public <T> T doInCriticalSection(Task task, List<Interval> intervals, CriticalAction<T> action)
     }
   }
 
-  public <T> T doOfflineSegmentTransactionalInsertAction(Task task, List<Interval> intervals, CriticalAction<T> action) throws Exception
-  {
-    offlineSegmentTransactionalInsertLock.lock();
-
-    try {
-      return action.perform(isTaskLocksValid(task, intervals));
-    }
-    finally {
-      offlineSegmentTransactionalInsertLock.unlock();
-    }
-  }
-
-
   /**
    * Check all locks task acquired are still valid.
    * It doesn't check other semantics like acquired locks are enough to overwrite existing segments.
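PATCH 4 below addresses a scaling problem with the existence check introduced in PATCH 1: publishing many segments builds a single SQL IN (...) list with one element per segment, which some databases reject or handle poorly past a certain size. The fix partitions the id set into chunks of 100 and issues one bounded query per chunk. A minimal sketch of that partitioning idea with Guava's Lists.partition follows; the table name, column, and quoting are illustrative assumptions (the patch itself escapes ids with StringEscapeUtils.escapeSql).

    import com.google.common.collect.Lists;

    import java.util.ArrayList;
    import java.util.List;
    import java.util.Set;
    import java.util.stream.Collectors;

    public class InClausePartitionSketch
    {
      private static final int MAX_IN_CLAUSE_SIZE = 100;  // mirrors the patch's chunk size

      // Builds one bounded "SELECT ... WHERE id IN (...)" statement per chunk
      // of at most MAX_IN_CLAUSE_SIZE ids, instead of a single unbounded IN list.
      public static List<String> toQueries(final Set<String> ids)
      {
        final List<String> queries = new ArrayList<>();
        for (final List<String> chunk : Lists.partition(new ArrayList<>(ids), MAX_IN_CLAUSE_SIZE)) {
          final String inList = chunk.stream()
                                     .map(id -> "'" + id.replace("'", "''") + "'")  // simple SQL quote escaping
                                     .collect(Collectors.joining(","));
          queries.add("SELECT id FROM example_segments WHERE id IN (" + inList + ")");  // hypothetical table
        }
        return queries;
      }
    }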
From 605bc5b32f25ae2a1b88af09a89417ac2450f87f Mon Sep 17 00:00:00 2001
From: xiangqiao
Date: Thu, 28 May 2020 13:21:54 +0800
Subject: [PATCH 4/5] optimize segmentExistsBatch: avoid too many elements in
 the IN clause

---
 .../IndexerSQLMetadataStorageCoordinator.java | 25 ++++++++++++-------
 1 file changed, 16 insertions(+), 9 deletions(-)

diff --git a/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java b/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java
index b5e4bafba237..1e929bbfbe08 100644
--- a/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java
+++ b/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java
@@ -27,6 +27,7 @@
 import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.Iterables;
 import com.google.common.collect.Iterators;
+import com.google.common.collect.Lists;
 import com.google.common.hash.Hashing;
 import com.google.common.io.BaseEncoding;
 import com.google.inject.Inject;
@@ -939,7 +940,7 @@ private Set<DataSegment> announceHistoricalSegmentBatch(
   {
     final Set<DataSegment> toInsertSegments = new HashSet<>();
     try {
-      List<String> existedSegments = segmentExistsBatch(handle, segments);
+      Set<String> existedSegments = segmentExistsBatch(handle, segments);
       for (DataSegment segment : segments) {
         if (existedSegments.contains(segment.getId().toString())) {
           log.info("Found [%s] in DB, not updating DB", segment.getId());
@@ -1008,15 +1009,21 @@ private Set<DataSegment> announceHistoricalSegmentBatch(
     return toInsertSegments;
   }
 
-  private List<String> segmentExistsBatch(final Handle handle, final Set<DataSegment> segments)
+  private Set<String> segmentExistsBatch(final Handle handle, final Set<DataSegment> segments)
   {
-    String segmentIds = segments.stream()
-                                .map(segment -> "'" + StringEscapeUtils.escapeSql(segment.getId().toString()) + "'")
-                                .collect(Collectors.joining(","));
-    return handle
-        .createQuery(StringUtils.format("SELECT id FROM %s WHERE id in (%s)", dbTables.getSegmentsTable(), segmentIds))
-        .map(String.class)
-        .list();
+    Set<String> existedSegments = new HashSet<>();
+
+    List<List<DataSegment>> segmentsLists = Lists.partition(new ArrayList<>(segments), ANNOUNCE_HISTORICAL_SEGMENG_BATCH);
+    for (List<DataSegment> segmentList : segmentsLists) {
+      String segmentIds = segmentList.stream()
+                                     .map(segment -> "'" + StringEscapeUtils.escapeSql(segment.getId().toString()) + "'")
+                                     .collect(Collectors.joining(","));
+      List<String> existIds = handle.createQuery(StringUtils.format("SELECT id FROM %s WHERE id in (%s)", dbTables.getSegmentsTable(), segmentIds))
+                                    .mapTo(String.class)
+                                    .list();
+      existedSegments.addAll(existIds);
+    }
+    return existedSegments;
   }
 
   /**

From 835f0d9977764ee9a22799de063de402befc8299 Mon Sep 17 00:00:00 2001
From: xiangqiao
Date: Fri, 3 Jul 2020 22:23:53 +0800
Subject: [PATCH 5/5] add unit test and address code review comments

---
 .../IndexerSQLMetadataStorageCoordinator.java | 80 +++++++++----------
 ...exerSQLMetadataStorageCoordinatorTest.java | 46 +++++++++++
 2 files changed, 84 insertions(+), 42 deletions(-)

diff --git a/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java b/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java
index 1e929bbfbe08..5bdf5d783b53 100644
--- a/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java
+++ b/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java
@@ -45,6 +45,7 @@
 import org.apache.druid.java.util.common.jackson.JacksonUtils;
 import org.apache.druid.java.util.common.lifecycle.LifecycleStart;
 import org.apache.druid.java.util.common.logger.Logger;
+import org.apache.druid.segment.SegmentUtils;
 import org.apache.druid.segment.realtime.appenderator.SegmentIdWithShardSpec;
 import org.apache.druid.timeline.DataSegment;
 import org.apache.druid.timeline.Partitions;
@@ -72,6 +73,7 @@
 import java.io.IOException;
 import java.sql.ResultSet;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashSet;
@@ -80,13 +82,14 @@
 import java.util.Set;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.stream.Collectors;
+import java.util.stream.IntStream;
 
 /**
  */
 public class IndexerSQLMetadataStorageCoordinator implements IndexerMetadataStorageCoordinator
 {
   private static final Logger log = new Logger(IndexerSQLMetadataStorageCoordinator.class);
-  private static final int ANNOUNCE_HISTORICAL_SEGMENG_BATCH = 100;
+  private static final int MAX_NUM_SEGMENTS_TO_ANNOUNCE_AT_ONCE = 100;
 
   private final ObjectMapper jsonMapper;
   private final MetadataStorageTablesConfig dbTables;
@@ -941,10 +944,9 @@ private Set<DataSegment> announceHistoricalSegmentBatch(
     final Set<DataSegment> toInsertSegments = new HashSet<>();
     try {
       Set<String> existedSegments = segmentExistsBatch(handle, segments);
+      log.info("Found these segments already exist in DB: %s", existedSegments);
       for (DataSegment segment : segments) {
-        if (existedSegments.contains(segment.getId().toString())) {
-          log.info("Found [%s] in DB, not updating DB", segment.getId());
-        } else {
+        if (!existedSegments.contains(segment.getId().toString())) {
           toInsertSegments.add(segment);
         }
       }
@@ -952,7 +954,10 @@ private Set<DataSegment> announceHistoricalSegmentBatch(
       // SELECT -> INSERT can fail due to races; callers must be prepared to retry.
       // Avoiding ON DUPLICATE KEY since it's not portable.
       // Avoiding try/catch since it may cause inadvertent transaction-splitting.
-      final List<DataSegment> segmentList = new ArrayList<>(toInsertSegments);
+      final List<List<DataSegment>> partitionedSegments = Lists.partition(
+          new ArrayList<>(toInsertSegments),
+          MAX_NUM_SEGMENTS_TO_ANNOUNCE_AT_ONCE
+      );
 
       PreparedBatch preparedBatch = handle.prepareBatch(
           StringUtils.format(
@@ -963,46 +968,37 @@ private Set<DataSegment> announceHistoricalSegmentBatch(
           )
       );
 
-      for (int i = 0; i < segmentList.size(); i++) {
-        DataSegment segment = segmentList.get(i);
-        preparedBatch.add()
-            .bind("id", segment.getId().toString())
-            .bind("dataSource", segment.getDataSource())
-            .bind("created_date", DateTimes.nowUtc().toString())
-            .bind("start", segment.getInterval().getStart().toString())
-            .bind("end", segment.getInterval().getEnd().toString())
-            .bind("partitioned", (segment.getShardSpec() instanceof NoneShardSpec) ? false : true)
-            .bind("version", segment.getVersion())
-            .bind("used", usedSegments.contains(segment))
-            .bind("payload", jsonMapper.writeValueAsBytes(segment));
-
-        if ((i + 1) % ANNOUNCE_HISTORICAL_SEGMENG_BATCH == 0 || i == segmentList.size() - 1) {
-          int[] affectedRows = preparedBatch.execute();
-          for (int j = 0; j < affectedRows.length; j++) {
-            DataSegment insertSegment = segmentList.get(i / ANNOUNCE_HISTORICAL_SEGMENG_BATCH * ANNOUNCE_HISTORICAL_SEGMENG_BATCH + j);
-            if (affectedRows[j] == 1) {
-              log.info(
-                  "Published segment [%s] to DB with used flag [%s], json[%s]",
-                  insertSegment.getId(),
-                  usedSegments.contains(insertSegment),
-                  jsonMapper.writeValueAsString(insertSegment)
-              );
-            } else {
-              throw new ISE(
-                  "Failed to publish segment[%s] to DB with used flag[%s], json[%s]",
-                  insertSegment.getId(),
-                  usedSegments.contains(insertSegment),
-                  jsonMapper.writeValueAsString(insertSegment)
-              );
-            }
-          }
-        }
+      for (List<DataSegment> partition : partitionedSegments) {
+        for (DataSegment segment : partition) {
+          preparedBatch.add()
+              .bind("id", segment.getId().toString())
+              .bind("dataSource", segment.getDataSource())
+              .bind("created_date", DateTimes.nowUtc().toString())
+              .bind("start", segment.getInterval().getStart().toString())
+              .bind("end", segment.getInterval().getEnd().toString())
+              .bind("partitioned", (segment.getShardSpec() instanceof NoneShardSpec) ? false : true)
+              .bind("version", segment.getVersion())
+              .bind("used", usedSegments.contains(segment))
+              .bind("payload", jsonMapper.writeValueAsBytes(segment));
+        }
+        final int[] affectedRows = preparedBatch.execute();
+        final boolean succeeded = Arrays.stream(affectedRows).allMatch(eachAffectedRows -> eachAffectedRows == 1);
+        if (succeeded) {
+          log.infoSegments(partition, "Published segments to DB");
+        } else {
+          final List<DataSegment> failedToPublish = IntStream.range(0, partition.size())
+                                                             .filter(i -> affectedRows[i] != 1)
+                                                             .mapToObj(partition::get)
+                                                             .collect(Collectors.toList());
+          throw new ISE(
+              "Failed to publish segments to DB: %s",
+              SegmentUtils.commaSeparatedIdentifiers(failedToPublish)
+          );
+        }
       }
     }
     catch (Exception e) {
-      for (DataSegment segment : segments) {
-        log.error(e, "Exception inserting segment [%s] with used flag [%s] into DB", segment.getId(), usedSegments.contains(segment));
-      }
+      log.errorSegments(segments, "Exception inserting segments");
       throw e;
     }
 
@@ -1013,7 +1009,7 @@ private Set<String> segmentExistsBatch(final Handle handle, final Set<DataSegment> segments)
   {
     Set<String> existedSegments = new HashSet<>();
 
-    List<List<DataSegment>> segmentsLists = Lists.partition(new ArrayList<>(segments), ANNOUNCE_HISTORICAL_SEGMENG_BATCH);
+    List<List<DataSegment>> segmentsLists = Lists.partition(new ArrayList<>(segments), MAX_NUM_SEGMENTS_TO_ANNOUNCE_AT_ONCE);
     for (List<DataSegment> segmentList : segmentsLists) {
       String segmentIds = segmentList.stream()
                                      .map(segment -> "'" + StringEscapeUtils.escapeSql(segment.getId().toString()) + "'")
                                      .collect(Collectors.joining(","));
diff --git a/server/src/test/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinatorTest.java b/server/src/test/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinatorTest.java
index 5eb34ffbe28d..4a8bf1f63736 100644
--- a/server/src/test/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinatorTest.java
+++ b/server/src/test/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinatorTest.java
@@ -60,6 +60,8 @@
 import java.nio.charset.StandardCharsets;
 import java.util.Collection;
 import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
 import java.util.concurrent.atomic.AtomicLong;
@@ -285,6 +287,50 @@ public void testSimpleAnnounce() throws IOException
     Assert.assertEquals(0, metadataUpdateCounter.get());
   }
 
+  @Test
+  public void testAnnounceHistoricalSegments() throws IOException
+  {
+    Set<DataSegment> segments = new HashSet<>();
+    for (int i = 0; i < 105; i++) {
+      segments.add(
+          new DataSegment(
+              "fooDataSource",
+              Intervals.of("2015-01-01T00Z/2015-01-02T00Z"),
+              "version",
+              ImmutableMap.of(),
+              ImmutableList.of("dim1"),
+              ImmutableList.of("m1"),
+              new LinearShardSpec(i),
+              9,
+              100
+          )
+      );
+    }
+
+    coordinator.announceHistoricalSegments(segments);
+    for (DataSegment segment : segments) {
+      Assert.assertArrayEquals(
+          mapper.writeValueAsString(segment).getBytes(StandardCharsets.UTF_8),
+          derbyConnector.lookup(
+              derbyConnectorRule.metadataTablesConfigSupplier().get().getSegmentsTable(),
+              "id",
+              "payload",
+              segment.getId().toString()
+          )
+      );
+    }
+
+    List<String> segmentIds = segments.stream().map(segment -> segment.getId().toString()).collect(Collectors.toList());
+    segmentIds.sort(Comparator.naturalOrder());
+    Assert.assertEquals(
+        segmentIds,
+        retrieveUsedSegmentIds()
+    );
+
+    // Should not update dataSource metadata.
+    Assert.assertEquals(0, metadataUpdateCounter.get());
+  }
+
   @Test
   public void testOvershadowingAnnounce() throws IOException
   {
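A closing note on the final shape of the publish loop: PreparedBatch.execute() returns one affected-row count per added statement, in insertion order, and PATCH 5 uses that array to report exactly which segments failed rather than discarding the per-row information. A standalone sketch of that bookkeeping, with a generic item type standing in for DataSegment:

    import java.util.List;
    import java.util.stream.Collectors;
    import java.util.stream.IntStream;

    public class BatchResultCheckSketch
    {
      // Returns the items whose INSERT did not report exactly one affected row;
      // affectedRows[i] corresponds to partition.get(i), in insertion order.
      public static <T> List<T> failedInserts(final List<T> partition, final int[] affectedRows)
      {
        return IntStream.range(0, partition.size())
                        .filter(i -> affectedRows[i] != 1)
                        .mapToObj(partition::get)
                        .collect(Collectors.toList());
      }
    }

An empty result means the whole partition was published; anything else is surfaced in the exception message, which is cheaper to scan than one log line per segment.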