Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -49,31 +49,31 @@ private TaskResponseObject(
this.status = status;
}

@SuppressWarnings("unused") // Used by Jackson serialization?
@JsonProperty
public String getId()
{
return id;
}

@SuppressWarnings("unused") // Used by Jackson serialization?
@JsonProperty
public String getType()
{
return type;
}

@SuppressWarnings("unused") // Used by Jackson serialization?
@JsonProperty
public DateTime getCreatedTime()
{
return createdTime;
}

@SuppressWarnings("unused") // Used by Jackson serialization?
@JsonProperty
public DateTime getQueueInsertionTime()
{
return queueInsertionTime;
}

@SuppressWarnings("unused") // Used by Jackson serialization?
@JsonProperty
public TaskState getStatus()
{
return status;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,9 @@
import org.apache.druid.testing.clients.CoordinatorResourceTestClient;
import org.apache.druid.testing.clients.OverlordResourceTestClient;
import org.apache.druid.testing.clients.TaskResponseObject;
import org.apache.druid.testing.utils.DataLoaderHelper;
import org.apache.druid.testing.utils.ITRetryUtil;
import org.apache.druid.testing.utils.SqlTestQueryHelper;
import org.apache.druid.testing.utils.TestQueryHelper;
import org.joda.time.Interval;

Expand Down Expand Up @@ -62,6 +64,10 @@ public abstract class AbstractIndexerTest
protected ObjectMapper smileMapper;
@Inject
protected TestQueryHelper queryHelper;
@Inject
protected SqlTestQueryHelper sqlQueryHelper;
@Inject
protected DataLoaderHelper dataLoaderHelper;

@Inject
protected IntegrationTestingConfig config;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import org.apache.druid.testing.utils.StreamAdminClient;
import org.apache.druid.testing.utils.StreamEventWriter;

import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.function.Function;
Expand All @@ -53,6 +54,7 @@ Function<String, String> generateStreamIngestionPropsTransform(
String fullDatasourceName,
String parserType,
String parserOrInputFormat,
List<String> dimensions,
IntegrationTestingConfig config
)
{
Expand Down Expand Up @@ -117,13 +119,16 @@ Function<String, String> generateStreamIngestionPropsTransform(
"%%STREAM_PROPERTIES_KEY%%",
"consumerProperties"
);

spec = StringUtils.replace(
spec,
"%%SCHEMA_REGISTRY_HOST%%",
StringUtils.format("http://%s", config.getSchemaRegistryInternalHost())
);

spec = StringUtils.replace(
spec,
"%%DIMENSIONS%%",
jsonMapper.writeValueAsString(dimensions)
);
return StringUtils.replace(
spec,
"%%STREAM_PROPERTIES_VALUE%%",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import org.apache.druid.testing.utils.StreamEventWriter;

import javax.annotation.Nullable;
import java.util.List;
import java.util.function.Function;

public abstract class AbstractKinesisIndexingServiceTest extends AbstractStreamIndexingTest
Expand Down Expand Up @@ -59,6 +60,7 @@ Function<String, String> generateStreamIngestionPropsTransform(
String fullDatasourceName,
String parserType,
String parserOrInputFormat,
List<String> dimensions,
IntegrationTestingConfig config
)
{
Expand Down Expand Up @@ -122,6 +124,11 @@ Function<String, String> generateStreamIngestionPropsTransform(
"%%SCHEMA_REGISTRY_HOST%%",
StringUtils.format("http://%s", config.getSchemaRegistryInternalHost())
);
spec = StringUtils.replace(
spec,
"%%DIMENSIONS%%",
jsonMapper.writeValueAsString(dimensions)
);
return StringUtils.replace(
spec,
"%%STREAM_PROPERTIES_VALUE%%",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

package org.apache.druid.tests.indexer;

import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Iterables;
import com.google.inject.Inject;
Expand Down Expand Up @@ -73,7 +74,7 @@ public abstract class AbstractStreamIndexingTest extends AbstractIndexerTest
// The value to this tag is a timestamp that can be used by a lambda function to remove unused stream.
private static final String STREAM_EXPIRE_TAG = "druid-ci-expire-after";
private static final int STREAM_SHARD_COUNT = 2;
private static final long CYCLE_PADDING_MS = 100;
protected static final long CYCLE_PADDING_MS = 100;

private static final String QUERIES_FILE = "/stream/queries/stream_index_queries.json";
private static final String SUPERVISOR_SPEC_TEMPLATE_FILE = "supervisor_spec_template.json";
Expand All @@ -93,9 +94,24 @@ public abstract class AbstractStreamIndexingTest extends AbstractIndexerTest
protected static final String INPUT_FORMAT = "inputFormat";
protected static final String INPUT_ROW_PARSER = "parser";

private static final String JSON_INPUT_FORMAT_PATH =
protected static final String JSON_INPUT_FORMAT_PATH =
String.join("/", DATA_RESOURCE_ROOT, "json", INPUT_FORMAT_SPEC_DIR, "input_format.json");

protected static final List<String> DEFAULT_DIMENSIONS = ImmutableList.of(
"page",
"language",
"user",
"unpatrolled",
"newPage",
"robot",
"anonymous",
"namespace",
"continent",
"country",
"region",
"city"
);

@Inject
private DruidClusterAdminClient druidClusterAdminClient;

Expand All @@ -117,6 +133,7 @@ abstract Function<String, String> generateStreamIngestionPropsTransform(
String fullDatasourceName,
String parserType,
String parserOrInputFormat,
List<String> dimensions,
IntegrationTestingConfig config
);

Expand Down Expand Up @@ -625,7 +642,7 @@ private void testIndexWithStreamReshardHelper(@Nullable Boolean transactionEnabl
}
}

private void verifyIngestedData(GeneratedTestConfig generatedTestConfig, long numWritten) throws Exception
protected void verifyIngestedData(GeneratedTestConfig generatedTestConfig, long numWritten) throws Exception
{
// Wait for supervisor to consume events
LOG.info("Waiting for stream indexing tasks to consume events");
Expand Down Expand Up @@ -720,6 +737,11 @@ protected class GeneratedTestConfig
private Function<String, String> streamQueryPropsTransform;

public GeneratedTestConfig(String parserType, String parserOrInputFormat) throws Exception
{
this(parserType, parserOrInputFormat, DEFAULT_DIMENSIONS);
}

public GeneratedTestConfig(String parserType, String parserOrInputFormat, List<String> dimensions) throws Exception
{
streamName = getTestNamePrefix() + "_index_test_" + UUID.randomUUID();
String datasource = getTestNamePrefix() + "_indexing_service_test_" + UUID.randomUUID();
Expand All @@ -741,6 +763,7 @@ public GeneratedTestConfig(String parserType, String parserOrInputFormat) throws
fullDatasourceName,
parserType,
parserOrInputFormat,
dimensions,
config
);
streamQueryPropsTransform = generateStreamQueryPropsTransform(streamName, fullDatasourceName);
Expand Down
Loading