Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -23,14 +23,15 @@
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Objects;
import com.google.common.base.Preconditions;
import org.apache.druid.indexing.overlord.SegmentLock;
import org.joda.time.Interval;

import javax.annotation.Nullable;

/**
* Represents a lock held by some task. Immutable.
*/
public class TaskLock
public class TaskLock implements SegmentLock
{
private final TaskLockType type;
private final String groupId;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -273,7 +273,8 @@ private SegmentIdentifier tryAllocate(
previousSegmentId,
tryInterval,
lockResult.getTaskLock().getVersion(),
skipSegmentLineageCheck
skipSegmentLineageCheck,
lockResult.getTaskLock()
);
if (identifier != null) {
return identifier;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,21 +31,29 @@
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.metadata.DerbyMetadataStorageActionHandlerFactory;
import org.apache.druid.metadata.EntryExistsException;
import org.apache.druid.metadata.IndexerSQLMetadataStorageCoordinator;
import org.apache.druid.metadata.TestDerbyConnector;
import org.apache.druid.segment.TestHelper;
import org.apache.druid.segment.realtime.appenderator.SegmentIdentifier;
import org.apache.druid.timeline.partition.LinearShardSpec;
import org.apache.druid.timeline.partition.NumberedShardSpec;
import org.joda.time.Interval;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicLong;

public class TaskLockBoxConcurrencyTest
{
Expand All @@ -56,6 +64,7 @@ public class TaskLockBoxConcurrencyTest
private ExecutorService service;
private TaskStorage taskStorage;
private TaskLockbox lockbox;
private IndexerSQLMetadataStorageCoordinator coordinator;

@Before
public void setup()
Expand All @@ -74,6 +83,21 @@ public void setup()

lockbox = new TaskLockbox(taskStorage);
service = Executors.newFixedThreadPool(2);

final ObjectMapper mapper = TestHelper.makeJsonMapper();
final AtomicLong metadataUpdateCounter = new AtomicLong();

mapper.registerSubtypes(LinearShardSpec.class, NumberedShardSpec.class);
derbyConnector.createDataSourceTable();
derbyConnector.createTaskTables();
derbyConnector.createSegmentTable();
derbyConnector.createPendingSegmentsTable();
metadataUpdateCounter.set(0);
coordinator = new IndexerSQLMetadataStorageCoordinator(
mapper,
derby.metadataTablesConfigSupplier().get(),
derbyConnector
);
}

@After
Expand Down Expand Up @@ -230,4 +254,39 @@ public void testDoInCriticalSectionWithOverlappedIntervals() throws Exception
Assert.assertEquals(1, future1.get().intValue());
Assert.assertEquals(2, future2.get().intValue());
}

@Test(timeout = 60_000L)
public void testAllocatePendingSegment() throws Exception
{
final String dataSource = "myDataSource";
final Interval interval = Intervals.of("2017-01-01/2017-01-02");
final Task task = NoopTask.create(dataSource);
lockbox.add(task);
final LockResult result = lockbox.tryLock(TaskLockType.EXCLUSIVE, task, interval);
Assert.assertFalse(result.isRevoked());
Assert.assertTrue(result.isOk());

List<Future<SegmentIdentifier>> list = new ArrayList<>();
final CountDownLatch latch = new CountDownLatch(1);

for (int i = 0; i < 100; i++) {
list.add(service.submit(() -> {
latch.await();
String sequnceName = "sequnceName" + ThreadLocalRandom.current().nextLong(10000);
return coordinator.allocatePendingSegment(
dataSource,
sequnceName,
null,
interval,
result.getTaskLock().getVersion(),
true,
result.getTaskLock());
}));
}

latch.countDown();
for (Future<SegmentIdentifier> future : list) {
future.get();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import com.google.common.collect.Sets;
import org.apache.druid.indexing.overlord.DataSourceMetadata;
import org.apache.druid.indexing.overlord.IndexerMetadataStorageCoordinator;
import org.apache.druid.indexing.overlord.SegmentLock;
import org.apache.druid.indexing.overlord.SegmentPublishResult;
import org.apache.druid.java.util.common.Pair;
import org.apache.druid.segment.realtime.appenderator.SegmentIdentifier;
Expand Down Expand Up @@ -70,7 +71,7 @@ public boolean insertDataSourceMetadata(String dataSource, DataSourceMetadata da
{
return false;
}

@Override
public List<DataSegment> getUsedSegmentsForInterval(String dataSource, Interval interval)
{
Expand All @@ -82,7 +83,7 @@ public List<Pair<DataSegment, String>> getUsedSegmentAndCreatedDateForInterval(S
{
return ImmutableList.of();
}

@Override
public List<DataSegment> getUsedSegmentsForIntervals(
String dataSource, List<Interval> intervals
Expand Down Expand Up @@ -129,7 +130,8 @@ public SegmentIdentifier allocatePendingSegment(
String previousSegmentId,
Interval interval,
String maxVersion,
boolean skipSegmentLineageCheck
boolean skipSegmentLineageCheck,
SegmentLock segmentLock
)
{
throw new UnsupportedOperationException();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,8 @@ SegmentIdentifier allocatePendingSegment(
String previousSegmentId,
Interval interval,
String maxVersion,
boolean skipSegmentLineageCheck
boolean skipSegmentLineageCheck,
SegmentLock segmentLock
);

/**
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.druid.indexing.overlord;

/**
 * Marker interface implemented by a task's lock (e.g. {@code TaskLock}) so that the
 * metadata storage coordinator can synchronize on the lock instance while allocating
 * pending segments. Serializing the SELECT-then-INSERT of a new pending segment under
 * the same lock prevents concurrent callers holding that lock from generating
 * conflicting pending segment ids when inserting into the metadata store.
 */
public interface SegmentLock
{

}
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import com.google.inject.Inject;
import org.apache.druid.indexing.overlord.DataSourceMetadata;
import org.apache.druid.indexing.overlord.IndexerMetadataStorageCoordinator;
import org.apache.druid.indexing.overlord.SegmentLock;
import org.apache.druid.indexing.overlord.SegmentPublishResult;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.IAE;
Expand Down Expand Up @@ -379,13 +380,15 @@ public SegmentIdentifier allocatePendingSegment(
@Nullable final String previousSegmentId,
final Interval interval,
final String maxVersion,
final boolean skipSegmentLineageCheck
final boolean skipSegmentLineageCheck,
final SegmentLock segmentLock
)
{
Preconditions.checkNotNull(dataSource, "dataSource");
Preconditions.checkNotNull(sequenceName, "sequenceName");
Preconditions.checkNotNull(interval, "interval");
Preconditions.checkNotNull(maxVersion, "maxVersion");
Preconditions.checkNotNull(segmentLock, "segmentLock");

return connector.retryTransaction(
new TransactionCallback<SegmentIdentifier>()
Expand All @@ -394,15 +397,16 @@ public SegmentIdentifier allocatePendingSegment(
public SegmentIdentifier inTransaction(Handle handle, TransactionStatus transactionStatus) throws Exception
{
return skipSegmentLineageCheck ?
allocatePendingSegment(handle, dataSource, sequenceName, interval, maxVersion) :
allocatePendingSegmentWithSegmentLineageCheck(
handle,
dataSource,
sequenceName,
previousSegmentId,
interval,
maxVersion
);
allocatePendingSegment(handle, dataSource, sequenceName, interval, maxVersion, segmentLock) :
allocatePendingSegmentWithSegmentLineageCheck(
handle,
dataSource,
sequenceName,
previousSegmentId,
interval,
maxVersion,
segmentLock
);
}
},
ALLOCATE_SEGMENT_QUIET_TRIES,
Expand All @@ -417,7 +421,8 @@ private SegmentIdentifier allocatePendingSegmentWithSegmentLineageCheck(
final String sequenceName,
@Nullable final String previousSegmentId,
final Interval interval,
final String maxVersion
final String maxVersion,
final SegmentLock segmentLock
) throws IOException
{
final String previousSegmentIdNotNull = previousSegmentId == null ? "" : previousSegmentId;
Expand All @@ -444,15 +449,6 @@ private SegmentIdentifier allocatePendingSegmentWithSegmentLineageCheck(
return result.segmentIdentifier;
}

final SegmentIdentifier newIdentifier = createNewSegment(handle, dataSource, interval, maxVersion);
if (newIdentifier == null) {
return null;
}

// SELECT -> INSERT can fail due to races; callers must be prepared to retry.
// Avoiding ON DUPLICATE KEY since it's not portable.
// Avoiding try/catch since it may cause inadvertent transaction-splitting.

// UNIQUE key for the row, ensuring sequences do not fork in two directions.
// Using a single column instead of (sequence_name, sequence_prev_id) as some MySQL storage engines
// have difficulty with large unique keys (see https://github.com/apache/incubator-druid/issues/2319)
Expand All @@ -466,15 +462,23 @@ private SegmentIdentifier allocatePendingSegmentWithSegmentLineageCheck(
.asBytes()
);

insertToMetastore(
handle,
newIdentifier,
dataSource,
interval,
previousSegmentIdNotNull,
sequenceName,
sequenceNamePrevIdSha1
);
final SegmentIdentifier newIdentifier;
synchronized (segmentLock) {
newIdentifier = createNewSegment(handle, dataSource, interval, maxVersion);
if (newIdentifier == null) {
return null;
}

insertToMetastore(
handle,
newIdentifier,
dataSource,
interval,
previousSegmentIdNotNull,
sequenceName,
sequenceNamePrevIdSha1
);
}
return newIdentifier;
}

Expand All @@ -484,7 +488,8 @@ private SegmentIdentifier allocatePendingSegment(
final String dataSource,
final String sequenceName,
final Interval interval,
final String maxVersion
final String maxVersion,
final SegmentLock segmentLock
) throws IOException
{
final CheckExistingSegmentIdResult result = checkAndGetExistingSegmentId(
Expand Down Expand Up @@ -513,15 +518,6 @@ private SegmentIdentifier allocatePendingSegment(
return result.segmentIdentifier;
}

final SegmentIdentifier newIdentifier = createNewSegment(handle, dataSource, interval, maxVersion);
if (newIdentifier == null) {
return null;
}

// SELECT -> INSERT can fail due to races; callers must be prepared to retry.
// Avoiding ON DUPLICATE KEY since it's not portable.
// Avoiding try/catch since it may cause inadvertent transaction-splitting.

// UNIQUE key for the row, ensuring we don't have more than one segment per sequence per interval.
// Using a single column instead of (sequence_name, sequence_prev_id) as some MySQL storage engines
// have difficulty with large unique keys (see https://github.com/apache/incubator-druid/issues/2319)
Expand All @@ -536,9 +532,16 @@ private SegmentIdentifier allocatePendingSegment(
.asBytes()
);

// always insert empty previous sequence id
insertToMetastore(handle, newIdentifier, dataSource, interval, "", sequenceName, sequenceNamePrevIdSha1);
final SegmentIdentifier newIdentifier;
synchronized (segmentLock) {
newIdentifier = createNewSegment(handle, dataSource, interval, maxVersion);
if (newIdentifier == null) {
return null;
}

// always insert empty previous sequence id
insertToMetastore(handle, newIdentifier, dataSource, interval, "", sequenceName, sequenceNamePrevIdSha1);
}
log.info(
"Allocated pending segment [%s] for sequence[%s] in DB",
newIdentifier.getIdentifierAsString(),
Expand Down
Loading