diff --git a/core/src/test/java/org/apache/druid/data/input/impl/ParseSpecTest.java b/core/src/test/java/org/apache/druid/data/input/impl/ParseSpecTest.java index 848174da842a..eb321cefd59c 100644 --- a/core/src/test/java/org/apache/druid/data/input/impl/ParseSpecTest.java +++ b/core/src/test/java/org/apache/druid/data/input/impl/ParseSpecTest.java @@ -206,7 +206,7 @@ public void testBadTypeSerde() throws IOException expectedException.expect(IllegalArgumentException.class); expectedException.expectCause(CoreMatchers.instanceOf(JsonMappingException.class)); - expectedException.expectMessage("Could not resolve type id 'foo' as a subtype"); + expectedException.expectMessage("Could not resolve type id 'foo' as a subtype of [simple type, class org.apache.druid.data.input.impl.ParseSpec]: known type ids = [csv, javascript, json, jsonLowercase, regex, timeAndDims, tsv]"); mapper.convertValue(mapValue, ParseSpec.class); } } diff --git a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaDataSourceMetadata.java b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaDataSourceMetadata.java index 81ea6de396b4..e7809619a805 100644 --- a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaDataSourceMetadata.java +++ b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaDataSourceMetadata.java @@ -29,6 +29,14 @@ public class KafkaDataSourceMetadata extends SeekableStreamDataSourceMetadata { + @JsonCreator + public KafkaDataSourceMetadata( + @JsonProperty("partitions") SeekableStreamEndSequenceNumbers kafkaPartitions + ) + { + super(kafkaPartitions); + } + @JsonCreator public KafkaDataSourceMetadata( @JsonProperty("partitions") SeekableStreamSequenceNumbers kafkaPartitions diff --git a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIOConfigTest.java b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIOConfigTest.java index 4de3c06e78ae..15a6f93176e5 100644 --- a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIOConfigTest.java +++ b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIOConfigTest.java @@ -22,7 +22,6 @@ import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; import com.fasterxml.jackson.databind.JsonMappingException; -import com.fasterxml.jackson.databind.Module; import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.jsontype.NamedType; import com.google.common.base.Optional; @@ -51,7 +50,7 @@ public class KafkaIOConfigTest public KafkaIOConfigTest() { mapper = new DefaultObjectMapper(); - mapper.registerModules((Iterable) new KafkaIndexTaskModule().getJacksonModules()); + mapper.registerModules(new KafkaIndexTaskModule().getJacksonModules()); } @Rule diff --git a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisDataSourceMetadata.java b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisDataSourceMetadata.java index d3c1630cf909..f2a4b841289f 100644 --- a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisDataSourceMetadata.java +++ b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisDataSourceMetadata.java @@ -28,6 +28,14 @@ public class KinesisDataSourceMetadata extends SeekableStreamDataSourceMetadata { + @JsonCreator + public KinesisDataSourceMetadata( + @JsonProperty("partitions") SeekableStreamEndSequenceNumbers kinesisPartitions + ) + { + super(kinesisPartitions); + } + @JsonCreator public KinesisDataSourceMetadata( @JsonProperty("partitions") SeekableStreamSequenceNumbers kinesisPartitions diff --git a/indexing-hadoop/src/main/java/org/apache/druid/indexer/IndexGeneratorJob.java b/indexing-hadoop/src/main/java/org/apache/druid/indexer/IndexGeneratorJob.java index 8c8373440213..d791a3f6e622 100644 --- a/indexing-hadoop/src/main/java/org/apache/druid/indexer/IndexGeneratorJob.java +++ b/indexing-hadoop/src/main/java/org/apache/druid/indexer/IndexGeneratorJob.java @@ -114,13 +114,13 @@ public static List getPublishedSegments(HadoopDruidIndexerConfig co final Path descriptorInfoDir = config.makeDescriptorInfoDir(); - try { - FileSystem fs = descriptorInfoDir.getFileSystem(conf); - + try (FileSystem fs = descriptorInfoDir.getFileSystem(conf)) { for (FileStatus status : fs.listStatus(descriptorInfoDir)) { - final DataSegment segment = jsonMapper.readValue((InputStream) fs.open(status.getPath()), DataSegment.class); - publishedSegmentsBuilder.add(segment); - log.info("Adding segment %s to the list of published segments", segment.getId()); + try (final InputStream input = fs.open(status.getPath())) { + final DataSegment segment = jsonMapper.readValue(input, DataSegment.class); + publishedSegmentsBuilder.add(segment); + log.info("Adding segment %s to the list of published segments", segment.getId()); + } } } catch (FileNotFoundException e) { @@ -134,9 +134,8 @@ public static List getPublishedSegments(HadoopDruidIndexerConfig co catch (IOException e) { throw new RuntimeException(e); } - List publishedSegments = publishedSegmentsBuilder.build(); - return publishedSegments; + return publishedSegmentsBuilder.build(); } private final HadoopDruidIndexerConfig config; diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamSequenceNumbers.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamSequenceNumbers.java index a790974e25f1..3d28bad049c6 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamSequenceNumbers.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamSequenceNumbers.java @@ -19,19 +19,10 @@ package org.apache.druid.indexing.seekablestream; -import com.fasterxml.jackson.annotation.JsonSubTypes; -import com.fasterxml.jackson.annotation.JsonSubTypes.Type; -import com.fasterxml.jackson.annotation.JsonTypeInfo; -import com.fasterxml.jackson.annotation.JsonTypeInfo.Id; import org.apache.druid.indexing.overlord.DataSourceMetadata; import java.util.Map; -@JsonTypeInfo(use = Id.NAME, property = "type", defaultImpl = SeekableStreamEndSequenceNumbers.class) -@JsonSubTypes({ - @Type(name = "start", value = SeekableStreamStartSequenceNumbers.class), - @Type(name = "end", value = SeekableStreamEndSequenceNumbers.class) -}) public interface SeekableStreamSequenceNumbers { /** diff --git a/server/src/main/java/org/apache/druid/indexing/overlord/IndexerMetadataStorageCoordinator.java b/server/src/main/java/org/apache/druid/indexing/overlord/IndexerMetadataStorageCoordinator.java index 356223aacf33..063455b66237 100644 --- a/server/src/main/java/org/apache/druid/indexing/overlord/IndexerMetadataStorageCoordinator.java +++ b/server/src/main/java/org/apache/druid/indexing/overlord/IndexerMetadataStorageCoordinator.java @@ -105,6 +105,8 @@ default Collection getUsedSegmentsForInterval(String dataSource, In * @param segments set of segments to add * * @return set of segments actually added + * + * @throws IOException */ Set announceHistoricalSegments(Set segments) throws IOException; diff --git a/server/src/main/java/org/apache/druid/server/coordination/SegmentChangeRequestDrop.java b/server/src/main/java/org/apache/druid/server/coordination/SegmentChangeRequestDrop.java index c4229a028806..e055bc0957df 100644 --- a/server/src/main/java/org/apache/druid/server/coordination/SegmentChangeRequestDrop.java +++ b/server/src/main/java/org/apache/druid/server/coordination/SegmentChangeRequestDrop.java @@ -19,7 +19,6 @@ package org.apache.druid.server.coordination; -import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; import com.fasterxml.jackson.annotation.JsonUnwrapped; import org.apache.druid.java.util.common.StringUtils; @@ -29,22 +28,24 @@ import java.util.Objects; + /** */ public class SegmentChangeRequestDrop implements DataSegmentChangeRequest { - private final DataSegment segment; + @JsonUnwrapped + private DataSegment segment; + + public SegmentChangeRequestDrop() + { + } - @JsonCreator - public SegmentChangeRequestDrop( - @JsonUnwrapped DataSegment segment - ) + public SegmentChangeRequestDrop(DataSegment segment) { this.segment = segment; } @JsonProperty - @JsonUnwrapped public DataSegment getSegment() { return segment; diff --git a/server/src/main/java/org/apache/druid/server/coordination/SegmentChangeRequestLoad.java b/server/src/main/java/org/apache/druid/server/coordination/SegmentChangeRequestLoad.java index 097e02523032..17251c5ca927 100644 --- a/server/src/main/java/org/apache/druid/server/coordination/SegmentChangeRequestLoad.java +++ b/server/src/main/java/org/apache/druid/server/coordination/SegmentChangeRequestLoad.java @@ -19,7 +19,6 @@ package org.apache.druid.server.coordination; -import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; import com.fasterxml.jackson.annotation.JsonUnwrapped; import org.apache.druid.java.util.common.StringUtils; @@ -33,12 +32,14 @@ */ public class SegmentChangeRequestLoad implements DataSegmentChangeRequest { - private final DataSegment segment; + @JsonUnwrapped + private DataSegment segment; - @JsonCreator - public SegmentChangeRequestLoad( - @JsonUnwrapped DataSegment segment - ) + public SegmentChangeRequestLoad() + { + } + + public SegmentChangeRequestLoad(DataSegment segment) { this.segment = segment; } @@ -50,7 +51,6 @@ public void go(DataSegmentChangeHandler handler, @Nullable DataSegmentChangeCall } @JsonProperty - @JsonUnwrapped public DataSegment getSegment() { return segment;