@@ -1749,7 +1749,7 @@ TransactionalSegmentPublisher createPublisher(TaskToolbox toolbox, boolean useTr

log.info("Publishing with isTransaction[%s].", useTransaction);

return toolbox.getTaskActionClient().submit(action).isSuccess();
return toolbox.getTaskActionClient().submit(action);
};
}
}
@@ -518,7 +518,7 @@ public void run()

log.info("Publishing with isTransaction[%s].", ioConfig.isUseTransaction());

return toolbox.getTaskActionClient().submit(action).isSuccess();
return toolbox.getTaskActionClient().submit(action);
};

// Supervised kafka tasks are killed by KafkaSupervisor if they are stuck during publishing segments or waiting
@@ -297,7 +297,7 @@ dataSchema, new RealtimeIOConfig(null, null, null), null

final TransactionalSegmentPublisher publisher = (segments, commitMetadata) -> {
final SegmentTransactionalInsertAction action = new SegmentTransactionalInsertAction(segments);
return toolbox.getTaskActionClient().submit(action).isSuccess();
return toolbox.getTaskActionClient().submit(action);
};

// Skip connecting firehose if we've been stopped before we got started.
@@ -949,7 +949,7 @@ dataSchema, new RealtimeIOConfig(null, null, null), null

final TransactionalSegmentPublisher publisher = (segments, commitMetadata) -> {
final SegmentTransactionalInsertAction action = new SegmentTransactionalInsertAction(segments);
return toolbox.getTaskActionClient().submit(action).isSuccess();
return toolbox.getTaskActionClient().submit(action);
};

try (
@@ -382,15 +382,15 @@ private void publish(TaskToolbox toolbox) throws IOException
{
final TransactionalSegmentPublisher publisher = (segments, commitMetadata) -> {
final SegmentTransactionalInsertAction action = new SegmentTransactionalInsertAction(segments);
return toolbox.getTaskActionClient().submit(action).isSuccess();
return toolbox.getTaskActionClient().submit(action);
};
final UsedSegmentChecker usedSegmentChecker = new ActionBasedUsedSegmentChecker(toolbox.getTaskActionClient());
final Set<DataSegment> segmentsToPublish = segmentsMap
.values()
.stream()
.flatMap(report -> report.getSegments().stream())
.collect(Collectors.toSet());
final boolean published = publisher.publishSegments(segmentsToPublish, null);
final boolean published = publisher.publishSegments(segmentsToPublish, null).isSuccess();

if (published) {
log.info("Published segments");
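The hunks above all make the same mechanical change: the `TransactionalSegmentPublisher` lambdas stop collapsing the task action result into a boolean and instead pass the `SegmentPublishResult` through, so call sites that only need a yes/no answer unwrap it themselves. A minimal sketch of the resulting pattern, stitched together from these hunks (`toolbox`, `segmentsToPublish`, and `log` are assumed from the surrounding task code, and this is not a complete, compilable class):

```java
// Sketch of the pattern after this PR, not verbatim task code.
final TransactionalSegmentPublisher publisher = (segments, commitMetadata) -> {
  final SegmentTransactionalInsertAction action = new SegmentTransactionalInsertAction(segments);
  // Before: return toolbox.getTaskActionClient().submit(action).isSuccess();
  return toolbox.getTaskActionClient().submit(action);
};

// Callers that only care about success or failure unwrap the result at the call site.
final boolean published = publisher.publishSegments(segmentsToPublish, null).isSuccess();
if (published) {
  log.info("Published segments");
}
```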
@@ -50,13 +50,14 @@ default List<DataSegment> getUsedSegmentsForInterval(String dataSource, Interval

/**
* Get all used segments and the created_date of these segments in a given datasource and interval
*
*
* @param dataSource The datasource to query
* @param interval The interval for which all applicable and used datasources are requested. Start is inclusive, end is exclusive
*
* @return The DataSegments and the related created_date of segments which include data in the requested interval
*/
List<Pair<DataSegment, String>> getUsedSegmentAndCreatedDateForInterval(String dataSource, Interval interval);

/**
* Get all segments which may include any data in the interval and are flagged as used.
*
@@ -134,9 +135,12 @@ SegmentIdentifier allocatePendingSegment(
* {@link DataSourceMetadata#plus(DataSourceMetadata)}. If null, this insert will not
* involve a metadata transaction
*
* @return segment publish result indicating transaction success or failure, and set of segments actually published
* @return segment publish result indicating transaction success or failure, and set of segments actually published.
* This method must only return a failure code if it is sure that the transaction did not happen. If it is not sure,
* it must throw an exception instead.
*
* @throws IllegalArgumentException if startMetadata and endMetadata are not either both null or both non-null
* @throws RuntimeException if the state of metadata storage after this call is unknown
*/
SegmentPublishResult announceHistoricalSegments(
Set<DataSegment> segments,
@@ -177,7 +181,7 @@ SegmentPublishResult announceHistoricalSegments(
* @return true if the entry was inserted, false otherwise
*/
boolean insertDataSourceMetadata(String dataSource, DataSourceMetadata dataSourceMetadata);

void updateSegmentMetadata(Set<DataSegment> segments);

void deleteSegments(Set<DataSegment> segments);
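The contract added above to announceHistoricalSegments is three-way: a successful result, a failure result that may only be returned when the transaction definitely did not happen, and an exception when the outcome is unknown. A hedged sketch of how a caller is expected to react (the `coordinator` variable and the comments are illustrative, not code from this PR):

```java
// Illustrative caller; only the contract, not the names, comes from this PR.
try {
  final SegmentPublishResult result =
      coordinator.announceHistoricalSegments(segments, startMetadata, endMetadata);
  if (result.isSuccess()) {
    // The transaction definitely committed: the segments are now published.
  } else {
    // The transaction definitely did not happen (e.g. the metadata compare-and-swap
    // failed), so it is safe to clean up local artifacts or retry from scratch.
  }
}
catch (RuntimeException e) {
  // Unknown outcome: the metadata store may or may not have been updated.
  // Do not delete anything; propagate the error instead.
  throw e;
}
```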
@@ -311,7 +311,7 @@ public SegmentPublishResult announceHistoricalSegments(
}
}

final AtomicBoolean txnFailure = new AtomicBoolean(false);
final AtomicBoolean definitelyNotUpdated = new AtomicBoolean(false);

try {
return connector.retryTransaction(
@@ -323,6 +323,9 @@ public SegmentPublishResult inTransaction(
final TransactionStatus transactionStatus
) throws Exception
{
// Set definitelyNotUpdated back to false upon retrying.
definitelyNotUpdated.set(false);

final Set<DataSegment> inserted = Sets.newHashSet();

if (startMetadata != null) {
@@ -334,8 +337,9 @@
);

if (result != DataSourceMetadataUpdateResult.SUCCESS) {
// Metadata was definitely not updated.
transactionStatus.setRollbackOnly();
txnFailure.set(true);
definitelyNotUpdated.set(true);

if (result == DataSourceMetadataUpdateResult.FAILURE) {
throw new RuntimeException("Aborting transaction!");
@@ -359,9 +363,10 @@
);
}
catch (CallbackFailedException e) {
if (txnFailure.get()) {
return new SegmentPublishResult(ImmutableSet.of(), false);
if (definitelyNotUpdated.get()) {
return SegmentPublishResult.fail();
Contributor:
What do you think about adding an exception to SegmentPublishResult on failure, so that callers can figure out why it failed?

Contributor Author:
I think it's not necessary, since there is supposed to be only one reason: compare-and-swap failure with the metadata update.

} else {
// Must throw exception if we are not sure if we updated or not.
throw e;
}
}
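The renamed `definitelyNotUpdated` flag is only meaningful for the most recent attempt, which is why the transaction body resets it at the top of every retry. The toy program below (plain Java, not Druid code; the attempts and exceptions are made up) shows what could go wrong without the reset: a definite failure in attempt 1 would leave the flag set and mask an unknown outcome in attempt 2, letting the caller wrongly report a clean failure.

```java
import java.util.concurrent.atomic.AtomicBoolean;

// Toy illustration of the reset-on-retry rule described above.
public class RetryFlagSketch
{
  public static void main(String[] args)
  {
    final AtomicBoolean definitelyNotUpdated = new AtomicBoolean(false);

    for (int attempt = 1; attempt <= 2; attempt++) {
      definitelyNotUpdated.set(false); // reset before every attempt, as in the PR
      try {
        runTransaction(attempt, definitelyNotUpdated);
        System.out.println("attempt " + attempt + ": committed");
        return;
      }
      catch (RuntimeException e) {
        if (definitelyNotUpdated.get()) {
          // Safe to report a plain failure and let the retry loop continue.
          System.out.println("attempt " + attempt + ": definitely not updated, retrying");
        } else {
          // Outcome unknown: must propagate instead of reporting failure.
          System.out.println("attempt " + attempt + ": unknown outcome, rethrowing");
          throw e;
        }
      }
    }
  }

  private static void runTransaction(int attempt, AtomicBoolean definitelyNotUpdated)
  {
    if (attempt == 1) {
      definitelyNotUpdated.set(true); // the metadata compare-and-swap definitely failed
      throw new RuntimeException("Aborting transaction!");
    }
    throw new RuntimeException("connection lost mid-commit"); // outcome unknown
  }
}
```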
@@ -890,7 +895,12 @@ private byte[] getDataSourceMetadataWithHandleAsBytes(
* @param endMetadata dataSource metadata post-insert will have this endMetadata merged in with
* {@link DataSourceMetadata#plus(DataSourceMetadata)}
*
* @return true if dataSource metadata was updated from matching startMetadata to matching endMetadata
* @return SUCCESS if dataSource metadata was updated from matching startMetadata to matching endMetadata, FAILURE or
* TRY_AGAIN if it definitely was not updated. This guarantee is meant to help
* {@link #announceHistoricalSegments(Set, DataSourceMetadata, DataSourceMetadata)}
* achieve its own guarantee.
*
* @throws RuntimeException if state is unknown after this call
*/
protected DataSourceMetadataUpdateResult updateDataSourceMetadataWithHandle(
final Handle handle,
@@ -1163,29 +1173,31 @@ public List<Pair<DataSegment, String>> getUsedSegmentAndCreatedDateForInterval(S
handle -> handle.createQuery(
StringUtils.format(
"SELECT created_date, payload FROM %1$s WHERE dataSource = :dataSource " +
"AND start >= :start AND %2$send%2$s <= :end AND used = true",
"AND start >= :start AND %2$send%2$s <= :end AND used = true",
dbTables.getSegmentsTable(), connector.getQuoteString()
)
)
.bind("dataSource", dataSource)
.bind("start", interval.getStart().toString())
.bind("end", interval.getEnd().toString())
.map(new ResultSetMapper<Pair<DataSegment, String>>()
{
@Override
public Pair<DataSegment, String> map(int index, ResultSet r, StatementContext ctx) throws SQLException
{
try {
return new Pair<>(
jsonMapper.readValue(r.getBytes("payload"), DataSegment.class),
r.getString("created_date"));
}
catch (IOException e) {
throw new RuntimeException(e);
}
}
})
.list()
.bind("dataSource", dataSource)
.bind("start", interval.getStart().toString())
.bind("end", interval.getEnd().toString())
.map(new ResultSetMapper<Pair<DataSegment, String>>()
{
@Override
public Pair<DataSegment, String> map(int index, ResultSet r, StatementContext ctx)
throws SQLException
{
try {
return new Pair<>(
jsonMapper.readValue(r.getBytes("payload"), DataSegment.class),
r.getString("created_date")
);
}
catch (IOException e) {
throw new RuntimeException(e);
}
}
})
.list()
);
}

@@ -1197,7 +1209,7 @@ public boolean insertDataSourceMetadata(String dataSource, DataSourceMetadata me
.createStatement(
StringUtils.format(
"INSERT INTO %s (dataSource, created_date, commit_metadata_payload, commit_metadata_sha1) VALUES" +
" (:dataSource, :created_date, :commit_metadata_payload, :commit_metadata_sha1)",
" (:dataSource, :created_date, :commit_metadata_payload, :commit_metadata_sha1)",
dbTables.getDataSourceTable()
)
)
@@ -691,8 +691,15 @@ private DataSegment mergeAndPush(final SegmentIdentifier identifier, final Sink
try {
if (descriptorFile.exists()) {
// Already pushed.
log.info("Segment[%s] already pushed.", identifier);
return objectMapper.readValue(descriptorFile, DataSegment.class);

if (useUniquePath) {
// Don't reuse the descriptor, because the caller asked for a unique path. Leave the old one as-is, since
// it might serve some unknown purpose.
log.info("Pushing segment[%s] again with new unique path.", identifier);
} else {
log.info("Segment[%s] already pushed.", identifier);
return objectMapper.readValue(descriptorFile, DataSegment.class);
}
}

log.info("Pushing merged index for segment[%s].", identifier);
@@ -555,38 +555,33 @@ ListenableFuture<SegmentsAndMetadata> publishInBackground(
final boolean published = publisher.publishSegments(
ImmutableSet.copyOf(segmentsAndMetadata.getSegments()),
metadata == null ? null : ((AppenderatorDriverMetadata) metadata).getCallerMetadata()
);
).isSuccess();

if (published) {
log.info("Published segments.");
} else {
log.info("Transaction failure while publishing segments, checking if someone else beat us to it.");
log.info("Transaction failure while publishing segments, removing them from deep storage "
+ "and checking if someone else beat us to publishing.");

segmentsAndMetadata.getSegments().forEach(dataSegmentKiller::killQuietly);

final Set<SegmentIdentifier> segmentsIdentifiers = segmentsAndMetadata
.getSegments()
.stream()
.map(SegmentIdentifier::fromDataSegment)
.collect(Collectors.toSet());

if (usedSegmentChecker.findUsedSegments(segmentsIdentifiers)
.equals(Sets.newHashSet(segmentsAndMetadata.getSegments()))) {
log.info(
"Removing our segments from deep storage because someone else already published them: %s",
segmentsAndMetadata.getSegments()
);
segmentsAndMetadata.getSegments().forEach(dataSegmentKiller::killQuietly);

log.info("Our segments really do exist, awaiting handoff.");
} else {
throw new ISE("Failed to publish segments[%s]", segmentsAndMetadata.getSegments());
throw new ISE("Failed to publish segments.");
Contributor:
Is this change meant to avoid overly large log messages? I feel this log sometimes helps.

Contributor Author:
I was thinking it's not necessary, since they will get logged in the catch statement via:

              log.warn(e, "Failed publish, not removing segments: %s", segmentsAndMetadata.getSegments());

}
}
}
catch (Exception e) {
log.warn(
"Removing segments from deep storage after failed publish: %s",
segmentsAndMetadata.getSegments()
);
segmentsAndMetadata.getSegments().forEach(dataSegmentKiller::killQuietly);

// Must not remove segments here, we aren't sure if our transaction succeeded or not.
log.warn(e, "Failed publish, not removing segments: %s", segmentsAndMetadata.getSegments());
throw Throwables.propagate(e);
}
}
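The net effect of this hunk is that deep-storage cleanup now happens only when the publish transaction definitely failed; when the publisher throws, the outcome is unknown and the files are left in place. A condensed sketch of the resulting flow (variable names are shortened from the diff, and this is not the verbatim method body):

```java
// Condensed sketch of the publish flow after this change.
final boolean published = publisher.publishSegments(segments, callerMetadata).isSuccess();

if (published) {
  log.info("Published segments.");
} else {
  // Definite transaction failure: our copies in deep storage are unreferenced,
  // so remove them, then check whether another task already published the same segments.
  segments.forEach(dataSegmentKiller::killQuietly);

  if (usedSegmentChecker.findUsedSegments(identifiers).equals(Sets.newHashSet(segments))) {
    log.info("Our segments really do exist, awaiting handoff.");
  } else {
    throw new ISE("Failed to publish segments.");
  }
}
// In the catch block the outcome is unknown, so nothing is deleted:
// the code only logs the failure with the segment list and rethrows.
```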
@@ -19,6 +19,7 @@

package io.druid.segment.realtime.appenderator;

import io.druid.indexing.overlord.SegmentPublishResult;
import io.druid.timeline.DataSegment;

import javax.annotation.Nullable;
@@ -30,11 +31,14 @@ public interface TransactionalSegmentPublisher
/**
* Publish segments, along with some commit metadata, in a single transaction.
*
* @return true if segments were published, false if they were not published due to txn failure with the metadata
* @return publish result that indicates if segments were published or not. If it is unclear
* if the segments were published or not, this method must throw an exception. The behavior is similar to
* IndexerSQLMetadataStorageCoordinator's announceHistoricalSegments.
*
* @throws IOException if there was an I/O error when publishing
* @throws RuntimeException if we cannot tell if the segments were published or not, for some other reason
*/
boolean publishSegments(
SegmentPublishResult publishSegments(
Set<DataSegment> segments,
@Nullable Object commitMetadata
) throws IOException;
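With the interface returning `SegmentPublishResult`, an implementation has to encode all three outcomes: success, definite failure, and unknown. The first sketch below mirrors the test helper changed later in this PR; the other two are illustrative only and are not part of the diff.

```java
// Always succeeds without reporting extra segments (mirrors makeOkPublisher() below).
TransactionalSegmentPublisher okPublisher =
    (segments, commitMetadata) -> new SegmentPublishResult(ImmutableSet.of(), true);

// Reports a definite transaction failure; callers may clean up and check for a concurrent publish.
TransactionalSegmentPublisher failingPublisher =
    (segments, commitMetadata) -> SegmentPublishResult.fail();

// Cannot tell whether the transaction committed, so it must throw rather than return.
TransactionalSegmentPublisher unknownOutcomePublisher = (segments, commitMetadata) -> {
  throw new RuntimeException("metadata store unreachable after commit was issued");
};
```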
@@ -24,13 +24,14 @@
import com.google.common.collect.ImmutableSet;
import io.druid.data.input.InputRow;
import io.druid.data.input.MapBasedInputRow;
import io.druid.indexing.overlord.SegmentPublishResult;
import io.druid.java.util.common.DateTimes;
import io.druid.java.util.common.Intervals;
import io.druid.java.util.common.granularity.Granularities;
import io.druid.segment.loading.DataSegmentKiller;
import io.druid.segment.realtime.appenderator.BaseAppenderatorDriver.SegmentsForSequence;
import io.druid.segment.realtime.appenderator.StreamAppenderatorDriverTest.TestSegmentAllocator;
import io.druid.segment.realtime.appenderator.SegmentWithState.SegmentState;
import io.druid.segment.realtime.appenderator.StreamAppenderatorDriverTest.TestSegmentAllocator;
import io.druid.timeline.partition.NumberedShardSpec;
import org.easymock.EasyMock;
import org.easymock.EasyMockSupport;
@@ -194,6 +195,6 @@ private void checkSegmentStates(int expectedNumSegmentsInState, SegmentState exp

static TransactionalSegmentPublisher makeOkPublisher()
{
return (segments, commitMetadata) -> true;
return (segments, commitMetadata) -> new SegmentPublishResult(ImmutableSet.of(), true);
}
}
@@ -239,8 +239,7 @@ public void testFailDuringPublish() throws Exception
{
expectedException.expect(ExecutionException.class);
expectedException.expectCause(CoreMatchers.instanceOf(ISE.class));
expectedException.expectMessage(
"Failed to publish segments[[DataSegment{size=0, shardSpec=NumberedShardSpec{partitionNum=0, partitions=0}, metrics=[], dimensions=[], version='abc123', loadSpec={}, interval=2000-01-01T00:00:00.000Z/2000-01-01T01:00:00.000Z, dataSource='foo', binaryVersion='0'}, DataSegment{size=0, shardSpec=NumberedShardSpec{partitionNum=0, partitions=0}, metrics=[], dimensions=[], version='abc123', loadSpec={}, interval=2000-01-01T01:00:00.000Z/2000-01-01T02:00:00.000Z, dataSource='foo', binaryVersion='0'}]]");
expectedException.expectMessage("Failed to publish segments.");

testFailDuringPublishInternal(false);
}
@@ -279,31 +278,34 @@ private void testFailDuringPublishInternal(boolean failWithException) throws Exc
Assert.assertTrue(driver.add(ROWS.get(i), "dummy", committerSupplier, false, true).isOk());
}

dataSegmentKiller.killQuietly(new DataSegment(
"foo",
Intervals.of("2000-01-01T00:00:00.000Z/2000-01-01T01:00:00.000Z"),
"abc123",
ImmutableMap.of(),
ImmutableList.of(),
ImmutableList.of(),
new NumberedShardSpec(0, 0),
0,
0
));
EasyMock.expectLastCall().once();

dataSegmentKiller.killQuietly(new DataSegment(
"foo",
Intervals.of("2000-01-01T01:00:00.000Z/2000-01-01T02:00:00.000Z"),
"abc123",
ImmutableMap.of(),
ImmutableList.of(),
ImmutableList.of(),
new NumberedShardSpec(0, 0),
0,
0
));
EasyMock.expectLastCall().once();
if (!failWithException) {
// Should only kill segments if there was _no_ exception.
dataSegmentKiller.killQuietly(new DataSegment(
"foo",
Intervals.of("2000-01-01T00:00:00.000Z/2000-01-01T01:00:00.000Z"),
"abc123",
ImmutableMap.of(),
ImmutableList.of(),
ImmutableList.of(),
new NumberedShardSpec(0, 0),
0,
0
));
EasyMock.expectLastCall().once();

dataSegmentKiller.killQuietly(new DataSegment(
"foo",
Intervals.of("2000-01-01T01:00:00.000Z/2000-01-01T02:00:00.000Z"),
"abc123",
ImmutableMap.of(),
ImmutableList.of(),
ImmutableList.of(),
new NumberedShardSpec(0, 0),
0,
0
));
EasyMock.expectLastCall().once();
}

EasyMock.replay(dataSegmentKiller);
