Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -1306,7 +1306,7 @@ private void publishAllSegments(final Set<DataSegment> segments) throws IOExcept
} else {
performSegmentPublish(
context.taskActionClient(),
SegmentTransactionalInsertAction.overwriteAction(null, null, segmentsWithTombstones)
SegmentTransactionalInsertAction.overwriteAction(null, segmentsWithTombstones)
);
}
} else if (!segments.isEmpty()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,12 +66,6 @@ public class SegmentTransactionalInsertAction implements TaskAction<SegmentPubli
* Set of segments to be inserted into metadata storage
*/
private final Set<DataSegment> segments;
/**
* Set of segments to be dropped (mark unused) when new segments, {@link SegmentTransactionalInsertAction#segments},
* are inserted into metadata storage.
*/
@Nullable
private final Set<DataSegment> segmentsToBeDropped;

@Nullable
private final DataSourceMetadata startMetadata;
Expand All @@ -82,11 +76,10 @@ public class SegmentTransactionalInsertAction implements TaskAction<SegmentPubli

public static SegmentTransactionalInsertAction overwriteAction(
@Nullable Set<DataSegment> segmentsToBeOverwritten,
@Nullable Set<DataSegment> segmentsToBeDropped,
Set<DataSegment> segmentsToPublish
)
{
return new SegmentTransactionalInsertAction(segmentsToBeOverwritten, segmentsToBeDropped, segmentsToPublish, null, null, null);
return new SegmentTransactionalInsertAction(segmentsToBeOverwritten, segmentsToPublish, null, null, null);
}

public static SegmentTransactionalInsertAction appendAction(
Expand All @@ -95,7 +88,7 @@ public static SegmentTransactionalInsertAction appendAction(
@Nullable DataSourceMetadata endMetadata
)
{
return new SegmentTransactionalInsertAction(null, null, segments, startMetadata, endMetadata, null);
return new SegmentTransactionalInsertAction(null, segments, startMetadata, endMetadata, null);
}

public static SegmentTransactionalInsertAction commitMetadataOnlyAction(
Expand All @@ -104,21 +97,19 @@ public static SegmentTransactionalInsertAction commitMetadataOnlyAction(
DataSourceMetadata endMetadata
)
{
return new SegmentTransactionalInsertAction(null, null, null, startMetadata, endMetadata, dataSource);
return new SegmentTransactionalInsertAction(null, null, startMetadata, endMetadata, dataSource);
}

@JsonCreator
private SegmentTransactionalInsertAction(
@JsonProperty("segmentsToBeOverwritten") @Nullable Set<DataSegment> segmentsToBeOverwritten,
@JsonProperty("segmentsToBeDropped") @Nullable Set<DataSegment> segmentsToBeDropped,
@JsonProperty("segments") @Nullable Set<DataSegment> segments,
@JsonProperty("startMetadata") @Nullable DataSourceMetadata startMetadata,
@JsonProperty("endMetadata") @Nullable DataSourceMetadata endMetadata,
@JsonProperty("dataSource") @Nullable String dataSource
)
{
this.segmentsToBeOverwritten = segmentsToBeOverwritten;
this.segmentsToBeDropped = segmentsToBeDropped;
this.segments = segments == null ? ImmutableSet.of() : ImmutableSet.copyOf(segments);
this.startMetadata = startMetadata;
this.endMetadata = endMetadata;
Expand All @@ -132,13 +123,6 @@ public Set<DataSegment> getSegmentsToBeOverwritten()
return segmentsToBeOverwritten;
}

@JsonProperty
@Nullable
public Set<DataSegment> getSegmentsToBeDropped()
{
return segmentsToBeDropped;
}

@JsonProperty
public Set<DataSegment> getSegments()
{
Expand Down Expand Up @@ -202,9 +186,6 @@ public SegmentPublishResult perform(Task task, TaskActionToolbox toolbox)
if (segmentsToBeOverwritten != null) {
allSegments.addAll(segmentsToBeOverwritten);
}
if (segmentsToBeDropped != null) {
allSegments.addAll(segmentsToBeDropped);
}

TaskLocks.checkLockCoversSegments(task, toolbox.getTaskLockbox(), allSegments);

Expand All @@ -224,7 +205,6 @@ public SegmentPublishResult perform(Task task, TaskActionToolbox toolbox)
.onValidLocks(
() -> toolbox.getIndexerMetadataStorageCoordinator().announceHistoricalSegments(
segments,
segmentsToBeDropped,
startMetadata,
endMetadata
)
Expand Down Expand Up @@ -359,7 +339,6 @@ public String toString()
", startMetadata=" + startMetadata +
", endMetadata=" + endMetadata +
", dataSource='" + dataSource + '\'' +
", segmentsToBeDropped=" + SegmentUtils.commaSeparatedIdentifiers(segmentsToBeDropped) +
'}';
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -350,19 +350,13 @@ public TaskStatus runTask(final TaskToolbox toolbox)
int sequenceNumber = 0;
String sequenceName = makeSequenceName(getId(), sequenceNumber);

final TransactionalSegmentPublisher publisher = (mustBeNullOrEmptyOverwriteSegments, mustBeNullOrEmptyDropSegments, segments, commitMetadata) -> {
final TransactionalSegmentPublisher publisher = (mustBeNullOrEmptyOverwriteSegments, segments, commitMetadata) -> {
if (mustBeNullOrEmptyOverwriteSegments != null && !mustBeNullOrEmptyOverwriteSegments.isEmpty()) {
throw new ISE(
"Stream ingestion task unexpectedly attempted to overwrite segments: %s",
SegmentUtils.commaSeparatedIdentifiers(mustBeNullOrEmptyOverwriteSegments)
);
}
if (mustBeNullOrEmptyDropSegments != null && !mustBeNullOrEmptyDropSegments.isEmpty()) {
throw new ISE(
"Stream ingestion task unexpectedly attempted to drop segments: %s",
SegmentUtils.commaSeparatedIdentifiers(mustBeNullOrEmptyDropSegments)
);
}
final SegmentTransactionalInsertAction action = SegmentTransactionalInsertAction.appendAction(
segments,
null,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -912,9 +912,9 @@ private TaskStatus generateAndPublishSegments(
}


final TransactionalSegmentPublisher publisher = (segmentsToBeOverwritten, segmentsToDrop, segmentsToPublish, commitMetadata) ->
final TransactionalSegmentPublisher publisher = (segmentsToBeOverwritten, segmentsToPublish, commitMetadata) ->
toolbox.getTaskActionClient()
.submit(SegmentTransactionalInsertAction.overwriteAction(segmentsToBeOverwritten, segmentsToDrop, segmentsToPublish));
.submit(SegmentTransactionalInsertAction.overwriteAction(segmentsToBeOverwritten, segmentsToPublish));

String effectiveId = getContextValue(CompactionTask.CTX_KEY_APPENDERATOR_TRACKING_TASK_ID, null);
if (effectiveId == null) {
Expand Down Expand Up @@ -996,7 +996,6 @@ private TaskStatus generateAndPublishSegments(
final SegmentsAndCommitMetadata published =
awaitPublish(driver.publishAll(
inputSegments,
null,
tombStones,
publisher,
annotateFunction
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1170,16 +1170,13 @@ private void publishSegments(
}
}

final TransactionalSegmentPublisher publisher = (segmentsToBeOverwritten, segmentsToDrop, segmentsToPublish, commitMetadata) ->
final TransactionalSegmentPublisher publisher = (segmentsToBeOverwritten, segmentsToPublish, commitMetadata) ->
toolbox.getTaskActionClient().submit(
SegmentTransactionalInsertAction.overwriteAction(segmentsToBeOverwritten, segmentsToDrop, segmentsToPublish)
SegmentTransactionalInsertAction.overwriteAction(segmentsToBeOverwritten, segmentsToPublish)
);
final boolean published =
newSegments.isEmpty()
|| publisher.publishSegments(oldSegments,
Collections.emptySet(),
newSegments, annotateFunction,
null).isSuccess();
|| publisher.publishSegments(oldSegments, newSegments, annotateFunction, null).isSuccess();

if (published) {
LOG.info("Published [%d] segments", newSegments.size());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -337,7 +337,6 @@ public SequenceMetadataTransactionalSegmentPublisher(
@Override
public SegmentPublishResult publishAnnotatedSegments(
@Nullable Set<DataSegment> mustBeNullOrEmptyOverwriteSegments,
@Nullable Set<DataSegment> mustBeNullOrEmptyDropSegments,
Set<DataSegment> segmentsToPush,
@Nullable Object commitMetadata
) throws IOException
Expand All @@ -348,13 +347,7 @@ public SegmentPublishResult publishAnnotatedSegments(
SegmentUtils.commaSeparatedIdentifiers(mustBeNullOrEmptyOverwriteSegments)
);
}
if (mustBeNullOrEmptyDropSegments != null && !mustBeNullOrEmptyDropSegments.isEmpty()) {
throw new ISE(
"Stream ingestion task unexpectedly attempted to drop segments: %s",
SegmentUtils.commaSeparatedIdentifiers(mustBeNullOrEmptyDropSegments)
);
}
final Map commitMetaMap = (Map) Preconditions.checkNotNull(commitMetadata, "commitMetadata");
final Map<?, ?> commitMetaMap = (Map<?, ?>) Preconditions.checkNotNull(commitMetadata, "commitMetadata");
final SeekableStreamEndSequenceNumbers<PartitionIdType, SequenceOffsetType> finalPartitions =
runner.deserializePartitionsFromMetadata(
toolbox.getJsonMapper(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -134,39 +134,6 @@ public void testTransactionalUpdateDataSourceMetadata() throws Exception
);
}

@Test
public void testTransactionalDropSegments() throws Exception
{
final Task task = NoopTask.create();
actionTestKit.getTaskLockbox().add(task);
acquireTimeChunkLock(TaskLockType.EXCLUSIVE, task, INTERVAL, 5000);

SegmentPublishResult result1 = SegmentTransactionalInsertAction.overwriteAction(
null,
null,
ImmutableSet.of(SEGMENT1)
).perform(
task,
actionTestKit.getTaskActionToolbox()
);
Assert.assertEquals(SegmentPublishResult.ok(ImmutableSet.of(SEGMENT1)), result1);

SegmentPublishResult result2 = SegmentTransactionalInsertAction.overwriteAction(
null,
ImmutableSet.of(SEGMENT1),
ImmutableSet.of(SEGMENT2)
).perform(
task,
actionTestKit.getTaskActionToolbox()
);
Assert.assertEquals(SegmentPublishResult.ok(ImmutableSet.of(SEGMENT2)), result2);

Assertions.assertThat(
actionTestKit.getMetadataStorageCoordinator()
.retrieveUsedSegmentsForInterval(DATA_SOURCE, INTERVAL, Segments.ONLY_VISIBLE)
).containsExactlyInAnyOrder(SEGMENT2);
}

@Test
public void testFailTransactionalUpdateDataSourceMetadata() throws Exception
{
Expand All @@ -193,38 +160,11 @@ public void testFailTransactionalUpdateDataSourceMetadata() throws Exception
);
}

@Test
public void testFailTransactionalDropSegment() throws Exception
{
final Task task = NoopTask.create();
actionTestKit.getTaskLockbox().add(task);
acquireTimeChunkLock(TaskLockType.EXCLUSIVE, task, INTERVAL, 5000);

SegmentPublishResult result = SegmentTransactionalInsertAction.overwriteAction(
null,
// SEGMENT1 does not exist, hence will fail to drop
ImmutableSet.of(SEGMENT1),
ImmutableSet.of(SEGMENT2)
).perform(
task,
actionTestKit.getTaskActionToolbox()
);

Assert.assertEquals(
SegmentPublishResult.fail(
"org.apache.druid.metadata.RetryTransactionException: " +
"Failed to drop some segments. Only 0 could be dropped out of 1. Trying again"
),
result
);
}

@Test
public void testFailBadVersion() throws Exception
{
final Task task = NoopTask.create();
final SegmentTransactionalInsertAction action = SegmentTransactionalInsertAction.overwriteAction(
null,
null,
ImmutableSet.of(SEGMENT3)
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1525,16 +1525,15 @@ public Set<DataSegment> announceHistoricalSegments(Set<DataSegment> segments) th
@Override
public SegmentPublishResult announceHistoricalSegments(
Set<DataSegment> segments,
Set<DataSegment> segmentsToDrop,
DataSourceMetadata startMetadata,
DataSourceMetadata endMetadata
) throws IOException
{
SegmentPublishResult result = super.announceHistoricalSegments(segments, segmentsToDrop, startMetadata, endMetadata);
SegmentPublishResult result = super.announceHistoricalSegments(segments, startMetadata, endMetadata);

Assert.assertFalse(
Assert.assertNotNull(
"Segment latch not initialized, did you forget to call expectPublishSegments?",
segmentLatch == null
segmentLatch
);

publishedSegments.addAll(result.getSegments());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,37 +85,7 @@ public void testPublishAnnotatedSegmentsThrowExceptionIfOverwriteSegmentsNotNull
"Stream ingestion task unexpectedly attempted to overwrite segments: " + SegmentUtils.commaSeparatedIdentifiers(notNullNotEmptySegment)
);

transactionalSegmentPublisher.publishAnnotatedSegments(notNullNotEmptySegment, null, ImmutableSet.of(), null);
}

@Test
public void testPublishAnnotatedSegmentsThrowExceptionIfDropSegmentsNotNullAndNotEmpty() throws Exception
{
DataSegment dataSegment = DataSegment.builder()
.dataSource("foo")
.interval(Intervals.of("2001/P1D"))
.shardSpec(new LinearShardSpec(1))
.version("b")
.size(0)
.build();

Set<DataSegment> notNullNotEmptySegment = ImmutableSet.of(dataSegment);
SequenceMetadata<Integer, Integer> sequenceMetadata = new SequenceMetadata<>(
1,
"test",
ImmutableMap.of(),
ImmutableMap.of(),
true,
ImmutableSet.of()
);
TransactionalSegmentPublisher transactionalSegmentPublisher = sequenceMetadata.createPublisher(mockSeekableStreamIndexTaskRunner, mockTaskToolbox, true);

expectedException.expect(ISE.class);
expectedException.expectMessage(
"Stream ingestion task unexpectedly attempted to drop segments: " + SegmentUtils.commaSeparatedIdentifiers(notNullNotEmptySegment)
);

transactionalSegmentPublisher.publishAnnotatedSegments(null, notNullNotEmptySegment, ImmutableSet.of(), null);
transactionalSegmentPublisher.publishAnnotatedSegments(notNullNotEmptySegment, ImmutableSet.of(), null);
}

@Test
Expand Down Expand Up @@ -143,6 +113,6 @@ public void testPublishAnnotatedSegmentsSucceedIfDropSegmentsAndOverwriteSegment
);
TransactionalSegmentPublisher transactionalSegmentPublisher = sequenceMetadata.createPublisher(mockSeekableStreamIndexTaskRunner, mockTaskToolbox, false);

transactionalSegmentPublisher.publishAnnotatedSegments(null, null, notNullNotEmptySegment, ImmutableMap.of());
transactionalSegmentPublisher.publishAnnotatedSegments(null, notNullNotEmptySegment, ImmutableMap.of());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,6 @@ public Map<SegmentCreateRequest, SegmentIdWithShardSpec> allocatePendingSegments
@Override
public SegmentPublishResult announceHistoricalSegments(
Set<DataSegment> segments,
Set<DataSegment> segmentsToDrop,
DataSourceMetadata oldCommitMetadata,
DataSourceMetadata newCommitMetadata
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@

/**
* Commit metadata for a dataSource. Used by
* {@link IndexerMetadataStorageCoordinator#announceHistoricalSegments(Set, Set, DataSourceMetadata, DataSourceMetadata)}
* {@link IndexerMetadataStorageCoordinator#announceHistoricalSegments(Set, DataSourceMetadata, DataSourceMetadata)}
* to provide metadata transactions for segment inserts.
*
* Two metadata instances can be added together, and any conflicts are resolved in favor of the right-hand side.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -257,7 +257,6 @@ SegmentIdWithShardSpec allocatePendingSegment(
* {@param segments} and dropping {@param segmentsToDrop}
*
* @param segments set of segments to add, must all be from the same dataSource
* @param segmentsToDrop set of segments to drop, must all be from the same dataSource
* @param startMetadata dataSource metadata pre-insert must match this startMetadata according to
* {@link DataSourceMetadata#matches(DataSourceMetadata)}. If null, this insert will
* not involve a metadata transaction
Expand All @@ -274,7 +273,6 @@ SegmentIdWithShardSpec allocatePendingSegment(
*/
SegmentPublishResult announceHistoricalSegments(
Set<DataSegment> segments,
Set<DataSegment> segmentsToDrop,
@Nullable DataSourceMetadata startMetadata,
@Nullable DataSourceMetadata endMetadata
) throws IOException;
Expand Down
Loading