diff --git a/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/IncrementalPublishingKafkaIndexTaskRunner.java b/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/IncrementalPublishingKafkaIndexTaskRunner.java
index a93fde611c5c..f8d2e10c338b 100644
--- a/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/IncrementalPublishingKafkaIndexTaskRunner.java
+++ b/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/IncrementalPublishingKafkaIndexTaskRunner.java
@@ -1749,7 +1749,7 @@ TransactionalSegmentPublisher createPublisher(TaskToolbox toolbox, boolean useTr
 
       log.info("Publishing with isTransaction[%s].", useTransaction);
 
-      return toolbox.getTaskActionClient().submit(action).isSuccess();
+      return toolbox.getTaskActionClient().submit(action);
     };
   }
 }
diff --git a/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/LegacyKafkaIndexTaskRunner.java b/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/LegacyKafkaIndexTaskRunner.java
index b7cdd3e96e44..a9dff63aefc7 100644
--- a/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/LegacyKafkaIndexTaskRunner.java
+++ b/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/LegacyKafkaIndexTaskRunner.java
@@ -518,7 +518,7 @@ public void run()
 
       log.info("Publishing with isTransaction[%s].", ioConfig.isUseTransaction());
 
-      return toolbox.getTaskActionClient().submit(action).isSuccess();
+      return toolbox.getTaskActionClient().submit(action);
     };
 
     // Supervised kafka tasks are killed by KafkaSupervisor if they are stuck during publishing segments or waiting
diff --git a/indexing-service/src/main/java/io/druid/indexing/common/task/AppenderatorDriverRealtimeIndexTask.java b/indexing-service/src/main/java/io/druid/indexing/common/task/AppenderatorDriverRealtimeIndexTask.java
index 9a0ea65175e5..c04d7df646b3 100644
--- a/indexing-service/src/main/java/io/druid/indexing/common/task/AppenderatorDriverRealtimeIndexTask.java
+++ b/indexing-service/src/main/java/io/druid/indexing/common/task/AppenderatorDriverRealtimeIndexTask.java
@@ -297,7 +297,7 @@ dataSchema, new RealtimeIOConfig(null, null, null), null
 
     final TransactionalSegmentPublisher publisher = (segments, commitMetadata) -> {
       final SegmentTransactionalInsertAction action = new SegmentTransactionalInsertAction(segments);
-      return toolbox.getTaskActionClient().submit(action).isSuccess();
+      return toolbox.getTaskActionClient().submit(action);
     };
 
     // Skip connecting firehose if we've been stopped before we got started.
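
Note: the three call sites above all make the same mechanical change. Instead of collapsing the result of SegmentTransactionalInsertAction to a boolean, the publisher lambda now passes the SegmentPublishResult through, and the caller decides what to do with .isSuccess(). For readers outside the Druid codebase, here is a minimal sketch of what SegmentPublishResult must look like, inferred purely from how this patch uses it; the real class in io.druid.indexing.overlord may carry more detail.

import java.util.Set;

import com.google.common.collect.ImmutableSet;
import io.druid.timeline.DataSegment;

// Sketch only: inferred from `new SegmentPublishResult(ImmutableSet.of(), true)`,
// `SegmentPublishResult.fail()`, and `.isSuccess()` as used in this patch.
public class SegmentPublishResult
{
  private final Set<DataSegment> segments; // segments actually published; empty on failure
  private final boolean success;

  public SegmentPublishResult(Set<DataSegment> segments, boolean success)
  {
    this.segments = segments;
    this.success = success;
  }

  public static SegmentPublishResult fail()
  {
    return new SegmentPublishResult(ImmutableSet.of(), false);
  }

  public boolean isSuccess()
  {
    return success;
  }

  public Set<DataSegment> getSegments()
  {
    return segments;
  }
}
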
diff --git a/indexing-service/src/main/java/io/druid/indexing/common/task/IndexTask.java b/indexing-service/src/main/java/io/druid/indexing/common/task/IndexTask.java
index 6d4cfba4fb85..4b14f68e461f 100644
--- a/indexing-service/src/main/java/io/druid/indexing/common/task/IndexTask.java
+++ b/indexing-service/src/main/java/io/druid/indexing/common/task/IndexTask.java
@@ -949,7 +949,7 @@ dataSchema, new RealtimeIOConfig(null, null, null), null
 
     final TransactionalSegmentPublisher publisher = (segments, commitMetadata) -> {
       final SegmentTransactionalInsertAction action = new SegmentTransactionalInsertAction(segments);
-      return toolbox.getTaskActionClient().submit(action).isSuccess();
+      return toolbox.getTaskActionClient().submit(action);
     };
 
     try (
diff --git a/indexing-service/src/main/java/io/druid/indexing/common/task/batch/parallel/SinglePhaseParallelIndexTaskRunner.java b/indexing-service/src/main/java/io/druid/indexing/common/task/batch/parallel/SinglePhaseParallelIndexTaskRunner.java
index ae1addfe63e9..8ad4ec28f00a 100644
--- a/indexing-service/src/main/java/io/druid/indexing/common/task/batch/parallel/SinglePhaseParallelIndexTaskRunner.java
+++ b/indexing-service/src/main/java/io/druid/indexing/common/task/batch/parallel/SinglePhaseParallelIndexTaskRunner.java
@@ -382,7 +382,7 @@ private void publish(TaskToolbox toolbox) throws IOException
   {
     final TransactionalSegmentPublisher publisher = (segments, commitMetadata) -> {
       final SegmentTransactionalInsertAction action = new SegmentTransactionalInsertAction(segments);
-      return toolbox.getTaskActionClient().submit(action).isSuccess();
+      return toolbox.getTaskActionClient().submit(action);
     };
     final UsedSegmentChecker usedSegmentChecker = new ActionBasedUsedSegmentChecker(toolbox.getTaskActionClient());
     final Set<DataSegment> segmentsToPublish = segmentsMap
@@ -390,7 +390,7 @@ private void publish(TaskToolbox toolbox) throws IOException
         .stream()
         .flatMap(report -> report.getSegments().stream())
         .collect(Collectors.toSet());
-    final boolean published = publisher.publishSegments(segmentsToPublish, null);
+    final boolean published = publisher.publishSegments(segmentsToPublish, null).isSuccess();
 
     if (published) {
       log.info("Published segments");
diff --git a/server/src/main/java/io/druid/indexing/overlord/IndexerMetadataStorageCoordinator.java b/server/src/main/java/io/druid/indexing/overlord/IndexerMetadataStorageCoordinator.java
index 0a7acaea1597..a90ca26bbf5d 100644
--- a/server/src/main/java/io/druid/indexing/overlord/IndexerMetadataStorageCoordinator.java
+++ b/server/src/main/java/io/druid/indexing/overlord/IndexerMetadataStorageCoordinator.java
@@ -50,13 +50,14 @@ default List<DataSegment> getUsedSegmentsForInterval(String dataSource, Interval
 
   /**
    * Get all used segments and the created_date of these segments in a given datasource and interval
-   * 
+   *
    * @param dataSource The datasource to query
    * @param interval   The interval for which all applicable and used datasources are requested. Start is inclusive, end is exclusive
+   *
   * @return The DataSegments and the related created_date of segments which include data in the requested interval
    */
  List<Pair<DataSegment, String>> getUsedSegmentAndCreatedDateForInterval(String dataSource, Interval interval);
- 
+
  /**
   * Get all segments which may include any data in the interval and are flagged as used.
   *
@@ -134,9 +135,12 @@ SegmentIdentifier allocatePendingSegment(
    *                      {@link DataSourceMetadata#plus(DataSourceMetadata)}. If null, this insert will not
    *                      involve a metadata transaction
    *
-   * @return segment publish result indicating transaction success or failure, and set of segments actually published
+   * @return segment publish result indicating transaction success or failure, and set of segments actually published.
+   * This method must only return a failure code if it is sure that the transaction did not happen. If it is not sure,
+   * it must throw an exception instead.
    *
    * @throws IllegalArgumentException if startMetadata and endMetadata are not either both null or both non-null
+   * @throws RuntimeException if the state of metadata storage after this call is unknown
    */
   SegmentPublishResult announceHistoricalSegments(
       Set<DataSegment> segments,
@@ -177,7 +181,7 @@ SegmentPublishResult announceHistoricalSegments(
    * @return true if the entry was inserted, false otherwise
    */
   boolean insertDataSourceMetadata(String dataSource, DataSourceMetadata dataSourceMetadata);
- 
+
   void updateSegmentMetadata(Set<DataSegment> segments);
 
   void deleteSegments(Set<DataSegment> segments);
diff --git a/server/src/main/java/io/druid/metadata/IndexerSQLMetadataStorageCoordinator.java b/server/src/main/java/io/druid/metadata/IndexerSQLMetadataStorageCoordinator.java
index 75e5d9a898f2..82e17a13f4d0 100644
--- a/server/src/main/java/io/druid/metadata/IndexerSQLMetadataStorageCoordinator.java
+++ b/server/src/main/java/io/druid/metadata/IndexerSQLMetadataStorageCoordinator.java
@@ -311,7 +311,7 @@ public SegmentPublishResult announceHistoricalSegments(
       }
     }
 
-    final AtomicBoolean txnFailure = new AtomicBoolean(false);
+    final AtomicBoolean definitelyNotUpdated = new AtomicBoolean(false);
 
     try {
       return connector.retryTransaction(
@@ -323,6 +323,9 @@ public SegmentPublishResult inTransaction(
               final TransactionStatus transactionStatus
           ) throws Exception
           {
+            // Set definitelyNotUpdated back to false upon retrying.
+            definitelyNotUpdated.set(false);
+
             final Set<DataSegment> inserted = Sets.newHashSet();
 
             if (startMetadata != null) {
@@ -334,8 +337,9 @@ public SegmentPublishResult inTransaction(
               );
 
               if (result != DataSourceMetadataUpdateResult.SUCCESS) {
+                // Metadata was definitely not updated.
                 transactionStatus.setRollbackOnly();
-                txnFailure.set(true);
+                definitelyNotUpdated.set(true);
 
                 if (result == DataSourceMetadataUpdateResult.FAILURE) {
                   throw new RuntimeException("Aborting transaction!");
@@ -359,9 +363,10 @@ public SegmentPublishResult inTransaction(
       );
     }
     catch (CallbackFailedException e) {
-      if (txnFailure.get()) {
-        return new SegmentPublishResult(ImmutableSet.of(), false);
+      if (definitelyNotUpdated.get()) {
+        return SegmentPublishResult.fail();
       } else {
+        // Must throw exception if we are not sure if we updated or not.
         throw e;
       }
     }
@@ -890,7 +895,12 @@ private byte[] getDataSourceMetadataWithHandleAsBytes(
    * @param endMetadata   dataSource metadata post-insert will have this endMetadata merged in with
    *                      {@link DataSourceMetadata#plus(DataSourceMetadata)}
    *
-   * @return true if dataSource metadata was updated from matching startMetadata to matching endMetadata
+   * @return SUCCESS if dataSource metadata was updated from matching startMetadata to matching endMetadata, FAILURE or
+   * TRY_AGAIN if it definitely was not updated. This guarantee is meant to help
+   * {@link #announceHistoricalSegments(Set, DataSourceMetadata, DataSourceMetadata)}
+   * achieve its own guarantee.
+   *
+   * @throws RuntimeException if state is unknown after this call
    */
   protected DataSourceMetadataUpdateResult updateDataSourceMetadataWithHandle(
       final Handle handle,
@@ -1163,29 +1173,31 @@ public List<Pair<DataSegment, String>> getUsedSegmentAndCreatedDateForInterval(S
         handle -> handle.createQuery(
             StringUtils.format(
                 "SELECT created_date, payload FROM %1$s WHERE dataSource = :dataSource " +
-                    "AND start >= :start AND %2$send%2$s <= :end AND used = true",
+                "AND start >= :start AND %2$send%2$s <= :end AND used = true",
                 dbTables.getSegmentsTable(), connector.getQuoteString()
             )
         )
-        .bind("dataSource", dataSource)
-        .bind("start", interval.getStart().toString())
-        .bind("end", interval.getEnd().toString())
-        .map(new ResultSetMapper<Pair<DataSegment, String>>()
-        {
-          @Override
-          public Pair<DataSegment, String> map(int index, ResultSet r, StatementContext ctx) throws SQLException
-          {
-            try {
-              return new Pair<>(
-                  jsonMapper.readValue(r.getBytes("payload"), DataSegment.class),
-                  r.getString("created_date"));
-            }
-            catch (IOException e) {
-              throw new RuntimeException(e);
-            }
-          }
-        })
-        .list()
+                   .bind("dataSource", dataSource)
+                   .bind("start", interval.getStart().toString())
+                   .bind("end", interval.getEnd().toString())
+                   .map(new ResultSetMapper<Pair<DataSegment, String>>()
+                   {
+                     @Override
+                     public Pair<DataSegment, String> map(int index, ResultSet r, StatementContext ctx)
+                         throws SQLException
+                     {
+                       try {
+                         return new Pair<>(
+                             jsonMapper.readValue(r.getBytes("payload"), DataSegment.class),
+                             r.getString("created_date")
+                         );
+                       }
+                       catch (IOException e) {
+                         throw new RuntimeException(e);
+                       }
+                     }
+                   })
+                   .list()
     );
   }
@@ -1197,7 +1209,7 @@ public boolean insertDataSourceMetadata(String dataSource, DataSourceMetadata me
               .createStatement(
                   StringUtils.format(
                       "INSERT INTO %s (dataSource, created_date, commit_metadata_payload, commit_metadata_sha1) VALUES" +
-                          " (:dataSource, :created_date, :commit_metadata_payload, :commit_metadata_sha1)",
+                      " (:dataSource, :created_date, :commit_metadata_payload, :commit_metadata_sha1)",
                       dbTables.getDataSourceTable()
                   )
               )
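
Note: the subtle point in announceHistoricalSegments above is that definitelyNotUpdated is only trustworthy for the final attempt, which is why it is reset at the top of each retry; otherwise a definite failure on attempt N could mask a later attempt that died in an unknown state. Below is a self-contained sketch of the pattern. retryTransaction and tryCompareAndSwapMetadata are hypothetical stand-ins for connector.retryTransaction and updateDataSourceMetadataWithHandle, not Druid APIs.

import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Supplier;

class RetryFlagSketch
{
  private final AtomicBoolean definitelyNotUpdated = new AtomicBoolean(false);

  String publish()
  {
    try {
      return retryTransaction(() -> {
        // Reset on every attempt: a stale "true" from an earlier failed attempt must not
        // mask a later attempt whose outcome is genuinely unknown (e.g. lost connection).
        definitelyNotUpdated.set(false);
        if (!tryCompareAndSwapMetadata()) {
          definitelyNotUpdated.set(true); // this attempt definitely rolled back
          throw new RuntimeException("Aborting transaction!");
        }
        return "published";
      });
    }
    catch (RuntimeException e) {
      if (definitelyNotUpdated.get()) {
        return "failed"; // safe: the metadata compare-and-swap definitely did not happen
      }
      throw e; // unknown state: callers must not clean up as if the publish had failed
    }
  }

  private String retryTransaction(Supplier<String> attempt)
  {
    RuntimeException lastException = null;
    for (int i = 0; i < 3; i++) {
      try {
        return attempt.get();
      }
      catch (RuntimeException e) {
        lastException = e; // in the real code, only transient failures are retried
      }
    }
    throw lastException;
  }

  private boolean tryCompareAndSwapMetadata()
  {
    return false; // placeholder for the SQL compare-and-swap of dataSource metadata
  }
}
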
+ log.info("Pushing segment[%s] again with new unique path.", identifier); + } else { + log.info("Segment[%s] already pushed.", identifier); + return objectMapper.readValue(descriptorFile, DataSegment.class); + } } log.info("Pushing merged index for segment[%s].", identifier); diff --git a/server/src/main/java/io/druid/segment/realtime/appenderator/BaseAppenderatorDriver.java b/server/src/main/java/io/druid/segment/realtime/appenderator/BaseAppenderatorDriver.java index a5a6d2071d29..43ec063c3945 100644 --- a/server/src/main/java/io/druid/segment/realtime/appenderator/BaseAppenderatorDriver.java +++ b/server/src/main/java/io/druid/segment/realtime/appenderator/BaseAppenderatorDriver.java @@ -555,38 +555,33 @@ ListenableFuture publishInBackground( final boolean published = publisher.publishSegments( ImmutableSet.copyOf(segmentsAndMetadata.getSegments()), metadata == null ? null : ((AppenderatorDriverMetadata) metadata).getCallerMetadata() - ); + ).isSuccess(); if (published) { log.info("Published segments."); } else { - log.info("Transaction failure while publishing segments, checking if someone else beat us to it."); + log.info("Transaction failure while publishing segments, removing them from deep storage " + + "and checking if someone else beat us to publishing."); + + segmentsAndMetadata.getSegments().forEach(dataSegmentKiller::killQuietly); + final Set segmentsIdentifiers = segmentsAndMetadata .getSegments() .stream() .map(SegmentIdentifier::fromDataSegment) .collect(Collectors.toSet()); + if (usedSegmentChecker.findUsedSegments(segmentsIdentifiers) .equals(Sets.newHashSet(segmentsAndMetadata.getSegments()))) { - log.info( - "Removing our segments from deep storage because someone else already published them: %s", - segmentsAndMetadata.getSegments() - ); - segmentsAndMetadata.getSegments().forEach(dataSegmentKiller::killQuietly); - log.info("Our segments really do exist, awaiting handoff."); } else { - throw new ISE("Failed to publish segments[%s]", segmentsAndMetadata.getSegments()); + throw new ISE("Failed to publish segments."); } } } catch (Exception e) { - log.warn( - "Removing segments from deep storage after failed publish: %s", - segmentsAndMetadata.getSegments() - ); - segmentsAndMetadata.getSegments().forEach(dataSegmentKiller::killQuietly); - + // Must not remove segments here, we aren't sure if our transaction succeeded or not. + log.warn(e, "Failed publish, not removing segments: %s", segmentsAndMetadata.getSegments()); throw Throwables.propagate(e); } } diff --git a/server/src/main/java/io/druid/segment/realtime/appenderator/TransactionalSegmentPublisher.java b/server/src/main/java/io/druid/segment/realtime/appenderator/TransactionalSegmentPublisher.java index d49b18162118..9a30fbd2fcf5 100644 --- a/server/src/main/java/io/druid/segment/realtime/appenderator/TransactionalSegmentPublisher.java +++ b/server/src/main/java/io/druid/segment/realtime/appenderator/TransactionalSegmentPublisher.java @@ -19,6 +19,7 @@ package io.druid.segment.realtime.appenderator; +import io.druid.indexing.overlord.SegmentPublishResult; import io.druid.timeline.DataSegment; import javax.annotation.Nullable; @@ -30,11 +31,14 @@ public interface TransactionalSegmentPublisher /** * Publish segments, along with some commit metadata, in a single transaction. * - * @return true if segments were published, false if they were not published due to txn failure with the metadata + * @return publish result that indicates if segments were published or not. 
diff --git a/server/src/test/java/io/druid/segment/realtime/appenderator/BatchAppenderatorDriverTest.java b/server/src/test/java/io/druid/segment/realtime/appenderator/BatchAppenderatorDriverTest.java
index 4ac344831768..c472f8016215 100644
--- a/server/src/test/java/io/druid/segment/realtime/appenderator/BatchAppenderatorDriverTest.java
+++ b/server/src/test/java/io/druid/segment/realtime/appenderator/BatchAppenderatorDriverTest.java
@@ -24,13 +24,14 @@
 import com.google.common.collect.ImmutableSet;
 import io.druid.data.input.InputRow;
 import io.druid.data.input.MapBasedInputRow;
+import io.druid.indexing.overlord.SegmentPublishResult;
 import io.druid.java.util.common.DateTimes;
 import io.druid.java.util.common.Intervals;
 import io.druid.java.util.common.granularity.Granularities;
 import io.druid.segment.loading.DataSegmentKiller;
 import io.druid.segment.realtime.appenderator.BaseAppenderatorDriver.SegmentsForSequence;
-import io.druid.segment.realtime.appenderator.StreamAppenderatorDriverTest.TestSegmentAllocator;
 import io.druid.segment.realtime.appenderator.SegmentWithState.SegmentState;
+import io.druid.segment.realtime.appenderator.StreamAppenderatorDriverTest.TestSegmentAllocator;
 import io.druid.timeline.partition.NumberedShardSpec;
 import org.easymock.EasyMock;
 import org.easymock.EasyMockSupport;
@@ -194,6 +195,6 @@ private void checkSegmentStates(int expectedNumSegmentsInState, SegmentState exp
 
   static TransactionalSegmentPublisher makeOkPublisher()
   {
-    return (segments, commitMetadata) -> true;
+    return (segments, commitMetadata) -> new SegmentPublishResult(ImmutableSet.of(), true);
   }
 }
diff --git a/server/src/test/java/io/druid/segment/realtime/appenderator/StreamAppenderatorDriverFailTest.java b/server/src/test/java/io/druid/segment/realtime/appenderator/StreamAppenderatorDriverFailTest.java
index 4d512e110804..1c7abb1f84e3 100644
--- a/server/src/test/java/io/druid/segment/realtime/appenderator/StreamAppenderatorDriverFailTest.java
+++ b/server/src/test/java/io/druid/segment/realtime/appenderator/StreamAppenderatorDriverFailTest.java
@@ -239,8 +239,7 @@ public void testFailDuringPublish() throws Exception
   {
     expectedException.expect(ExecutionException.class);
     expectedException.expectCause(CoreMatchers.instanceOf(ISE.class));
-    expectedException.expectMessage(
-        "Failed to publish segments[[DataSegment{size=0, shardSpec=NumberedShardSpec{partitionNum=0, partitions=0}, metrics=[], dimensions=[], version='abc123', loadSpec={}, interval=2000-01-01T00:00:00.000Z/2000-01-01T01:00:00.000Z, dataSource='foo', binaryVersion='0'}, DataSegment{size=0, shardSpec=NumberedShardSpec{partitionNum=0, partitions=0}, metrics=[], dimensions=[], version='abc123', loadSpec={}, interval=2000-01-01T01:00:00.000Z/2000-01-01T02:00:00.000Z, dataSource='foo', binaryVersion='0'}]]");
+    expectedException.expectMessage("Failed to publish segments.");
 
     testFailDuringPublishInternal(false);
   }
@@ -279,31 +278,34 @@ private void testFailDuringPublishInternal(boolean failWithException) throws Exc
       Assert.assertTrue(driver.add(ROWS.get(i), "dummy", committerSupplier, false, true).isOk());
     }
 
-    dataSegmentKiller.killQuietly(new DataSegment(
-        "foo",
-        Intervals.of("2000-01-01T00:00:00.000Z/2000-01-01T01:00:00.000Z"),
-        "abc123",
-        ImmutableMap.of(),
-        ImmutableList.of(),
-        ImmutableList.of(),
-        new NumberedShardSpec(0, 0),
-        0,
-        0
-    ));
-    EasyMock.expectLastCall().once();
-
-    dataSegmentKiller.killQuietly(new DataSegment(
-        "foo",
-        Intervals.of("2000-01-01T01:00:00.000Z/2000-01-01T02:00:00.000Z"),
-        "abc123",
-        ImmutableMap.of(),
-        ImmutableList.of(),
-        ImmutableList.of(),
-        new NumberedShardSpec(0, 0),
-        0,
-        0
-    ));
-    EasyMock.expectLastCall().once();
+    if (!failWithException) {
+      // Should only kill segments if there was _no_ exception.
+      dataSegmentKiller.killQuietly(new DataSegment(
+          "foo",
+          Intervals.of("2000-01-01T00:00:00.000Z/2000-01-01T01:00:00.000Z"),
+          "abc123",
+          ImmutableMap.of(),
+          ImmutableList.of(),
+          ImmutableList.of(),
+          new NumberedShardSpec(0, 0),
+          0,
+          0
+      ));
+      EasyMock.expectLastCall().once();
+
+      dataSegmentKiller.killQuietly(new DataSegment(
+          "foo",
+          Intervals.of("2000-01-01T01:00:00.000Z/2000-01-01T02:00:00.000Z"),
+          "abc123",
+          ImmutableMap.of(),
+          ImmutableList.of(),
+          ImmutableList.of(),
+          new NumberedShardSpec(0, 0),
+          0,
+          0
+      ));
+      EasyMock.expectLastCall().once();
+    }
 
     EasyMock.replay(dataSegmentKiller);
diff --git a/server/src/test/java/io/druid/segment/realtime/appenderator/StreamAppenderatorDriverTest.java b/server/src/test/java/io/druid/segment/realtime/appenderator/StreamAppenderatorDriverTest.java
index acdbed52d694..b9c5e223438f 100644
--- a/server/src/test/java/io/druid/segment/realtime/appenderator/StreamAppenderatorDriverTest.java
+++ b/server/src/test/java/io/druid/segment/realtime/appenderator/StreamAppenderatorDriverTest.java
@@ -30,6 +30,7 @@
 import io.druid.data.input.Committer;
 import io.druid.data.input.InputRow;
 import io.druid.data.input.MapBasedInputRow;
+import io.druid.indexing.overlord.SegmentPublishResult;
 import io.druid.jackson.DefaultObjectMapper;
 import io.druid.java.util.common.DateTimes;
 import io.druid.java.util.common.Intervals;
@@ -53,6 +54,7 @@
 
 import java.io.IOException;
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -359,7 +361,7 @@ private Set<SegmentIdentifier> asIdentifiers(Iterable<DataSegment> segments)
 
   static TransactionalSegmentPublisher makeOkPublisher()
   {
-    return (segments, commitMetadata) -> true;
+    return (segments, commitMetadata) -> new SegmentPublishResult(Collections.emptySet(), true);
   }
 
   static TransactionalSegmentPublisher makeFailingPublisher(boolean failWithException)
@@ -368,7 +370,7 @@ static TransactionalSegmentPublisher makeFailingPublisher(boolean failWithExcept
       if (failWithException) {
         throw new RuntimeException("test");
       }
-      return false;
+      return SegmentPublishResult.fail();
     };
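
Note: the test updates above suggest a migration recipe for any third-party TransactionalSegmentPublisher: return a successful result, return SegmentPublishResult.fail() only when the publish definitely did not happen, and throw when the outcome is unknown. A hedged sketch of the three cases, modeled on makeOkPublisher and makeFailingPublisher; the method names below are illustrative, not part of Druid.

import com.google.common.collect.ImmutableSet;

import io.druid.indexing.overlord.SegmentPublishResult;
import io.druid.segment.realtime.appenderator.TransactionalSegmentPublisher;

final class PublisherMigrationSketch
{
  static TransactionalSegmentPublisher okPublisher()
  {
    // Previously: return true.
    return (segments, commitMetadata) -> new SegmentPublishResult(ImmutableSet.of(), true);
  }

  static TransactionalSegmentPublisher definiteFailurePublisher()
  {
    // Previously: return false. Only valid when the transaction is known to have rolled back.
    return (segments, commitMetadata) -> SegmentPublishResult.fail();
  }

  static TransactionalSegmentPublisher unknownOutcomePublisher()
  {
    // Throwing is now the only correct way to say "cannot tell whether the publish happened";
    // StreamAppenderatorDriverFailTest verifies that no segments are killed on this path.
    return (segments, commitMetadata) -> {
      throw new RuntimeException("connection lost mid-transaction");
    };
  }
}
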