From 886b7396236ecbe6ff6ea7dcc58f5e3ff24809d2 Mon Sep 17 00:00:00 2001 From: Jihoon Son Date: Wed, 29 Apr 2020 14:19:28 -0700 Subject: [PATCH] Add javadoc for stream ingestion integration tests --- .../druid/testing/utils/KafkaEventWriter.java | 6 ++++ .../testing/utils/KinesisEventWriter.java | 12 +++++-- .../testing/utils/StreamEventWriter.java | 25 +++++++++++++-- .../utils/SyntheticStreamGenerator.java | 4 +-- .../AbstractKafkaIndexingServiceTest.java | 5 +-- .../AbstractKinesisIndexingServiceTest.java | 12 ++++++- .../indexer/AbstractStreamIndexingTest.java | 32 ++++++++++++------- ...TKinesisIndexingServiceSerializedTest.java | 6 ++-- ...TKinesisIndexingServiceDataFormatTest.java | 2 +- ...inesisIndexingServiceParallelizedTest.java | 6 ++-- 10 files changed, 81 insertions(+), 29 deletions(-) diff --git a/integration-tests/src/main/java/org/apache/druid/testing/utils/KafkaEventWriter.java b/integration-tests/src/main/java/org/apache/druid/testing/utils/KafkaEventWriter.java index 14b57141dd56..a9d6da9419c2 100644 --- a/integration-tests/src/main/java/org/apache/druid/testing/utils/KafkaEventWriter.java +++ b/integration-tests/src/main/java/org/apache/druid/testing/utils/KafkaEventWriter.java @@ -65,6 +65,12 @@ public KafkaEventWriter(IntegrationTestingConfig config, boolean txnEnabled) } } + @Override + public boolean supportTransaction() + { + return true; + } + @Override public boolean isTransactionEnabled() { diff --git a/integration-tests/src/main/java/org/apache/druid/testing/utils/KinesisEventWriter.java b/integration-tests/src/main/java/org/apache/druid/testing/utils/KinesisEventWriter.java index 39b700ce093d..f0c66fcb8b9a 100644 --- a/integration-tests/src/main/java/org/apache/druid/testing/utils/KinesisEventWriter.java +++ b/integration-tests/src/main/java/org/apache/druid/testing/utils/KinesisEventWriter.java @@ -60,21 +60,27 @@ public KinesisEventWriter(String endpoint, boolean aggregate) throws Exception } @Override - public boolean isTransactionEnabled() + public boolean supportTransaction() { return false; } + @Override + public boolean isTransactionEnabled() + { + throw new UnsupportedOperationException(); + } + @Override public void initTransaction() { - // No-Op as Kinesis does not support transaction + throw new UnsupportedOperationException(); } @Override public void commitTransaction() { - // No-Op as Kinesis does not support transaction + throw new UnsupportedOperationException(); } @Override diff --git a/integration-tests/src/main/java/org/apache/druid/testing/utils/StreamEventWriter.java b/integration-tests/src/main/java/org/apache/druid/testing/utils/StreamEventWriter.java index 747cbd8c2ac9..2f6e06562f90 100644 --- a/integration-tests/src/main/java/org/apache/druid/testing/utils/StreamEventWriter.java +++ b/integration-tests/src/main/java/org/apache/druid/testing/utils/StreamEventWriter.java @@ -28,22 +28,43 @@ */ public interface StreamEventWriter extends Closeable { + /** + * Returns true if the stream supports transactions. + */ + boolean supportTransaction(); + + /** + * Returns true if the transaction is enabled for this writer. Callers should check {@link #supportTransaction()} + * before calling this method. + * + * @throws UnsupportedOperationException if {@link #supportTransaction()} returns false. + */ boolean isTransactionEnabled(); + /** + * Initializes a transaction for this writer. + * + * @throws UnsupportedOperationException if {@link #supportTransaction()} returns false. + */ void initTransaction(); + /** + * Commits a transaction. + * + * @throws UnsupportedOperationException if {@link #supportTransaction()} returns false. + */ void commitTransaction(); void write(String topic, byte[] event); /** - * Flush pending writes on the underlying stream. This method is synchronous and waits until the flush completes. + * Flushes pending writes on the underlying stream. This method is synchronous and waits until the flush completes. * Note that this method is not interruptible */ void flush(); /** - * Close this writer. Any resource should be cleaned up when this method is called. + * Closes this writer. Any resource should be cleaned up when this method is called. * Implementations must call {@link #flush()} before closing the writer. */ @Override diff --git a/integration-tests/src/main/java/org/apache/druid/testing/utils/SyntheticStreamGenerator.java b/integration-tests/src/main/java/org/apache/druid/testing/utils/SyntheticStreamGenerator.java index c68db10fb043..655a95cdc834 100644 --- a/integration-tests/src/main/java/org/apache/druid/testing/utils/SyntheticStreamGenerator.java +++ b/integration-tests/src/main/java/org/apache/druid/testing/utils/SyntheticStreamGenerator.java @@ -81,7 +81,7 @@ public void run(String streamTopic, StreamEventWriter streamEventWriter, int tot nowCeilingToSecond.plusSeconds(1).minus(cyclePaddingMs) ); - if (streamEventWriter.isTransactionEnabled()) { + if (streamEventWriter.supportTransaction() && streamEventWriter.isTransactionEnabled()) { streamEventWriter.initTransaction(); } @@ -98,7 +98,7 @@ public void run(String streamTopic, StreamEventWriter streamEventWriter, int tot } } - if (streamEventWriter.isTransactionEnabled()) { + if (streamEventWriter.supportTransaction() && streamEventWriter.isTransactionEnabled()) { streamEventWriter.commitTransaction(); } diff --git a/integration-tests/src/test/java/org/apache/druid/tests/indexer/AbstractKafkaIndexingServiceTest.java b/integration-tests/src/test/java/org/apache/druid/tests/indexer/AbstractKafkaIndexingServiceTest.java index 3bb9693dcbdf..9d1a69bec236 100644 --- a/integration-tests/src/test/java/org/apache/druid/tests/indexer/AbstractKafkaIndexingServiceTest.java +++ b/integration-tests/src/test/java/org/apache/druid/tests/indexer/AbstractKafkaIndexingServiceTest.java @@ -19,6 +19,7 @@ package org.apache.druid.tests.indexer; +import com.google.common.base.Preconditions; import org.apache.druid.indexing.kafka.KafkaConsumerConfigs; import org.apache.druid.java.util.common.StringUtils; import org.apache.druid.testing.IntegrationTestingConfig; @@ -40,9 +41,9 @@ StreamAdminClient createStreamAdminClient(IntegrationTestingConfig config) } @Override - public StreamEventWriter createStreamEventWriter(IntegrationTestingConfig config, boolean transactionEnabled) + public StreamEventWriter createStreamEventWriter(IntegrationTestingConfig config, Boolean transactionEnabled) { - return new KafkaEventWriter(config, transactionEnabled); + return new KafkaEventWriter(config, Preconditions.checkNotNull(transactionEnabled, "transactionEnabled")); } @Override diff --git a/integration-tests/src/test/java/org/apache/druid/tests/indexer/AbstractKinesisIndexingServiceTest.java b/integration-tests/src/test/java/org/apache/druid/tests/indexer/AbstractKinesisIndexingServiceTest.java index b8095a310b44..25dd871c673a 100644 --- a/integration-tests/src/test/java/org/apache/druid/tests/indexer/AbstractKinesisIndexingServiceTest.java +++ b/integration-tests/src/test/java/org/apache/druid/tests/indexer/AbstractKinesisIndexingServiceTest.java @@ -20,16 +20,20 @@ package org.apache.druid.tests.indexer; import org.apache.druid.java.util.common.StringUtils; +import org.apache.druid.java.util.common.logger.Logger; import org.apache.druid.testing.IntegrationTestingConfig; import org.apache.druid.testing.utils.KinesisAdminClient; import org.apache.druid.testing.utils.KinesisEventWriter; import org.apache.druid.testing.utils.StreamAdminClient; import org.apache.druid.testing.utils.StreamEventWriter; +import javax.annotation.Nullable; import java.util.function.Function; public abstract class AbstractKinesisIndexingServiceTest extends AbstractStreamIndexingTest { + private static final Logger LOG = new Logger(AbstractKinesisIndexingServiceTest.class); + @Override StreamAdminClient createStreamAdminClient(IntegrationTestingConfig config) throws Exception { @@ -37,9 +41,15 @@ StreamAdminClient createStreamAdminClient(IntegrationTestingConfig config) throw } @Override - StreamEventWriter createStreamEventWriter(IntegrationTestingConfig config, boolean transactionEnabled) + StreamEventWriter createStreamEventWriter(IntegrationTestingConfig config, @Nullable Boolean transactionEnabled) throws Exception { + if (transactionEnabled != null) { + LOG.warn( + "Kinesis event writer doesn't support transaction. Ignoring the given parameter transactionEnabled[%s]", + transactionEnabled + ); + } return new KinesisEventWriter(config.getStreamEndpoint(), false); } diff --git a/integration-tests/src/test/java/org/apache/druid/tests/indexer/AbstractStreamIndexingTest.java b/integration-tests/src/test/java/org/apache/druid/tests/indexer/AbstractStreamIndexingTest.java index 506e80fe1ec2..9a1dbb32e62c 100644 --- a/integration-tests/src/test/java/org/apache/druid/tests/indexer/AbstractStreamIndexingTest.java +++ b/integration-tests/src/test/java/org/apache/druid/tests/indexer/AbstractStreamIndexingTest.java @@ -40,6 +40,7 @@ import org.joda.time.format.DateTimeFormat; import org.joda.time.format.DateTimeFormatter; +import javax.annotation.Nullable; import java.io.Closeable; import java.io.IOException; import java.util.HashMap; @@ -95,8 +96,14 @@ public abstract class AbstractStreamIndexingTest extends AbstractIndexerTest abstract StreamAdminClient createStreamAdminClient(IntegrationTestingConfig config) throws Exception; - abstract StreamEventWriter createStreamEventWriter(IntegrationTestingConfig config, boolean transactionEnabled) - throws Exception; + /** + * Create an event writer for an underlying stream. {@code transactionEnabled} should not be null if the stream + * supports transactions. It is ignored otherwise. + */ + abstract StreamEventWriter createStreamEventWriter( + IntegrationTestingConfig config, + @Nullable Boolean transactionEnabled + ) throws Exception; abstract Function generateStreamIngestionPropsTransform( String streamName, @@ -160,7 +167,7 @@ private Closeable createResourceCloser(GeneratedTestConfig generatedTestConfig) } protected void doTestIndexDataStableState( - boolean transactionEnabled, + @Nullable Boolean transactionEnabled, String serializerPath, String parserType, String specPath @@ -194,7 +201,7 @@ protected void doTestIndexDataStableState( } } - void doTestIndexDataWithLosingCoordinator(boolean transactionEnabled) throws Exception + void doTestIndexDataWithLosingCoordinator(@Nullable Boolean transactionEnabled) throws Exception { testIndexWithLosingNodeHelper( () -> druidClusterAdminClient.restartCoordinatorContainer(), @@ -203,7 +210,7 @@ void doTestIndexDataWithLosingCoordinator(boolean transactionEnabled) throws Exc ); } - void doTestIndexDataWithLosingOverlord(boolean transactionEnabled) throws Exception + void doTestIndexDataWithLosingOverlord(@Nullable Boolean transactionEnabled) throws Exception { testIndexWithLosingNodeHelper( () -> druidClusterAdminClient.restartIndexerContainer(), @@ -212,7 +219,7 @@ void doTestIndexDataWithLosingOverlord(boolean transactionEnabled) throws Except ); } - void doTestIndexDataWithLosingHistorical(boolean transactionEnabled) throws Exception + void doTestIndexDataWithLosingHistorical(@Nullable Boolean transactionEnabled) throws Exception { testIndexWithLosingNodeHelper( () -> druidClusterAdminClient.restartHistoricalContainer(), @@ -221,7 +228,7 @@ void doTestIndexDataWithLosingHistorical(boolean transactionEnabled) throws Exce ); } - protected void doTestIndexDataWithStartStopSupervisor(boolean transactionEnabled) throws Exception + protected void doTestIndexDataWithStartStopSupervisor(@Nullable Boolean transactionEnabled) throws Exception { final GeneratedTestConfig generatedTestConfig = new GeneratedTestConfig( INPUT_FORMAT, @@ -284,22 +291,22 @@ protected void doTestIndexDataWithStartStopSupervisor(boolean transactionEnabled } } - protected void doTestIndexDataWithStreamReshardSplit(boolean transactionEnabled) throws Exception + protected void doTestIndexDataWithStreamReshardSplit(@Nullable Boolean transactionEnabled) throws Exception { // Reshard the stream from STREAM_SHARD_COUNT to STREAM_SHARD_COUNT * 2 testIndexWithStreamReshardHelper(transactionEnabled, STREAM_SHARD_COUNT * 2); } - protected void doTestIndexDataWithStreamReshardMerge(boolean transactionEnabled) throws Exception + protected void doTestIndexDataWithStreamReshardMerge() throws Exception { // Reshard the stream from STREAM_SHARD_COUNT to STREAM_SHARD_COUNT / 2 - testIndexWithStreamReshardHelper(transactionEnabled, STREAM_SHARD_COUNT / 2); + testIndexWithStreamReshardHelper(null, STREAM_SHARD_COUNT / 2); } private void testIndexWithLosingNodeHelper( Runnable restartRunnable, Runnable waitForReadyRunnable, - boolean transactionEnabled + @Nullable Boolean transactionEnabled ) throws Exception { final GeneratedTestConfig generatedTestConfig = new GeneratedTestConfig( @@ -376,7 +383,8 @@ private void testIndexWithLosingNodeHelper( } } - private void testIndexWithStreamReshardHelper(boolean transactionEnabled, int newShardCount) throws Exception + private void testIndexWithStreamReshardHelper(@Nullable Boolean transactionEnabled, int newShardCount) + throws Exception { final GeneratedTestConfig generatedTestConfig = new GeneratedTestConfig( INPUT_FORMAT, diff --git a/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITKinesisIndexingServiceSerializedTest.java b/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITKinesisIndexingServiceSerializedTest.java index fed13619dc06..02e9b6d3ea96 100644 --- a/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITKinesisIndexingServiceSerializedTest.java +++ b/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITKinesisIndexingServiceSerializedTest.java @@ -47,7 +47,7 @@ public void beforeClass() throws Exception @Test public void testKinesisIndexDataWithLosingCoordinator() throws Exception { - doTestIndexDataWithLosingCoordinator(false); + doTestIndexDataWithLosingCoordinator(null); } /** @@ -56,7 +56,7 @@ public void testKinesisIndexDataWithLosingCoordinator() throws Exception @Test public void testKinesisIndexDataWithLosingOverlord() throws Exception { - doTestIndexDataWithLosingOverlord(false); + doTestIndexDataWithLosingOverlord(null); } /** @@ -65,6 +65,6 @@ public void testKinesisIndexDataWithLosingOverlord() throws Exception @Test public void testKinesisIndexDataWithLosingHistorical() throws Exception { - doTestIndexDataWithLosingHistorical(false); + doTestIndexDataWithLosingHistorical(null); } } diff --git a/integration-tests/src/test/java/org/apache/druid/tests/parallelized/ITKinesisIndexingServiceDataFormatTest.java b/integration-tests/src/test/java/org/apache/druid/tests/parallelized/ITKinesisIndexingServiceDataFormatTest.java index c302cd1a9224..47ffffd80ffa 100644 --- a/integration-tests/src/test/java/org/apache/druid/tests/parallelized/ITKinesisIndexingServiceDataFormatTest.java +++ b/integration-tests/src/test/java/org/apache/druid/tests/parallelized/ITKinesisIndexingServiceDataFormatTest.java @@ -85,7 +85,7 @@ public void beforeClass() throws Exception public void testIndexData(String serializerPath, String parserType, String specPath) throws Exception { - doTestIndexDataStableState(false, serializerPath, parserType, specPath); + doTestIndexDataStableState(null, serializerPath, parserType, specPath); } @Override diff --git a/integration-tests/src/test/java/org/apache/druid/tests/parallelized/ITKinesisIndexingServiceParallelizedTest.java b/integration-tests/src/test/java/org/apache/druid/tests/parallelized/ITKinesisIndexingServiceParallelizedTest.java index efd107fa1aa2..968f794fb01a 100644 --- a/integration-tests/src/test/java/org/apache/druid/tests/parallelized/ITKinesisIndexingServiceParallelizedTest.java +++ b/integration-tests/src/test/java/org/apache/druid/tests/parallelized/ITKinesisIndexingServiceParallelizedTest.java @@ -49,7 +49,7 @@ public void beforeClass() throws Exception @Test public void testKinesisIndexDataWithStartStopSupervisor() throws Exception { - doTestIndexDataWithStartStopSupervisor(false); + doTestIndexDataWithStartStopSupervisor(null); } /** @@ -59,7 +59,7 @@ public void testKinesisIndexDataWithStartStopSupervisor() throws Exception @Test public void testKinesisIndexDataWithKinesisReshardSplit() throws Exception { - doTestIndexDataWithStreamReshardSplit(false); + doTestIndexDataWithStreamReshardSplit(null); } /** @@ -69,6 +69,6 @@ public void testKinesisIndexDataWithKinesisReshardSplit() throws Exception @Test public void testKinesisIndexDataWithKinesisReshardMerge() throws Exception { - doTestIndexDataWithStreamReshardMerge(false); + doTestIndexDataWithStreamReshardMerge(); } }