Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@
import org.apache.druid.segment.indexing.IngestionSpec;
import org.apache.druid.segment.indexing.TuningConfig;
import org.apache.druid.segment.realtime.appenderator.SegmentIdWithShardSpec;
import org.apache.druid.segment.realtime.appenderator.TransactionalSegmentPublisher;
import org.apache.druid.segment.transform.CompactionTransformSpec;
import org.apache.druid.timeline.CompactionState;
import org.apache.druid.timeline.DataSegment;
Expand Down Expand Up @@ -456,6 +457,30 @@ protected TaskAction<SegmentPublishResult> buildPublishAction(
}
}

/**
 * Builds a {@link TransactionalSegmentPublisher} whose
 * {@code publishAnnotatedSegments} submits the publish action created by
 * {@link #buildPublishAction} to the task action client of the given toolbox.
 *
 * The lock type is resolved lazily via {@code getTaskLockHelper().getLockTypeToUse()}
 * at publish time rather than when this publisher is built.
 * NOTE(review): assumes the task lock helper is initialized before the first
 * publish — confirm against callers.
 *
 * @param toolbox toolbox providing the {@code TaskActionClient} used to submit
 *                the publish action
 * @return a publisher that performs the transactional publish via a task action
 */
protected TransactionalSegmentPublisher buildSegmentPublisher(TaskToolbox toolbox)
{
  return new TransactionalSegmentPublisher()
  {
    @Override
    public SegmentPublishResult publishAnnotatedSegments(
        @Nullable Set<DataSegment> segmentsToBeOverwritten,
        Set<DataSegment> segmentsToPublish,
        @Nullable Object commitMetadata,
        @Nullable SegmentSchemaMapping schemaMapping
    ) throws IOException
    {
      // commitMetadata is not forwarded to the publish action — batch tasks
      // publish without stream commit metadata (presumably intentional;
      // verify against TransactionalSegmentPublisher's contract).
      return toolbox.getTaskActionClient().submit(
          buildPublishAction(
              segmentsToBeOverwritten,
              segmentsToPublish,
              schemaMapping,
              getTaskLockHelper().getLockTypeToUse()
          )
      );
    }
  };
}

