Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import org.apache.druid.indexing.overlord.SegmentPublishResult;
import org.apache.druid.metadata.ReplaceTaskLock;
import org.apache.druid.segment.SegmentUtils;
import org.apache.druid.segment.realtime.appenderator.SegmentIdWithShardSpec;
import org.apache.druid.timeline.DataSegment;

import javax.annotation.Nullable;
Expand All @@ -52,30 +53,36 @@ public class SegmentTransactionalAppendAction implements TaskAction<SegmentPubli
@Nullable
private final DataSourceMetadata endMetadata;

@Nullable
private final Map<String, Set<SegmentIdWithShardSpec>> segmentToUpgradedVersions;

public static SegmentTransactionalAppendAction forSegments(Set<DataSegment> segments)
{
return new SegmentTransactionalAppendAction(segments, null, null);
return new SegmentTransactionalAppendAction(segments, null, null, null);
}

public static SegmentTransactionalAppendAction forSegmentsAndMetadata(
Set<DataSegment> segments,
DataSourceMetadata startMetadata,
DataSourceMetadata endMetadata
DataSourceMetadata endMetadata,
Map<String, Set<SegmentIdWithShardSpec>> segmentToUpgradedVersions
)
{
return new SegmentTransactionalAppendAction(segments, startMetadata, endMetadata);
return new SegmentTransactionalAppendAction(segments, startMetadata, endMetadata, segmentToUpgradedVersions);
}

