Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,12 @@ public KafkaEventWriter(IntegrationTestingConfig config, boolean txnEnabled)
}
}

@Override
public boolean supportTransaction()
{
return true;
}

@Override
public boolean isTransactionEnabled()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,21 +60,27 @@ public KinesisEventWriter(String endpoint, boolean aggregate) throws Exception
}

@Override
public boolean isTransactionEnabled()
public boolean supportTransaction()
{
return false;
}

@Override
public boolean isTransactionEnabled()
{
throw new UnsupportedOperationException();
}

@Override
public void initTransaction()
{
// No-Op as Kinesis does not support transaction
throw new UnsupportedOperationException();
}

@Override
public void commitTransaction()
{
// No-Op as Kinesis does not support transaction
throw new UnsupportedOperationException();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,22 +28,43 @@
*/
public interface StreamEventWriter extends Closeable
{
/**
* Returns true if the stream supports transactions.
*/
boolean supportTransaction();

/**
* Returns true if the transaction is enabled for this writer. Callers should check {@link #supportTransaction()}
* before calling this method.
*
* @throws UnsupportedOperationException if {@link #supportTransaction()} returns false.
*/
boolean isTransactionEnabled();

/**
* Initializes a transaction for this writer.
*
* @throws UnsupportedOperationException if {@link #supportTransaction()} returns false.
*/
void initTransaction();

/**
* Commits a transaction.
*
* @throws UnsupportedOperationException if {@link #supportTransaction()} returns false.
*/
void commitTransaction();

void write(String topic, byte[] event);

/**
* Flush pending writes on the underlying stream. This method is synchronous and waits until the flush completes.
* Flushes pending writes on the underlying stream. This method is synchronous and waits until the flush completes.
* Note that this method is not interruptible
*/
void flush();

/**
* Close this writer. Any resource should be cleaned up when this method is called.
* Closes this writer. Any resource should be cleaned up when this method is called.
* Implementations must call {@link #flush()} before closing the writer.
*/
@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ public void run(String streamTopic, StreamEventWriter streamEventWriter, int tot
nowCeilingToSecond.plusSeconds(1).minus(cyclePaddingMs)
);

if (streamEventWriter.isTransactionEnabled()) {
if (streamEventWriter.supportTransaction() && streamEventWriter.isTransactionEnabled()) {
streamEventWriter.initTransaction();
}

Expand All @@ -98,7 +98,7 @@ public void run(String streamTopic, StreamEventWriter streamEventWriter, int tot
}
}

if (streamEventWriter.isTransactionEnabled()) {
if (streamEventWriter.supportTransaction() && streamEventWriter.isTransactionEnabled()) {
streamEventWriter.commitTransaction();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

package org.apache.druid.tests.indexer;

import com.google.common.base.Preconditions;
import org.apache.druid.indexing.kafka.KafkaConsumerConfigs;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.testing.IntegrationTestingConfig;
Expand All @@ -40,9 +41,9 @@ StreamAdminClient createStreamAdminClient(IntegrationTestingConfig config)
}

@Override
public StreamEventWriter createStreamEventWriter(IntegrationTestingConfig config, boolean transactionEnabled)
public StreamEventWriter createStreamEventWriter(IntegrationTestingConfig config, Boolean transactionEnabled)
{
return new KafkaEventWriter(config, transactionEnabled);
return new KafkaEventWriter(config, Preconditions.checkNotNull(transactionEnabled, "transactionEnabled"));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,26 +20,36 @@
package org.apache.druid.tests.indexer;

import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.testing.IntegrationTestingConfig;
import org.apache.druid.testing.utils.KinesisAdminClient;
import org.apache.druid.testing.utils.KinesisEventWriter;
import org.apache.druid.testing.utils.StreamAdminClient;
import org.apache.druid.testing.utils.StreamEventWriter;

import javax.annotation.Nullable;
import java.util.function.Function;

public abstract class AbstractKinesisIndexingServiceTest extends AbstractStreamIndexingTest
{
private static final Logger LOG = new Logger(AbstractKinesisIndexingServiceTest.class);

@Override
StreamAdminClient createStreamAdminClient(IntegrationTestingConfig config) throws Exception
{
return new KinesisAdminClient(config.getStreamEndpoint());
}

@Override
StreamEventWriter createStreamEventWriter(IntegrationTestingConfig config, boolean transactionEnabled)
StreamEventWriter createStreamEventWriter(IntegrationTestingConfig config, @Nullable Boolean transactionEnabled)
throws Exception
{
if (transactionEnabled != null) {
LOG.warn(
"Kinesis event writer doesn't support transaction. Ignoring the given parameter transactionEnabled[%s]",
transactionEnabled
);
}
return new KinesisEventWriter(config.getStreamEndpoint(), false);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
import org.joda.time.format.DateTimeFormat;
import org.joda.time.format.DateTimeFormatter;

import javax.annotation.Nullable;
import java.io.Closeable;
import java.io.IOException;
import java.util.HashMap;
Expand Down Expand Up @@ -95,8 +96,14 @@ public abstract class AbstractStreamIndexingTest extends AbstractIndexerTest

abstract StreamAdminClient createStreamAdminClient(IntegrationTestingConfig config) throws Exception;

abstract StreamEventWriter createStreamEventWriter(IntegrationTestingConfig config, boolean transactionEnabled)
throws Exception;
/**
* Create an event writer for an underlying stream. {@code transactionEnabled} should not be null if the stream
* supports transactions. It is ignored otherwise.
*/
abstract StreamEventWriter createStreamEventWriter(
IntegrationTestingConfig config,
@Nullable Boolean transactionEnabled
) throws Exception;

abstract Function<String, String> generateStreamIngestionPropsTransform(
String streamName,
Expand Down Expand Up @@ -160,7 +167,7 @@ private Closeable createResourceCloser(GeneratedTestConfig generatedTestConfig)
}

protected void doTestIndexDataStableState(
boolean transactionEnabled,
@Nullable Boolean transactionEnabled,
String serializerPath,
String parserType,
String specPath
Expand Down Expand Up @@ -194,7 +201,7 @@ protected void doTestIndexDataStableState(
}
}

void doTestIndexDataWithLosingCoordinator(boolean transactionEnabled) throws Exception
void doTestIndexDataWithLosingCoordinator(@Nullable Boolean transactionEnabled) throws Exception
{
testIndexWithLosingNodeHelper(
() -> druidClusterAdminClient.restartCoordinatorContainer(),
Expand All @@ -203,7 +210,7 @@ void doTestIndexDataWithLosingCoordinator(boolean transactionEnabled) throws Exc
);
}

void doTestIndexDataWithLosingOverlord(boolean transactionEnabled) throws Exception
void doTestIndexDataWithLosingOverlord(@Nullable Boolean transactionEnabled) throws Exception
{
testIndexWithLosingNodeHelper(
() -> druidClusterAdminClient.restartIndexerContainer(),
Expand All @@ -212,7 +219,7 @@ void doTestIndexDataWithLosingOverlord(boolean transactionEnabled) throws Except
);
}

void doTestIndexDataWithLosingHistorical(boolean transactionEnabled) throws Exception
void doTestIndexDataWithLosingHistorical(@Nullable Boolean transactionEnabled) throws Exception
{
testIndexWithLosingNodeHelper(
() -> druidClusterAdminClient.restartHistoricalContainer(),
Expand All @@ -221,7 +228,7 @@ void doTestIndexDataWithLosingHistorical(boolean transactionEnabled) throws Exce
);
}

protected void doTestIndexDataWithStartStopSupervisor(boolean transactionEnabled) throws Exception
protected void doTestIndexDataWithStartStopSupervisor(@Nullable Boolean transactionEnabled) throws Exception
{
final GeneratedTestConfig generatedTestConfig = new GeneratedTestConfig(
INPUT_FORMAT,
Expand Down Expand Up @@ -284,22 +291,22 @@ protected void doTestIndexDataWithStartStopSupervisor(boolean transactionEnabled
}
}

protected void doTestIndexDataWithStreamReshardSplit(boolean transactionEnabled) throws Exception
protected void doTestIndexDataWithStreamReshardSplit(@Nullable Boolean transactionEnabled) throws Exception
{
// Reshard the stream from STREAM_SHARD_COUNT to STREAM_SHARD_COUNT * 2
testIndexWithStreamReshardHelper(transactionEnabled, STREAM_SHARD_COUNT * 2);
}

protected void doTestIndexDataWithStreamReshardMerge(boolean transactionEnabled) throws Exception
protected void doTestIndexDataWithStreamReshardMerge() throws Exception
{
// Reshard the stream from STREAM_SHARD_COUNT to STREAM_SHARD_COUNT / 2
testIndexWithStreamReshardHelper(transactionEnabled, STREAM_SHARD_COUNT / 2);
testIndexWithStreamReshardHelper(null, STREAM_SHARD_COUNT / 2);
}

private void testIndexWithLosingNodeHelper(
Runnable restartRunnable,
Runnable waitForReadyRunnable,
boolean transactionEnabled
@Nullable Boolean transactionEnabled
) throws Exception
{
final GeneratedTestConfig generatedTestConfig = new GeneratedTestConfig(
Expand Down Expand Up @@ -376,7 +383,8 @@ private void testIndexWithLosingNodeHelper(
}
}

private void testIndexWithStreamReshardHelper(boolean transactionEnabled, int newShardCount) throws Exception
private void testIndexWithStreamReshardHelper(@Nullable Boolean transactionEnabled, int newShardCount)
throws Exception
{
final GeneratedTestConfig generatedTestConfig = new GeneratedTestConfig(
INPUT_FORMAT,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ public void beforeClass() throws Exception
@Test
public void testKinesisIndexDataWithLosingCoordinator() throws Exception
{
doTestIndexDataWithLosingCoordinator(false);
doTestIndexDataWithLosingCoordinator(null);
}

/**
Expand All @@ -56,7 +56,7 @@ public void testKinesisIndexDataWithLosingCoordinator() throws Exception
@Test
public void testKinesisIndexDataWithLosingOverlord() throws Exception
{
doTestIndexDataWithLosingOverlord(false);
doTestIndexDataWithLosingOverlord(null);
}

/**
Expand All @@ -65,6 +65,6 @@ public void testKinesisIndexDataWithLosingOverlord() throws Exception
@Test
public void testKinesisIndexDataWithLosingHistorical() throws Exception
{
doTestIndexDataWithLosingHistorical(false);
doTestIndexDataWithLosingHistorical(null);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ public void beforeClass() throws Exception
public void testIndexData(String serializerPath, String parserType, String specPath)
throws Exception
{
doTestIndexDataStableState(false, serializerPath, parserType, specPath);
doTestIndexDataStableState(null, serializerPath, parserType, specPath);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ public void beforeClass() throws Exception
@Test
public void testKinesisIndexDataWithStartStopSupervisor() throws Exception
{
doTestIndexDataWithStartStopSupervisor(false);
doTestIndexDataWithStartStopSupervisor(null);
}

/**
Expand All @@ -59,7 +59,7 @@ public void testKinesisIndexDataWithStartStopSupervisor() throws Exception
@Test
public void testKinesisIndexDataWithKinesisReshardSplit() throws Exception
{
doTestIndexDataWithStreamReshardSplit(false);
doTestIndexDataWithStreamReshardSplit(null);
}

/**
Expand All @@ -69,6 +69,6 @@ public void testKinesisIndexDataWithKinesisReshardSplit() throws Exception
@Test
public void testKinesisIndexDataWithKinesisReshardMerge() throws Exception
{
doTestIndexDataWithStreamReshardMerge(false);
doTestIndexDataWithStreamReshardMerge();
}
}