diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/TaskLock.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/TaskLock.java index 865b053b95c4..9c3ecc210e00 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/TaskLock.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/TaskLock.java @@ -23,6 +23,7 @@ import com.fasterxml.jackson.annotation.JsonProperty; import com.google.common.base.Objects; import com.google.common.base.Preconditions; +import org.apache.druid.indexing.overlord.SegmentLock; import org.joda.time.Interval; import javax.annotation.Nullable; @@ -30,7 +31,7 @@ /** * Represents a lock held by some task. Immutable. */ -public class TaskLock +public class TaskLock implements SegmentLock { private final TaskLockType type; private final String groupId; diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentAllocateAction.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentAllocateAction.java index 26c7bb74f1d3..872b117df7b7 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentAllocateAction.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentAllocateAction.java @@ -273,7 +273,8 @@ private SegmentIdentifier tryAllocate( previousSegmentId, tryInterval, lockResult.getTaskLock().getVersion(), - skipSegmentLineageCheck + skipSegmentLineageCheck, + lockResult.getTaskLock() ); if (identifier != null) { return identifier; diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLockBoxConcurrencyTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLockBoxConcurrencyTest.java index 770a27922210..7ec0252a338a 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLockBoxConcurrencyTest.java +++ 
b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLockBoxConcurrencyTest.java @@ -31,7 +31,12 @@ import org.apache.druid.java.util.common.Intervals; import org.apache.druid.metadata.DerbyMetadataStorageActionHandlerFactory; import org.apache.druid.metadata.EntryExistsException; +import org.apache.druid.metadata.IndexerSQLMetadataStorageCoordinator; import org.apache.druid.metadata.TestDerbyConnector; +import org.apache.druid.segment.TestHelper; +import org.apache.druid.segment.realtime.appenderator.SegmentIdentifier; +import org.apache.druid.timeline.partition.LinearShardSpec; +import org.apache.druid.timeline.partition.NumberedShardSpec; import org.joda.time.Interval; import org.junit.After; import org.junit.Assert; @@ -39,6 +44,7 @@ import org.junit.Rule; import org.junit.Test; +import java.util.ArrayList; import java.util.Collections; import java.util.List; import java.util.concurrent.CountDownLatch; @@ -46,6 +52,8 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; +import java.util.concurrent.ThreadLocalRandom; +import java.util.concurrent.atomic.AtomicLong; public class TaskLockBoxConcurrencyTest { @@ -56,6 +64,7 @@ public class TaskLockBoxConcurrencyTest private ExecutorService service; private TaskStorage taskStorage; private TaskLockbox lockbox; + private IndexerSQLMetadataStorageCoordinator coordinator; @Before public void setup() @@ -74,6 +83,21 @@ public void setup() lockbox = new TaskLockbox(taskStorage); service = Executors.newFixedThreadPool(2); + + final ObjectMapper mapper = TestHelper.makeJsonMapper(); + final AtomicLong metadataUpdateCounter = new AtomicLong(); + + mapper.registerSubtypes(LinearShardSpec.class, NumberedShardSpec.class); + derbyConnector.createDataSourceTable(); + derbyConnector.createTaskTables(); + derbyConnector.createSegmentTable(); + derbyConnector.createPendingSegmentsTable(); + metadataUpdateCounter.set(0); + coordinator = new 
IndexerSQLMetadataStorageCoordinator( + mapper, + derby.metadataTablesConfigSupplier().get(), + derbyConnector + ); } @After @@ -230,4 +254,39 @@ public void testDoInCriticalSectionWithOverlappedIntervals() throws Exception Assert.assertEquals(1, future1.get().intValue()); Assert.assertEquals(2, future2.get().intValue()); } + + @Test(timeout = 60_000L) + public void testAllocatePendingSegment() throws Exception + { + final String dataSource = "myDataSource"; + final Interval interval = Intervals.of("2017-01-01/2017-01-02"); + final Task task = NoopTask.create(dataSource); + lockbox.add(task); + final LockResult result = lockbox.tryLock(TaskLockType.EXCLUSIVE, task, interval); + Assert.assertFalse(result.isRevoked()); + Assert.assertTrue(result.isOk()); + + List<Future<SegmentIdentifier>> list = new ArrayList<>(); + final CountDownLatch latch = new CountDownLatch(1); + + for (int i = 0; i < 100; i++) { + list.add(service.submit(() -> { + latch.await(); + String sequenceName = "sequenceName" + ThreadLocalRandom.current().nextLong(10000); + return coordinator.allocatePendingSegment( + dataSource, + sequenceName, + null, + interval, + result.getTaskLock().getVersion(), + true, + result.getTaskLock()); + })); + } + + latch.countDown(); + for (Future<SegmentIdentifier> future : list) { + future.get(); + } + } } diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/test/TestIndexerMetadataStorageCoordinator.java b/indexing-service/src/test/java/org/apache/druid/indexing/test/TestIndexerMetadataStorageCoordinator.java index 381170742ac0..eb2b6ffc509d 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/test/TestIndexerMetadataStorageCoordinator.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/test/TestIndexerMetadataStorageCoordinator.java @@ -25,6 +25,7 @@ import com.google.common.collect.Sets; import org.apache.druid.indexing.overlord.DataSourceMetadata; import org.apache.druid.indexing.overlord.IndexerMetadataStorageCoordinator; +import 
org.apache.druid.indexing.overlord.SegmentLock; import org.apache.druid.indexing.overlord.SegmentPublishResult; import org.apache.druid.java.util.common.Pair; import org.apache.druid.segment.realtime.appenderator.SegmentIdentifier; @@ -70,7 +71,7 @@ public boolean insertDataSourceMetadata(String dataSource, DataSourceMetadata da { return false; } - + @Override public List<DataSegment> getUsedSegmentsForInterval(String dataSource, Interval interval) { @@ -82,7 +83,7 @@ public List<Pair<DataSegment, String>> getUsedSegmentAndCreatedDateForInterval(S { return ImmutableList.of(); } - + @Override public List<DataSegment> getUsedSegmentsForIntervals( String dataSource, List<Interval> intervals @@ -129,7 +130,8 @@ public SegmentIdentifier allocatePendingSegment( String previousSegmentId, Interval interval, String maxVersion, - boolean skipSegmentLineageCheck + boolean skipSegmentLineageCheck, + SegmentLock segmentLock ) { throw new UnsupportedOperationException(); diff --git a/server/src/main/java/org/apache/druid/indexing/overlord/IndexerMetadataStorageCoordinator.java b/server/src/main/java/org/apache/druid/indexing/overlord/IndexerMetadataStorageCoordinator.java index c58f2749c508..9e3246e5b7c8 100644 --- a/server/src/main/java/org/apache/druid/indexing/overlord/IndexerMetadataStorageCoordinator.java +++ b/server/src/main/java/org/apache/druid/indexing/overlord/IndexerMetadataStorageCoordinator.java @@ -106,7 +106,8 @@ SegmentIdentifier allocatePendingSegment( String previousSegmentId, Interval interval, String maxVersion, - boolean skipSegmentLineageCheck + boolean skipSegmentLineageCheck, + SegmentLock segmentLock ); /** diff --git a/server/src/main/java/org/apache/druid/indexing/overlord/SegmentLock.java b/server/src/main/java/org/apache/druid/indexing/overlord/SegmentLock.java new file mode 100644 index 000000000000..acfc798b9adb --- /dev/null +++ b/server/src/main/java/org/apache/druid/indexing/overlord/SegmentLock.java @@ -0,0 +1,29 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more 
contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.indexing.overlord; + +/** + * Represents a lock held while allocating pending segments. + * It is used to avoid pending segment id conflicts when inserting new pending segments into the metadata store. + */ +public interface SegmentLock +{ + +} diff --git a/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java b/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java index d021e4b1f26b..727f5d10a1ea 100644 --- a/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java +++ b/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java @@ -35,6 +35,7 @@ import com.google.inject.Inject; import org.apache.druid.indexing.overlord.DataSourceMetadata; import org.apache.druid.indexing.overlord.IndexerMetadataStorageCoordinator; +import org.apache.druid.indexing.overlord.SegmentLock; import org.apache.druid.indexing.overlord.SegmentPublishResult; import org.apache.druid.java.util.common.DateTimes; import org.apache.druid.java.util.common.IAE; @@ -379,13 +380,15 @@ public SegmentIdentifier allocatePendingSegment( @Nullable final String previousSegmentId, final Interval interval, final String maxVersion, - final boolean 
skipSegmentLineageCheck, + final SegmentLock segmentLock ) { Preconditions.checkNotNull(dataSource, "dataSource"); Preconditions.checkNotNull(sequenceName, "sequenceName"); Preconditions.checkNotNull(interval, "interval"); Preconditions.checkNotNull(maxVersion, "maxVersion"); + Preconditions.checkNotNull(segmentLock, "segmentLock"); return connector.retryTransaction( new TransactionCallback() @@ -394,15 +397,16 @@ public SegmentIdentifier allocatePendingSegment( public SegmentIdentifier inTransaction(Handle handle, TransactionStatus transactionStatus) throws Exception { return skipSegmentLineageCheck ? - allocatePendingSegment(handle, dataSource, sequenceName, interval, maxVersion) : - allocatePendingSegmentWithSegmentLineageCheck( - handle, - dataSource, - sequenceName, - previousSegmentId, - interval, - maxVersion - ); + allocatePendingSegment(handle, dataSource, sequenceName, interval, maxVersion, segmentLock) : + allocatePendingSegmentWithSegmentLineageCheck( + handle, + dataSource, + sequenceName, + previousSegmentId, + interval, + maxVersion, + segmentLock + ); } }, ALLOCATE_SEGMENT_QUIET_TRIES, @@ -417,7 +421,8 @@ private SegmentIdentifier allocatePendingSegmentWithSegmentLineageCheck( final String sequenceName, @Nullable final String previousSegmentId, final Interval interval, - final String maxVersion + final String maxVersion, + final SegmentLock segmentLock ) throws IOException { final String previousSegmentIdNotNull = previousSegmentId == null ? "" : previousSegmentId; @@ -444,15 +449,6 @@ private SegmentIdentifier allocatePendingSegmentWithSegmentLineageCheck( return result.segmentIdentifier; } - final SegmentIdentifier newIdentifier = createNewSegment(handle, dataSource, interval, maxVersion); - if (newIdentifier == null) { - return null; - } - - // SELECT -> INSERT can fail due to races; callers must be prepared to retry. - // Avoiding ON DUPLICATE KEY since it's not portable. 
- // Avoiding try/catch since it may cause inadvertent transaction-splitting. - // UNIQUE key for the row, ensuring sequences do not fork in two directions. // Using a single column instead of (sequence_name, sequence_prev_id) as some MySQL storage engines // have difficulty with large unique keys (see https://github.com/apache/incubator-druid/issues/2319) @@ -466,15 +462,23 @@ private SegmentIdentifier allocatePendingSegmentWithSegmentLineageCheck( .asBytes() ); - insertToMetastore( - handle, - newIdentifier, - dataSource, - interval, - previousSegmentIdNotNull, - sequenceName, - sequenceNamePrevIdSha1 - ); + final SegmentIdentifier newIdentifier; + synchronized (segmentLock) { + newIdentifier = createNewSegment(handle, dataSource, interval, maxVersion); + if (newIdentifier == null) { + return null; + } + + insertToMetastore( + handle, + newIdentifier, + dataSource, + interval, + previousSegmentIdNotNull, + sequenceName, + sequenceNamePrevIdSha1 + ); + } return newIdentifier; } @@ -484,7 +488,8 @@ private SegmentIdentifier allocatePendingSegment( final String dataSource, final String sequenceName, final Interval interval, - final String maxVersion + final String maxVersion, + final SegmentLock segmentLock ) throws IOException { final CheckExistingSegmentIdResult result = checkAndGetExistingSegmentId( @@ -513,15 +518,6 @@ private SegmentIdentifier allocatePendingSegment( return result.segmentIdentifier; } - final SegmentIdentifier newIdentifier = createNewSegment(handle, dataSource, interval, maxVersion); - if (newIdentifier == null) { - return null; - } - - // SELECT -> INSERT can fail due to races; callers must be prepared to retry. - // Avoiding ON DUPLICATE KEY since it's not portable. - // Avoiding try/catch since it may cause inadvertent transaction-splitting. - // UNIQUE key for the row, ensuring we don't have more than one segment per sequence per interval. 
// Using a single column instead of (sequence_name, sequence_prev_id) as some MySQL storage engines // have difficulty with large unique keys (see https://github.com/apache/incubator-druid/issues/2319) @@ -536,9 +532,16 @@ private SegmentIdentifier allocatePendingSegment( .asBytes() ); - // always insert empty previous sequence id - insertToMetastore(handle, newIdentifier, dataSource, interval, "", sequenceName, sequenceNamePrevIdSha1); + final SegmentIdentifier newIdentifier; + synchronized (segmentLock) { + newIdentifier = createNewSegment(handle, dataSource, interval, maxVersion); + if (newIdentifier == null) { + return null; + } + // always insert empty previous sequence id + insertToMetastore(handle, newIdentifier, dataSource, interval, "", sequenceName, sequenceNamePrevIdSha1); + } log.info( "Allocated pending segment [%s] for sequence[%s] in DB", newIdentifier.getIdentifierAsString(), diff --git a/server/src/test/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinatorTest.java b/server/src/test/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinatorTest.java index f13105ebceab..314cf8123803 100644 --- a/server/src/test/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinatorTest.java +++ b/server/src/test/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinatorTest.java @@ -25,6 +25,7 @@ import com.google.common.collect.ImmutableSet; import org.apache.druid.indexing.overlord.DataSourceMetadata; import org.apache.druid.indexing.overlord.ObjectMetadata; +import org.apache.druid.indexing.overlord.SegmentLock; import org.apache.druid.indexing.overlord.SegmentPublishResult; import org.apache.druid.java.util.common.DateTimes; import org.apache.druid.java.util.common.Intervals; @@ -841,13 +842,15 @@ public void testAllocatePendingSegment() { final String dataSource = "ds"; final Interval interval = Intervals.of("2017-01-01/2017-02-01"); + final SegmentLock segmentLock = new SegmentLock() {}; final SegmentIdentifier 
identifier = coordinator.allocatePendingSegment( dataSource, "seq", null, interval, "version", - false + false, + segmentLock ); Assert.assertEquals("ds_2017-01-01T00:00:00.000Z_2017-02-01T00:00:00.000Z_version", identifier.toString()); @@ -858,7 +861,8 @@ public void testAllocatePendingSegment() identifier.toString(), interval, identifier.getVersion(), - false + false, + segmentLock ); Assert.assertEquals("ds_2017-01-01T00:00:00.000Z_2017-02-01T00:00:00.000Z_version_1", identifier1.toString()); @@ -869,7 +873,8 @@ public void testAllocatePendingSegment() identifier1.toString(), interval, identifier1.getVersion(), - false + false, + segmentLock ); Assert.assertEquals("ds_2017-01-01T00:00:00.000Z_2017-02-01T00:00:00.000Z_version_2", identifier2.toString()); @@ -880,7 +885,8 @@ public void testAllocatePendingSegment() identifier1.toString(), interval, identifier1.getVersion(), - false + false, + segmentLock ); Assert.assertEquals("ds_2017-01-01T00:00:00.000Z_2017-02-01T00:00:00.000Z_version_2", identifier3.toString()); @@ -892,7 +898,8 @@ public void testAllocatePendingSegment() null, interval, "version", - false + false, + segmentLock ); Assert.assertEquals("ds_2017-01-01T00:00:00.000Z_2017-02-01T00:00:00.000Z_version_3", identifier4.toString()); @@ -904,6 +911,7 @@ public void testDeletePendingSegment() throws InterruptedException final String dataSource = "ds"; final Interval interval = Intervals.of("2017-01-01/2017-02-01"); String prevSegmentId = null; + final SegmentLock segmentLock = new SegmentLock() {}; final DateTime begin = DateTimes.nowUtc(); @@ -914,7 +922,8 @@ public void testDeletePendingSegment() throws InterruptedException prevSegmentId, interval, "version", - false + false, + segmentLock ); prevSegmentId = identifier.toString(); } @@ -928,7 +937,8 @@ public void testDeletePendingSegment() throws InterruptedException prevSegmentId, interval, "version", - false + false, + segmentLock ); prevSegmentId = identifier.toString(); }