@JsonCreator
private SegmentTransactionalAppendAction(
@JsonProperty("segments") Set<DataSegment> segments,
@JsonProperty("startMetadata") @Nullable DataSourceMetadata startMetadata,
@JsonProperty("endMetadata") @Nullable DataSourceMetadata endMetadata
@JsonProperty("endMetadata") @Nullable DataSourceMetadata endMetadata,
@JsonProperty("segmentToUpgradedVersions") @Nullable Map<String, Set<SegmentIdWithShardSpec>> segmentToUpgradedVersions
)
{
this.segments = segments;
this.startMetadata = startMetadata;
this.endMetadata = endMetadata;
this.segmentToUpgradedVersions = segmentToUpgradedVersions;

if ((startMetadata == null && endMetadata != null)
|| (startMetadata != null && endMetadata == null)) {
Expand Down Expand Up @@ -103,6 +110,13 @@ public DataSourceMetadata getEndMetadata()
return endMetadata;
}

/**
 * Returns the mapping from the id of a committed segment (as a String) to the
 * set of ids of its upgraded versions, or null if no segments were upgraded.
 * Serialized into the task action payload as {@code "segmentToUpgradedVersions"}.
 */
@JsonProperty
@Nullable
public Map<String, Set<SegmentIdWithShardSpec>> getSegmentToUpgradedVersions()
{
return segmentToUpgradedVersions;
}

@Override
public TypeReference<SegmentPublishResult> getReturnTypeReference()
{
Expand Down Expand Up @@ -142,7 +156,8 @@ public SegmentPublishResult perform(Task task, TaskActionToolbox toolbox)
segments,
segmentToReplaceLock,
startMetadata,
endMetadata
endMetadata,
segmentToUpgradedVersions
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -351,20 +351,21 @@ public TaskStatus runTask(final TaskToolbox toolbox)
int sequenceNumber = 0;
String sequenceName = makeSequenceName(getId(), sequenceNumber);

final TransactionalSegmentPublisher publisher = (mustBeNullOrEmptyOverwriteSegments, segments, commitMetadata) -> {
if (mustBeNullOrEmptyOverwriteSegments != null && !mustBeNullOrEmptyOverwriteSegments.isEmpty()) {
throw new ISE(
"Stream ingestion task unexpectedly attempted to overwrite segments: %s",
SegmentUtils.commaSeparatedIdentifiers(mustBeNullOrEmptyOverwriteSegments)
);
}
final SegmentTransactionalInsertAction action = SegmentTransactionalInsertAction.appendAction(
segments,
null,
null
);
return toolbox.getTaskActionClient().submit(action);
};
final TransactionalSegmentPublisher publisher =
(mustBeNullOrEmptyOverwriteSegments, segments, commitMetadata, segmentIdToUpgradedVersions) -> {
if (mustBeNullOrEmptyOverwriteSegments != null && !mustBeNullOrEmptyOverwriteSegments.isEmpty()) {
throw new ISE(
"Stream ingestion task unexpectedly attempted to overwrite segments: %s",
SegmentUtils.commaSeparatedIdentifiers(mustBeNullOrEmptyOverwriteSegments)
);
}
final SegmentTransactionalInsertAction action = SegmentTransactionalInsertAction.appendAction(
segments,
null,
null
);
return toolbox.getTaskActionClient().submit(action);
};

// Skip connecting firehose if we've been stopped before we got started.
synchronized (this) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -918,7 +918,8 @@ private TaskStatus generateAndPublishSegments(

final TaskLockType taskLockType = getTaskLockHelper().getLockTypeToUse();
final TransactionalSegmentPublisher publisher =
(segmentsToBeOverwritten, segmentsToPublish, commitMetadata) -> toolbox.getTaskActionClient().submit(
(segmentsToBeOverwritten, segmentsToPublish, commitMetadata, segmentIdToUpgradedVersions)
-> toolbox.getTaskActionClient().submit(
buildPublishAction(segmentsToBeOverwritten, segmentsToPublish, taskLockType)
);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1170,13 +1170,14 @@ private void publishSegments(

final TaskLockType taskLockType = getTaskLockHelper().getLockTypeToUse();
final TransactionalSegmentPublisher publisher =
(segmentsToBeOverwritten, segmentsToPublish, commitMetadata) -> toolbox.getTaskActionClient().submit(
(segmentsToBeOverwritten, segmentsToPublish, commitMetadata, segmentIdToUpgradedVersions)
-> toolbox.getTaskActionClient().submit(
buildPublishAction(segmentsToBeOverwritten, segmentsToPublish, taskLockType)
);

final boolean published =
newSegments.isEmpty()
|| publisher.publishSegments(oldSegments, newSegments, annotateFunction, null).isSuccess();
|| publisher.publishSegments(oldSegments, newSegments, annotateFunction, null, null).isSuccess();

if (published) {
LOG.info("Published [%d] segments", newSegments.size());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -514,6 +514,29 @@ private void acquireTaskLock(SegmentAllocationHolder holder, boolean isTimeChunk
}
}

/**
* Conflicting lock - A lock for the same datasource and overlapping interval as the request.
* Reusable lock - A conflicting lock which satisfies the lock request.
* Superseding lock - A lock created for the request which may make at least one of the existing locks unneeded
* Flow:
* Check for conflicting locks.
* If there are none, create a new lock posse
* Else, find a unique lock posse with a lock that can be reused for the request
* Else, check if the conflicting locks can coexist and create a new lock posse
* Else, check if the incompatible locks can be revoked and create a new lock posse, and revoke such locks
*
* If none of the conditions is true, return null
* Else, add the task to the newly created lock posse.
* If the lock task was added to the posse for the first time:
* A) Check if the newly created posse can replace any of the existing locks. If yes, unlock such superseded locks.
* B) Commit the lock and task to druid_tasklocks. (Unlock if the commit to metadata store fails)
*
* @param request Request for lock.
* @param task Task requesting the lock.
* @param persist If true, commit the newly created lock to the metadata store.
* Should be false when syncing from storage.
* @return A TaskLockPosse associating the lock for the request with the provided task.
*/
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Unrelated docs improvement

private TaskLockPosse createOrFindLockPosse(LockRequest request, Task task, boolean persist)
{
Preconditions.checkState(!(request instanceof LockRequestForNewSegment), "Can't handle LockRequestForNewSegment");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.emitter.EmittingLogger;
import org.apache.druid.segment.SegmentUtils;
import org.apache.druid.segment.realtime.appenderator.SegmentIdWithShardSpec;
import org.apache.druid.segment.realtime.appenderator.TransactionalSegmentPublisher;
import org.apache.druid.timeline.DataSegment;

Expand Down Expand Up @@ -351,7 +352,8 @@ public SequenceMetadataTransactionalSegmentPublisher(
public SegmentPublishResult publishAnnotatedSegments(
@Nullable Set<DataSegment> mustBeNullOrEmptyOverwriteSegments,
Set<DataSegment> segmentsToPush,
@Nullable Object commitMetadata
@Nullable Object commitMetadata,
@Nullable Map<String, Set<SegmentIdWithShardSpec>> segmentIdToUpgradeVersions
) throws IOException
{
if (mustBeNullOrEmptyOverwriteSegments != null && !mustBeNullOrEmptyOverwriteSegments.isEmpty()) {
Expand Down Expand Up @@ -417,7 +419,7 @@ public SegmentPublishResult publishAnnotatedSegments(
);
final DataSourceMetadata endMetadata = runner.createDataSourceMetadata(finalPartitions);
action = taskLockType == TaskLockType.APPEND
? SegmentTransactionalAppendAction.forSegmentsAndMetadata(segmentsToPush, startMetadata, endMetadata)
? SegmentTransactionalAppendAction.forSegmentsAndMetadata(segmentsToPush, startMetadata, endMetadata, segmentIdToUpgradeVersions)
: SegmentTransactionalInsertAction.appendAction(segmentsToPush, startMetadata, endMetadata);
} else {
action = taskLockType == TaskLockType.APPEND
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ public void testPublishAnnotatedSegmentsThrowExceptionIfOverwriteSegmentsNotNull

ISE exception = Assert.assertThrows(
ISE.class,
() -> transactionalSegmentPublisher.publishAnnotatedSegments(notNullNotEmptySegment, ImmutableSet.of(), null)
() -> transactionalSegmentPublisher.publishAnnotatedSegments(notNullNotEmptySegment, ImmutableSet.of(), null, null)
);
Assert.assertEquals(
"Stream ingestion task unexpectedly attempted to overwrite segments: "
Expand Down Expand Up @@ -115,6 +115,6 @@ public void testPublishAnnotatedSegmentsSucceedIfDropSegmentsAndOverwriteSegment
);
TransactionalSegmentPublisher transactionalSegmentPublisher = sequenceMetadata.createPublisher(mockSeekableStreamIndexTaskRunner, mockTaskToolbox, false);

transactionalSegmentPublisher.publishAnnotatedSegments(null, notNullNotEmptySegment, ImmutableMap.of());
transactionalSegmentPublisher.publishAnnotatedSegments(null, notNullNotEmptySegment, ImmutableMap.of(), null);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -174,7 +174,8 @@ public SegmentPublishResult commitAppendSegmentsAndMetadata(
Set<DataSegment> appendSegments,
Map<DataSegment, ReplaceTaskLock> appendSegmentToReplaceLock,
DataSourceMetadata startMetadata,
DataSourceMetadata endMetadata
DataSourceMetadata endMetadata,
Map<String, Set<SegmentIdWithShardSpec>> segmentIdToUpgradedVersions
)
{
return SegmentPublishResult.ok(commitSegments(appendSegments));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -312,7 +312,8 @@ SegmentPublishResult commitAppendSegmentsAndMetadata(
Set<DataSegment> appendSegments,
Map<DataSegment, ReplaceTaskLock> appendSegmentToReplaceLock,
DataSourceMetadata startMetadata,
DataSourceMetadata endMetadata
DataSourceMetadata endMetadata,
Map<String, Set<SegmentIdWithShardSpec>> segmentToUpgradeVersions
);

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@
import org.apache.druid.timeline.partition.PartitionIds;
import org.apache.druid.timeline.partition.ShardSpec;
import org.apache.druid.timeline.partition.SingleDimensionShardSpec;
import org.apache.druid.utils.CollectionUtils;
import org.joda.time.DateTime;
import org.joda.time.Interval;
import org.joda.time.chrono.ISOChronology;
Expand Down Expand Up @@ -536,6 +537,7 @@ public SegmentPublishResult commitAppendSegments(
appendSegments,
appendSegmentToReplaceLock,
null,
null,
null
);
}
Expand All @@ -545,14 +547,16 @@ public SegmentPublishResult commitAppendSegmentsAndMetadata(
Set<DataSegment> appendSegments,
Map<DataSegment, ReplaceTaskLock> appendSegmentToReplaceLock,
DataSourceMetadata startMetadata,
DataSourceMetadata endMetadata
DataSourceMetadata endMetadata,
Map<String, Set<SegmentIdWithShardSpec>> segmentIdToUpgradedVersions
)
{
return commitAppendSegmentsAndMetadataInTransaction(
appendSegments,
appendSegmentToReplaceLock,
startMetadata,
endMetadata
endMetadata,
segmentIdToUpgradedVersions
);
}

Expand Down Expand Up @@ -1291,7 +1295,8 @@ private SegmentPublishResult commitAppendSegmentsAndMetadataInTransaction(
Set<DataSegment> appendSegments,
Map<DataSegment, ReplaceTaskLock> appendSegmentToReplaceLock,
@Nullable DataSourceMetadata startMetadata,
@Nullable DataSourceMetadata endMetadata
@Nullable DataSourceMetadata endMetadata,
@Nullable Map<String, Set<SegmentIdWithShardSpec>> segmentIdToUpgradedVersions
)
{
verifySegmentsToCommit(appendSegments);
Expand All @@ -1303,7 +1308,7 @@ private SegmentPublishResult commitAppendSegmentsAndMetadataInTransaction(
final String dataSource = appendSegments.iterator().next().getDataSource();
final Set<DataSegment> segmentIdsForNewVersions = connector.retryTransaction(
(handle, transactionStatus)
-> createNewIdsForAppendSegments(handle, dataSource, appendSegments),
-> createNewIdsForAppendSegments(handle, dataSource, appendSegments, segmentIdToUpgradedVersions),
0,
SQLMetadataConnector.DEFAULT_MAX_TRIES
);
Expand Down Expand Up @@ -1436,13 +1441,18 @@ private void insertPendingSegmentIntoMetastore(
private Set<DataSegment> createNewIdsForAppendSegments(
Handle handle,
String dataSource,
Set<DataSegment> segmentsToAppend
Set<DataSegment> segmentsToAppend,
@Nullable Map<String, Set<SegmentIdWithShardSpec>> segmentIdToUpgradeVersions
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
@Nullable Map<String, Set<SegmentIdWithShardSpec>> segmentIdToUpgradeVersions
@Nullable Map<String, Set<SegmentIdWithShardSpec>> segmentIdToUpgradedVersions

) throws IOException
{
if (segmentsToAppend.isEmpty()) {
return Collections.emptySet();
}

if (segmentIdToUpgradeVersions == null) {
segmentIdToUpgradeVersions = Collections.emptyMap();
}

final Set<Interval> appendIntervals = new HashSet<>();
final TreeMap<String, Set<DataSegment>> appendVersionToSegments = new TreeMap<>();
for (DataSegment segment : segmentsToAppend) {
Expand Down Expand Up @@ -1475,7 +1485,37 @@ private Set<DataSegment> createNewIdsForAppendSegments(
entry.getValue(),
appendVersionToSegments
);
for (Map.Entry<Interval, Set<DataSegment>> upgradeEntry : segmentsToUpgrade.entrySet()) {
Map<Interval, Set<DataSegment>> filteredSegmentsToUpgrade = new HashMap<>();
for (Interval interval : segmentsToUpgrade.keySet()) {
filteredSegmentsToUpgrade.put(interval, new HashSet<>());
for (DataSegment segment : segmentsToUpgrade.get(interval)) {
boolean include = true;
if (segmentIdToUpgradeVersions.containsKey(segment.getId().toString())) {
for (SegmentIdWithShardSpec upgradedSegment : segmentIdToUpgradeVersions.get(segment.getId().toString())) {
if (upgradedSegment.getVersion().equals(upgradeVersion)) {
upgradedSegments.add(
new DataSegment(
upgradedSegment.asSegmentId(),
segment.getLoadSpec(),
segment.getDimensions(),
segment.getMetrics(),
upgradedSegment.getShardSpec(),
null,
segment.getBinaryVersion(),
segment.getSize()
)
);
include = false;
break;
}
}
}
if (include) {
filteredSegmentsToUpgrade.get(interval).add(segment);
}
}
}
Comment on lines +1489 to +1517
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you please add some comments to describe what you are trying to do here? I don't follow.

for (Map.Entry<Interval, Set<DataSegment>> upgradeEntry : filteredSegmentsToUpgrade.entrySet()) {
final Interval upgradeInterval = upgradeEntry.getKey();
final Set<DataSegment> segmentsAlreadyOnVersion
= overlappingIntervalToSegments.getOrDefault(upgradeInterval, Collections.emptySet())
Expand Down Expand Up @@ -1548,6 +1588,10 @@ private Set<DataSegment> createNewIdsForAppendSegmentsWithVersion(
Set<DataSegment> committedSegments
) throws IOException
{
if (CollectionUtils.isNullOrEmpty(segmentsToUpgrade)) {
return Collections.emptySet();
}

// Find the committed segments with the highest partition number
SegmentIdWithShardSpec committedMaxId = null;
for (DataSegment committedSegment : committedSegments) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -614,6 +614,13 @@ ListenableFuture<SegmentsAndCommitMetadata> publishInBackground(
final Object callerMetadata = metadata == null
? null
: ((AppenderatorDriverMetadata) metadata).getCallerMetadata();
final Map<String, Set<SegmentIdWithShardSpec>> segmentIdToUpgradedVersions;
if (appenderator instanceof StreamAppenderator) {
segmentIdToUpgradedVersions = ((StreamAppenderator) appenderator).getBaseSegmentToUpgradedVersions();
} else {
segmentIdToUpgradedVersions = null;
}

return executor.submit(
() -> {
try {
Expand All @@ -625,7 +632,8 @@ ListenableFuture<SegmentsAndCommitMetadata> publishInBackground(
segmentsToBeOverwritten,
ourSegments,
outputSegmentsAnnotateFunction,
callerMetadata
callerMetadata,
segmentIdToUpgradedVersions
);

if (publishResult.isSuccess()) {
Expand Down
Loading