From bbf80a4bb9b2e66b8f29987034dbebf7e2e982c2 Mon Sep 17 00:00:00 2001 From: zhaofaxian Date: Fri, 14 Sep 2018 20:25:26 +0800 Subject: [PATCH 1/3] 1. Mysql default transaction isolation is REPEATABLE_READ, treat it as READ_COMMITTED will reduce insert id conflict. 2. Add an index to 'dataSource used end' is work well for the most of scenarios(get recently segments), and it will speed up sync add pending segments in DB. 3. 'select and insert' is not need within transaction. --- .../common/actions/SegmentAllocateAction.java | 19 +++++++++++-------- .../IndexerSQLMetadataStorageCoordinator.java | 10 ++++------ .../druid/metadata/SQLMetadataConnector.java | 6 +++--- 3 files changed, 18 insertions(+), 17 deletions(-) diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentAllocateAction.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentAllocateAction.java index 26c7bb74f1d3..ea11ecc16ec2 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentAllocateAction.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentAllocateAction.java @@ -267,14 +267,17 @@ private SegmentIdentifier tryAllocate( } if (lockResult.isOk()) { - final SegmentIdentifier identifier = toolbox.getIndexerMetadataStorageCoordinator().allocatePendingSegment( - dataSource, - sequenceName, - previousSegmentId, - tryInterval, - lockResult.getTaskLock().getVersion(), - skipSegmentLineageCheck - ); + SegmentIdentifier identifier; + synchronized (lockResult.getTaskLock()) { + identifier = toolbox.getIndexerMetadataStorageCoordinator().allocatePendingSegment( + dataSource, + sequenceName, + previousSegmentId, + tryInterval, + lockResult.getTaskLock().getVersion(), + skipSegmentLineageCheck + ); + } if (identifier != null) { return identifier; } else { diff --git a/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java b/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java index d021e4b1f26b..ccd23733513e 100644 --- a/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java +++ b/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java @@ -387,11 +387,11 @@ public SegmentIdentifier allocatePendingSegment( Preconditions.checkNotNull(interval, "interval"); Preconditions.checkNotNull(maxVersion, "maxVersion"); - return connector.retryTransaction( - new TransactionCallback() + return connector.retryWithHandle( + new HandleCallback() { @Override - public SegmentIdentifier inTransaction(Handle handle, TransactionStatus transactionStatus) throws Exception + public SegmentIdentifier withHandle(Handle handle) throws Exception { return skipSegmentLineageCheck ? allocatePendingSegment(handle, dataSource, sequenceName, interval, maxVersion) : @@ -404,9 +404,7 @@ public SegmentIdentifier inTransaction(Handle handle, TransactionStatus transact maxVersion ); } - }, - ALLOCATE_SEGMENT_QUIET_TRIES, - SQLMetadataConnector.DEFAULT_MAX_TRIES + } ); } diff --git a/server/src/main/java/org/apache/druid/metadata/SQLMetadataConnector.java b/server/src/main/java/org/apache/druid/metadata/SQLMetadataConnector.java index 8c8528142e59..7c7312b3cbdb 100644 --- a/server/src/main/java/org/apache/druid/metadata/SQLMetadataConnector.java +++ b/server/src/main/java/org/apache/druid/metadata/SQLMetadataConnector.java @@ -32,6 +32,7 @@ import org.skife.jdbi.v2.DBI; import org.skife.jdbi.v2.Handle; import org.skife.jdbi.v2.TransactionCallback; +import org.skife.jdbi.v2.TransactionIsolationLevel; import org.skife.jdbi.v2.TransactionStatus; import org.skife.jdbi.v2.exceptions.DBIException; import org.skife.jdbi.v2.exceptions.UnableToExecuteStatementException; @@ -142,7 +143,7 @@ public T retryWithHandle(final HandleCallback callback) public T retryTransaction(final TransactionCallback callback, final int quietTries, final int maxTries) { try { - return RetryUtils.retry(() -> getDBI().inTransaction(callback), shouldRetry, quietTries, maxTries); + return RetryUtils.retry(() -> getDBI().inTransaction(TransactionIsolationLevel.READ_COMMITTED, callback), shouldRetry, quietTries, maxTries); } catch (Exception e) { throw Throwables.propagate(e); @@ -259,8 +260,7 @@ public void createSegmentTable(final String tableName) + ")", tableName, getPayloadType(), getQuoteString() ), - StringUtils.format("CREATE INDEX idx_%1$s_datasource ON %1$s(dataSource)", tableName), - StringUtils.format("CREATE INDEX idx_%1$s_used ON %1$s(used)", tableName) + StringUtils.format("CREATE INDEX idx_%1$s_datasource_used_end ON %1$s(dataSource, used, %2$send%2$s)", tableName, getQuoteString()) ) ); } From aa716ca587928f1c1050aafc8e8ece9cfccb50a6 Mon Sep 17 00:00:00 2001 From: zhaofaxian Date: Mon, 8 Oct 2018 17:04:44 +0800 Subject: [PATCH 2/3] Use TaskLockbox.doInCriticalSection instead of synchronized syntax to speed up insert pending segments. --- .../common/actions/SegmentAllocateAction.java | 32 +++++++++++++------ 1 file changed, 23 insertions(+), 9 deletions(-) diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentAllocateAction.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentAllocateAction.java index ea11ecc16ec2..f566499281b9 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentAllocateAction.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentAllocateAction.java @@ -23,9 +23,11 @@ import com.fasterxml.jackson.core.type.TypeReference; import com.google.common.base.Preconditions; import com.google.common.base.Throwables; +import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableSet; import org.apache.druid.indexing.common.TaskLockType; import org.apache.druid.indexing.common.task.Task; +import org.apache.druid.indexing.overlord.CriticalAction; import org.apache.druid.indexing.overlord.IndexerMetadataStorageCoordinator; import org.apache.druid.indexing.overlord.LockResult; import org.apache.druid.java.util.common.IAE; @@ -267,17 +269,29 @@ private SegmentIdentifier tryAllocate( } if (lockResult.isOk()) { - SegmentIdentifier identifier; - synchronized (lockResult.getTaskLock()) { - identifier = toolbox.getIndexerMetadataStorageCoordinator().allocatePendingSegment( - dataSource, - sequenceName, - previousSegmentId, - tryInterval, - lockResult.getTaskLock().getVersion(), - skipSegmentLineageCheck + final SegmentIdentifier identifier; + try { + identifier = toolbox.getTaskLockbox().doInCriticalSection( + task, + ImmutableList.of(tryInterval), + CriticalAction.builder() + .onValidLocks( + () -> toolbox.getIndexerMetadataStorageCoordinator().allocatePendingSegment( + dataSource, + sequenceName, + previousSegmentId, + tryInterval, + lockResult.getTaskLock().getVersion(), + skipSegmentLineageCheck + ) + ).onInvalidLocks(null) + .build() ); } + catch (Exception e) { + throw new RuntimeException(e); + } + if (identifier != null) { return identifier; } else { From 8acec1c006eec180b340ecfc90e3e8b893f0d4ff Mon Sep 17 00:00:00 2001 From: zhaofaxian Date: Mon, 22 Oct 2018 11:34:37 +0800 Subject: [PATCH 3/3] fix typo for NullPointerException --- .../druid/indexing/common/actions/SegmentAllocateAction.java | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentAllocateAction.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentAllocateAction.java index f566499281b9..98ad25d74191 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentAllocateAction.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentAllocateAction.java @@ -284,7 +284,9 @@ private SegmentIdentifier tryAllocate( lockResult.getTaskLock().getVersion(), skipSegmentLineageCheck ) - ).onInvalidLocks(null) + ).onInvalidLocks( + () -> null + ) .build() ); }