diff --git a/benchmarks/pom.xml b/benchmarks/pom.xml
index 2c6e9ebda0f3..f7dbf9be42cf 100644
--- a/benchmarks/pom.xml
+++ b/benchmarks/pom.xml
@@ -104,6 +104,10 @@
json-flattener
0.1.0
+
+ com.fasterxml.jackson.core
+ jackson-core
+
com.fasterxml.jackson.dataformat
jackson-dataformat-smile
diff --git a/benchmarks/src/main/java/org/apache/druid/benchmark/ExpressionAggregationBenchmark.java b/benchmarks/src/main/java/org/apache/druid/benchmark/ExpressionAggregationBenchmark.java
index bde4da9f1311..14c36867e9d9 100644
--- a/benchmarks/src/main/java/org/apache/druid/benchmark/ExpressionAggregationBenchmark.java
+++ b/benchmarks/src/main/java/org/apache/druid/benchmark/ExpressionAggregationBenchmark.java
@@ -100,6 +100,7 @@ public void setup()
.interval(schemaInfo.getDataInterval())
.version("1")
.shardSpec(new LinearShardSpec(0))
+ .size(0)
.build();
final SegmentGenerator segmentGenerator = closer.register(new SegmentGenerator());
diff --git a/benchmarks/src/main/java/org/apache/druid/benchmark/ExpressionSelectorBenchmark.java b/benchmarks/src/main/java/org/apache/druid/benchmark/ExpressionSelectorBenchmark.java
index 2f92f0b85ba4..46be7ff1d571 100644
--- a/benchmarks/src/main/java/org/apache/druid/benchmark/ExpressionSelectorBenchmark.java
+++ b/benchmarks/src/main/java/org/apache/druid/benchmark/ExpressionSelectorBenchmark.java
@@ -114,6 +114,7 @@ public void setup()
.interval(schemaInfo.getDataInterval())
.version("1")
.shardSpec(new LinearShardSpec(0))
+ .size(0)
.build();
final SegmentGenerator segmentGenerator = closer.register(new SegmentGenerator());
diff --git a/benchmarks/src/main/java/org/apache/druid/benchmark/query/CachingClusteredClientBenchmark.java b/benchmarks/src/main/java/org/apache/druid/benchmark/query/CachingClusteredClientBenchmark.java
index e8212cefa2cd..8766233f3c25 100644
--- a/benchmarks/src/main/java/org/apache/druid/benchmark/query/CachingClusteredClientBenchmark.java
+++ b/benchmarks/src/main/java/org/apache/druid/benchmark/query/CachingClusteredClientBenchmark.java
@@ -199,6 +199,7 @@ public void setup()
.interval(schemaInfo.getDataInterval())
.version("1")
.shardSpec(new LinearShardSpec(i))
+ .size(0)
.build();
final SegmentGenerator segmentGenerator = closer.register(new SegmentGenerator());
LOG.info("Starting benchmark setup using cacheDir[%s], rows[%,d].", segmentGenerator.getCacheDir(), rowsPerSegment);
diff --git a/benchmarks/src/main/java/org/apache/druid/benchmark/query/SerializingQueryRunner.java b/benchmarks/src/main/java/org/apache/druid/benchmark/query/SerializingQueryRunner.java
index 1f6153d7af24..ec5d378aa2cc 100644
--- a/benchmarks/src/main/java/org/apache/druid/benchmark/query/SerializingQueryRunner.java
+++ b/benchmarks/src/main/java/org/apache/druid/benchmark/query/SerializingQueryRunner.java
@@ -19,10 +19,11 @@
package org.apache.druid.benchmark.query;
+import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
-import com.google.common.base.Function;
import org.apache.druid.java.util.common.guava.Sequence;
import org.apache.druid.java.util.common.guava.Sequences;
+import org.apache.druid.java.util.common.jackson.JacksonUtils;
import org.apache.druid.query.QueryPlus;
import org.apache.druid.query.QueryRunner;
import org.apache.druid.query.context.ResponseContext;
@@ -52,17 +53,12 @@ public Sequence run(
{
return Sequences.map(
baseRunner.run(queryPlus, responseContext),
- new Function()
- {
- @Override
- public T apply(T input)
- {
- try {
- return smileMapper.readValue(smileMapper.writeValueAsBytes(input), clazz);
- }
- catch (Exception e) {
- throw new RuntimeException(e);
- }
+ input -> {
+ try {
+ return JacksonUtils.readValue(smileMapper, smileMapper.writeValueAsBytes(input), clazz);
+ }
+ catch (JsonProcessingException e) {
+ throw new RuntimeException(e);
}
}
);
diff --git a/benchmarks/src/main/java/org/apache/druid/benchmark/query/SqlBenchmark.java b/benchmarks/src/main/java/org/apache/druid/benchmark/query/SqlBenchmark.java
index 667210c28874..cbb339e5a087 100644
--- a/benchmarks/src/main/java/org/apache/druid/benchmark/query/SqlBenchmark.java
+++ b/benchmarks/src/main/java/org/apache/druid/benchmark/query/SqlBenchmark.java
@@ -173,6 +173,7 @@ public void setup()
.interval(schemaInfo.getDataInterval())
.version("1")
.shardSpec(new LinearShardSpec(0))
+ .size(0)
.build();
final PlannerConfig plannerConfig = new PlannerConfig();
diff --git a/benchmarks/src/main/java/org/apache/druid/benchmark/query/SqlVsNativeBenchmark.java b/benchmarks/src/main/java/org/apache/druid/benchmark/query/SqlVsNativeBenchmark.java
index 8021a9909e23..d2787ddd31b2 100644
--- a/benchmarks/src/main/java/org/apache/druid/benchmark/query/SqlVsNativeBenchmark.java
+++ b/benchmarks/src/main/java/org/apache/druid/benchmark/query/SqlVsNativeBenchmark.java
@@ -98,6 +98,7 @@ public void setup()
.interval(schemaInfo.getDataInterval())
.version("1")
.shardSpec(new LinearShardSpec(0))
+ .size(0)
.build();
final SegmentGenerator segmentGenerator = closer.register(new SegmentGenerator());
diff --git a/core/src/main/java/org/apache/druid/common/config/JacksonConfigManager.java b/core/src/main/java/org/apache/druid/common/config/JacksonConfigManager.java
index 32d7649d2c05..1075e4c1aba6 100644
--- a/core/src/main/java/org/apache/druid/common/config/JacksonConfigManager.java
+++ b/core/src/main/java/org/apache/druid/common/config/JacksonConfigManager.java
@@ -27,6 +27,7 @@
import org.apache.druid.audit.AuditInfo;
import org.apache.druid.audit.AuditManager;
import org.apache.druid.common.config.ConfigManager.SetResult;
+import org.apache.druid.java.util.common.jackson.JacksonUtils;
import java.io.IOException;
import java.util.concurrent.atomic.AtomicReference;
@@ -115,12 +116,7 @@ public T deserialize(byte[] bytes)
return defaultVal;
}
- try {
- return jsonMapper.readValue(bytes, clazz);
- }
- catch (IOException e) {
- throw new RuntimeException(e);
- }
+ return JacksonUtils.readValue(jsonMapper, bytes, clazz);
}
};
}
diff --git a/core/src/main/java/org/apache/druid/indexer/TaskStatus.java b/core/src/main/java/org/apache/druid/indexer/TaskStatus.java
index a0ffe85c67cb..6bef27ee2143 100644
--- a/core/src/main/java/org/apache/druid/indexer/TaskStatus.java
+++ b/core/src/main/java/org/apache/druid/indexer/TaskStatus.java
@@ -67,9 +67,11 @@ public static TaskStatus fromCode(String taskId, TaskState code)
return new TaskStatus(taskId, code, -1, null, null);
}
- // The error message can be large, so truncate it to avoid storing large objects in zookeeper/metadata storage.
- // The full error message will be available via a TaskReport.
- private static String truncateErrorMsg(String errorMsg)
+ /**
+ * The error message can be large, so truncate it to avoid storing large objects in zookeeper/metadata storage.
+ * The full error message will be available via a TaskReport.
+ */
+ private static @Nullable String truncateErrorMsg(@Nullable String errorMsg)
{
if (errorMsg != null && errorMsg.length() > MAX_ERROR_MSG_LENGTH) {
return errorMsg.substring(0, MAX_ERROR_MSG_LENGTH) + "...";
@@ -81,7 +83,7 @@ private static String truncateErrorMsg(String errorMsg)
private final String id;
private final TaskState status;
private final long duration;
- private final String errorMsg;
+ private final @Nullable String errorMsg;
private final TaskLocation location;
@JsonCreator
@@ -89,7 +91,7 @@ protected TaskStatus(
@JsonProperty("id") String id,
@JsonProperty("status") TaskState status,
@JsonProperty("duration") long duration,
- @JsonProperty("errorMsg") String errorMsg,
+ @JsonProperty("errorMsg") @Nullable String errorMsg,
@Nullable @JsonProperty("location") TaskLocation location
)
{
@@ -122,6 +124,7 @@ public long getDuration()
return duration;
}
+ @Nullable
@JsonProperty("errorMsg")
public String getErrorMsg()
{
diff --git a/core/src/main/java/org/apache/druid/java/util/common/jackson/JacksonUtils.java b/core/src/main/java/org/apache/druid/java/util/common/jackson/JacksonUtils.java
index ee8bf660bd27..eb60d29a6d13 100644
--- a/core/src/main/java/org/apache/druid/java/util/common/jackson/JacksonUtils.java
+++ b/core/src/main/java/org/apache/druid/java/util/common/jackson/JacksonUtils.java
@@ -20,10 +20,12 @@
package org.apache.druid.java.util.common.jackson;
import com.fasterxml.jackson.core.type.TypeReference;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import java.io.IOException;
import java.util.Map;
-public class JacksonUtils
+public final class JacksonUtils
{
public static final TypeReference
+
+ org.assertj
+ assertj-core
+ test
+
diff --git a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java
index a121d6141447..c835bb950385 100644
--- a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java
+++ b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java
@@ -23,26 +23,16 @@
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.Module;
import com.fasterxml.jackson.databind.ObjectMapper;
-import com.google.common.base.Predicates;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Iterables;
-import com.google.common.io.Files;
import com.google.common.util.concurrent.ListenableFuture;
-import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import org.apache.curator.test.TestingCluster;
import org.apache.druid.client.cache.CacheConfig;
import org.apache.druid.client.cache.CachePopulatorStats;
import org.apache.druid.client.cache.MapCache;
-import org.apache.druid.data.input.impl.DimensionsSpec;
-import org.apache.druid.data.input.impl.FloatDimensionSchema;
-import org.apache.druid.data.input.impl.JSONParseSpec;
-import org.apache.druid.data.input.impl.LongDimensionSchema;
-import org.apache.druid.data.input.impl.StringDimensionSchema;
-import org.apache.druid.data.input.impl.StringInputRowParser;
-import org.apache.druid.data.input.impl.TimestampSpec;
import org.apache.druid.discovery.DataNodeService;
import org.apache.druid.discovery.DruidNodeAnnouncer;
import org.apache.druid.discovery.LookupNodeService;
@@ -52,9 +42,6 @@
import org.apache.druid.indexing.common.LockGranularity;
import org.apache.druid.indexing.common.SegmentLoaderFactory;
import org.apache.druid.indexing.common.SingleFileTaskReportFileWriter;
-import org.apache.druid.indexing.common.TaskLock;
-import org.apache.druid.indexing.common.TaskReport;
-import org.apache.druid.indexing.common.TaskToolbox;
import org.apache.druid.indexing.common.TaskToolboxFactory;
import org.apache.druid.indexing.common.TestUtils;
import org.apache.druid.indexing.common.actions.LocalTaskActionClientFactory;
@@ -68,41 +55,33 @@
import org.apache.druid.indexing.common.stats.RowIngestionMetersTotals;
import org.apache.druid.indexing.common.task.IndexTaskTest;
import org.apache.druid.indexing.common.task.Task;
-import org.apache.druid.indexing.common.task.Tasks;
import org.apache.druid.indexing.common.task.TestAppenderatorsManager;
import org.apache.druid.indexing.kafka.supervisor.KafkaSupervisor;
import org.apache.druid.indexing.kafka.supervisor.KafkaSupervisorIOConfig;
import org.apache.druid.indexing.kafka.test.TestBroker;
import org.apache.druid.indexing.overlord.DataSourceMetadata;
-import org.apache.druid.indexing.overlord.IndexerMetadataStorageCoordinator;
import org.apache.druid.indexing.overlord.MetadataTaskStorage;
import org.apache.druid.indexing.overlord.TaskLockbox;
-import org.apache.druid.indexing.overlord.TaskStorage;
import org.apache.druid.indexing.overlord.supervisor.SupervisorManager;
import org.apache.druid.indexing.seekablestream.SeekableStreamEndSequenceNumbers;
import org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskRunner;
import org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskRunner.Status;
+import org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskTestBase;
import org.apache.druid.indexing.seekablestream.SeekableStreamStartSequenceNumbers;
import org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisor;
import org.apache.druid.indexing.test.TestDataSegmentAnnouncer;
import org.apache.druid.indexing.test.TestDataSegmentKiller;
import org.apache.druid.java.util.common.DateTimes;
-import org.apache.druid.java.util.common.ISE;
-import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.concurrent.Execs;
import org.apache.druid.java.util.common.concurrent.ListenableFutures;
-import org.apache.druid.java.util.common.granularity.Granularities;
-import org.apache.druid.java.util.common.guava.Comparators;
import org.apache.druid.java.util.common.logger.Logger;
-import org.apache.druid.java.util.common.parsers.JSONPathSpec;
import org.apache.druid.java.util.emitter.EmittingLogger;
import org.apache.druid.java.util.emitter.core.NoopEmitter;
import org.apache.druid.java.util.emitter.service.ServiceEmitter;
import org.apache.druid.java.util.metrics.MonitorScheduler;
import org.apache.druid.math.expr.ExprMacroTable;
import org.apache.druid.metadata.DerbyMetadataStorageActionHandlerFactory;
-import org.apache.druid.metadata.EntryExistsException;
import org.apache.druid.metadata.IndexerSQLMetadataStorageCoordinator;
import org.apache.druid.metadata.TestDerbyConnector;
import org.apache.druid.query.DefaultGenericQueryMetricsFactory;
@@ -115,12 +94,7 @@
import org.apache.druid.query.QueryRunnerFactory;
import org.apache.druid.query.QueryRunnerFactoryConglomerate;
import org.apache.druid.query.QueryToolChest;
-import org.apache.druid.query.Result;
import org.apache.druid.query.SegmentDescriptor;
-import org.apache.druid.query.aggregation.AggregatorFactory;
-import org.apache.druid.query.aggregation.CountAggregatorFactory;
-import org.apache.druid.query.aggregation.DoubleSumAggregatorFactory;
-import org.apache.druid.query.aggregation.LongSumAggregatorFactory;
import org.apache.druid.query.filter.SelectorDimFilter;
import org.apache.druid.query.scan.ScanQuery;
import org.apache.druid.query.scan.ScanQueryConfig;
@@ -133,17 +107,10 @@
import org.apache.druid.query.timeseries.TimeseriesQueryEngine;
import org.apache.druid.query.timeseries.TimeseriesQueryQueryToolChest;
import org.apache.druid.query.timeseries.TimeseriesQueryRunnerFactory;
-import org.apache.druid.query.timeseries.TimeseriesResultValue;
-import org.apache.druid.segment.DimensionHandlerUtils;
-import org.apache.druid.segment.IndexIO;
-import org.apache.druid.segment.QueryableIndex;
-import org.apache.druid.segment.column.DictionaryEncodedColumn;
import org.apache.druid.segment.indexing.DataSchema;
-import org.apache.druid.segment.indexing.granularity.UniformGranularitySpec;
import org.apache.druid.segment.loading.DataSegmentPusher;
import org.apache.druid.segment.loading.LocalDataSegmentPusher;
import org.apache.druid.segment.loading.LocalDataSegmentPusherConfig;
-import org.apache.druid.segment.realtime.appenderator.AppenderatorImpl;
import org.apache.druid.segment.realtime.appenderator.AppenderatorsManager;
import org.apache.druid.segment.realtime.plumber.SegmentHandoffNotifier;
import org.apache.druid.segment.realtime.plumber.SegmentHandoffNotifierFactory;
@@ -152,12 +119,9 @@
import org.apache.druid.server.DruidNode;
import org.apache.druid.server.coordination.DataSegmentServerAnnouncer;
import org.apache.druid.server.coordination.ServerType;
-import org.apache.druid.timeline.DataSegment;
-import org.apache.druid.utils.CompressionUtils;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.easymock.EasyMock;
-import org.joda.time.Interval;
import org.joda.time.Period;
import org.junit.After;
import org.junit.AfterClass;
@@ -173,12 +137,7 @@
import javax.annotation.Nullable;
import java.io.File;
import java.io.IOException;
-import java.lang.reflect.InvocationTargetException;
-import java.lang.reflect.Method;
-import java.nio.charset.StandardCharsets;
-import java.util.ArrayList;
import java.util.Arrays;
-import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
@@ -191,19 +150,17 @@
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
-import java.util.stream.Collectors;
+@SuppressWarnings("unchecked")
@RunWith(Parameterized.class)
-public class KafkaIndexTaskTest
+public class KafkaIndexTaskTest extends SeekableStreamIndexTaskTestBase
{
private static final Logger log = new Logger(KafkaIndexTaskTest.class);
- private static final ObjectMapper OBJECT_MAPPER = new TestUtils().getTestObjectMapper();
private static final long POLL_RETRY_MS = 100;
private static TestingCluster zkServer;
private static TestBroker kafkaServer;
private static ServiceEmitter emitter;
- private static ListeningExecutorService taskExec;
private static int topicPostfix;
static {
@@ -219,9 +176,6 @@ public static Iterable