protected boolean tryTimeChunkLock(TaskActionClient client, List<Interval> intervals) throws IOException
{
// The given intervals are first converted to align with segment granularity. This is because,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,6 @@
import org.apache.druid.indexer.partitions.PartitionsSpec;
import org.apache.druid.indexer.partitions.SecondaryPartitionType;
import org.apache.druid.indexer.report.TaskReport;
import org.apache.druid.indexing.common.TaskLockType;
import org.apache.druid.indexing.common.TaskRealtimeMetricsMonitorBuilder;
import org.apache.druid.indexing.common.TaskToolbox;
import org.apache.druid.indexing.common.actions.TaskActionClient;
Expand Down Expand Up @@ -866,11 +865,7 @@ private TaskStatus generateAndPublishSegments(
throw new UOE("[%s] secondary partition type is not supported", partitionsSpec.getType());
}

final TaskLockType taskLockType = getTaskLockHelper().getLockTypeToUse();
final TransactionalSegmentPublisher publisher =
(segmentsToBeOverwritten, segmentsToPublish, commitMetadata, map) -> toolbox.getTaskActionClient().submit(
buildPublishAction(segmentsToBeOverwritten, segmentsToPublish, map, taskLockType)
);
final TransactionalSegmentPublisher publisher = buildSegmentPublisher(toolbox);

String effectiveId = getContextValue(CompactionTask.CTX_KEY_APPENDERATOR_TRACKING_TASK_ID, null);
if (effectiveId == null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,6 @@
import org.apache.druid.indexer.report.IngestionStatsAndErrors;
import org.apache.druid.indexer.report.IngestionStatsAndErrorsTaskReport;
import org.apache.druid.indexer.report.TaskReport;
import org.apache.druid.indexing.common.TaskLockType;
import org.apache.druid.indexing.common.TaskToolbox;
import org.apache.druid.indexing.common.actions.TaskActionClient;
import org.apache.druid.indexing.common.task.AbstractBatchIndexTask;
Expand Down Expand Up @@ -1191,12 +1190,7 @@ private void publishSegments(
}
}

final TaskLockType taskLockType = getTaskLockHelper().getLockTypeToUse();
final TransactionalSegmentPublisher publisher =
(segmentsToBeOverwritten, segmentsToPublish, commitMetadata, map) -> toolbox.getTaskActionClient().submit(
buildPublishAction(segmentsToBeOverwritten, segmentsToPublish, map, taskLockType)
);

final TransactionalSegmentPublisher publisher = buildSegmentPublisher(toolbox);
final boolean published =
newSegments.isEmpty()
|| publisher.publishSegments(oldSegments, newSegments, annotateFunction, null, segmentSchemaMapping).isSuccess();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -331,7 +331,7 @@ TransactionalSegmentPublisher createPublisher(
}

private class SequenceMetadataTransactionalSegmentPublisher
implements TransactionalSegmentPublisher
extends TransactionalSegmentPublisher
{
private final SeekableStreamIndexTaskRunner<PartitionIdType, SequenceOffsetType, ?> runner;
private final TaskToolbox toolbox;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import org.apache.druid.error.InvalidInput;
import org.apache.druid.indexing.common.TaskLockType;
import org.apache.druid.indexing.common.task.NoopTask;
import org.apache.druid.indexing.common.task.Task;
Expand Down Expand Up @@ -150,11 +149,9 @@ public void testFailTransactionalUpdateDataSourceMetadata() throws Exception
);

Assert.assertEquals(
SegmentPublishResult.fail(
InvalidInput.exception(
"The new start metadata state[ObjectMetadata{theObject=[1]}] is"
+ " ahead of the last committed end state[null]. Try resetting the supervisor."
).toString()
SegmentPublishResult.retryableFailure(
"The new start metadata state[ObjectMetadata{theObject=[1]}] is"
+ " ahead of the last committed end state[null]. Try resetting the supervisor."
),
result
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableSet;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.metadata.PendingSegmentRecord;
import org.apache.druid.segment.SegmentUtils;
import org.apache.druid.timeline.DataSegment;
Expand All @@ -34,59 +34,59 @@
import java.util.Set;

/**
* Result of an operation that attempts to publish segments. Indicates the set of segments actually published
* and whether or not the transaction was a success.
*
* If "success" is false then the segments set will be empty.
*
* It's possible for the segments set to be empty even if "success" is true, since the segments set only
* includes segments actually published as part of the transaction. The requested segments could have been
* published by a different transaction (e.g. in the case of replica sets) and this one would still succeed.
* Result of a segment publish operation.
*/
public class SegmentPublishResult
{
private final Set<DataSegment> segments;
private final boolean success;
Comment thread
kfaraz marked this conversation as resolved.
@Nullable
private final boolean retryable;
private final String errorMsg;
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The errorMsg field was annotated @Nullable before and is not @Nullable now. Is this expected?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, this particular annotation doesn't do much. Just causes unnecessary warnings in the IDE.

The constructor arg and the getter are still marked as @Nullable.

@Nullable
private final List<PendingSegmentRecord> upgradedPendingSegments;

public static SegmentPublishResult ok(Set<DataSegment> segments)
{
return new SegmentPublishResult(segments, true, null);
return new SegmentPublishResult(segments, true, false, null);
}

public static SegmentPublishResult ok(Set<DataSegment> segments, List<PendingSegmentRecord> upgradedPendingSegments)
{
return new SegmentPublishResult(segments, true, null, upgradedPendingSegments);
return new SegmentPublishResult(segments, true, false, null, upgradedPendingSegments);
}

public static SegmentPublishResult fail(String errorMsg)
public static SegmentPublishResult fail(String errorMsg, Object... args)
{
return new SegmentPublishResult(ImmutableSet.of(), false, errorMsg);
return new SegmentPublishResult(Set.of(), false, false, StringUtils.format(errorMsg, args), null);
}

public static SegmentPublishResult retryableFailure(String errorMsg, Object... args)
{
return new SegmentPublishResult(Set.of(), false, true, StringUtils.format(errorMsg, args), null);
}

@JsonCreator
private SegmentPublishResult(
@JsonProperty("segments") Set<DataSegment> segments,
@JsonProperty("success") boolean success,
@JsonProperty("retryable") boolean retryable,
@JsonProperty("errorMsg") @Nullable String errorMsg
)
{
this(segments, success, errorMsg, null);
this(segments, success, retryable, errorMsg, null);
}

private SegmentPublishResult(
Set<DataSegment> segments,
boolean success,
@Nullable String errorMsg,
boolean retryable,
@Nullable String errorMsg,
List<PendingSegmentRecord> upgradedPendingSegments
)
{
this.segments = Preconditions.checkNotNull(segments, "segments");
this.success = success;
this.errorMsg = errorMsg;
this.retryable = retryable;
this.upgradedPendingSegments = upgradedPendingSegments;

if (!success) {
Expand All @@ -98,6 +98,12 @@ private SegmentPublishResult(
}
}

/**
* Set of segments published successfully.
*
* @return Empty set if the publish operation failed or if all the segments had
* already been published by a different transaction.
*/
@JsonProperty
public Set<DataSegment> getSegments()
{
Expand All @@ -117,6 +123,12 @@ public String getErrorMsg()
return errorMsg;
}

@JsonProperty
public boolean isRetryable()
{
return retryable;
}

@Nullable
public List<PendingSegmentRecord> getUpgradedPendingSegments()
{
Expand All @@ -134,14 +146,15 @@ public boolean equals(Object o)
}
SegmentPublishResult that = (SegmentPublishResult) o;
return success == that.success &&
retryable == that.retryable &&
Objects.equals(segments, that.segments) &&
Objects.equals(errorMsg, that.errorMsg);
}

@Override
public int hashCode()
{
return Objects.hash(segments, success, errorMsg);
return Objects.hash(segments, success, errorMsg, retryable);
}

@Override
Expand All @@ -150,6 +163,7 @@ public String toString()
return "SegmentPublishResult{" +
"segments=" + SegmentUtils.commaSeparatedIdentifiers(segments) +
", success=" + success +
", retryable=" + retryable +
", errorMsg='" + errorMsg + '\'' +
'}';
}
Expand Down
Loading
Loading