From ceab5fe6dfdb109ea583e653180e0b0678ce1664 Mon Sep 17 00:00:00 2001 From: Jihoon Son Date: Wed, 11 Dec 2019 17:30:44 -0800 Subject: [PATCH] Using annotation to distinguish Hadoop Configuration in each module (#9013) * Multibinding for NodeRole * Fix endpoints * fix doc * fix test * Using annotation to distinguish Hadoop Configuration in each module --- .../firehose/hdfs/HdfsFirehoseFactory.java | 3 +- .../java/org/apache/druid/guice/Hdfs.java | 40 +++++++++++++++++++ .../inputsource/hdfs/HdfsInputSource.java | 3 +- .../storage/hdfs/HdfsDataSegmentKiller.java | 3 +- .../storage/hdfs/HdfsDataSegmentPuller.java | 3 +- .../storage/hdfs/HdfsDataSegmentPusher.java | 7 +++- .../hdfs/HdfsFileTimestampVersionFinder.java | 3 +- .../hdfs/HdfsStorageAuthentication.java | 3 +- .../storage/hdfs/HdfsStorageDruidModule.java | 4 +- .../storage/hdfs/tasklog/HdfsTaskLogs.java | 3 +- .../data/input/orc/OrcExtensionsModule.java | 3 +- .../druid/data/input/orc/OrcInputFormat.java | 3 +- .../druid/data/input/orc/guice/Orc.java | 40 +++++++++++++++++++ .../parquet/ParquetExtensionsModule.java | 3 +- .../input/parquet/ParquetInputFormat.java | 10 ++++- .../data/input/parquet/ParquetReader.java | 6 ++- .../data/input/parquet/guice/Parquet.java | 40 +++++++++++++++++++ .../input/parquet/BaseParquetReaderTest.java | 3 +- 18 files changed, 163 insertions(+), 17 deletions(-) create mode 100644 extensions-core/hdfs-storage/src/main/java/org/apache/druid/guice/Hdfs.java create mode 100644 extensions-core/orc-extensions/src/main/java/org/apache/druid/data/input/orc/guice/Orc.java create mode 100644 extensions-core/parquet-extensions/src/main/java/org/apache/druid/data/input/parquet/guice/Parquet.java diff --git a/extensions-core/hdfs-storage/src/main/java/org/apache/druid/firehose/hdfs/HdfsFirehoseFactory.java b/extensions-core/hdfs-storage/src/main/java/org/apache/druid/firehose/hdfs/HdfsFirehoseFactory.java index 962da5e5d31d..ee5bf032b4d0 100644 --- a/extensions-core/hdfs-storage/src/main/java/org/apache/druid/firehose/hdfs/HdfsFirehoseFactory.java +++ b/extensions-core/hdfs-storage/src/main/java/org/apache/druid/firehose/hdfs/HdfsFirehoseFactory.java @@ -27,6 +27,7 @@ import org.apache.druid.data.input.InputSplit; import org.apache.druid.data.input.impl.StringInputRowParser; import org.apache.druid.data.input.impl.prefetch.PrefetchableTextFilesFirehoseFactory; +import org.apache.druid.guice.Hdfs; import org.apache.druid.inputsource.hdfs.HdfsInputSource; import org.apache.druid.storage.hdfs.HdfsDataSegmentPuller; import org.apache.druid.utils.CompressionUtils; @@ -46,7 +47,7 @@ public class HdfsFirehoseFactory extends PrefetchableTextFilesFirehoseFactory fullyQualifiedStorageDirectory; @Inject - public HdfsDataSegmentPusher(HdfsDataSegmentPusherConfig config, Configuration hadoopConfig, ObjectMapper jsonMapper) + public HdfsDataSegmentPusher( + HdfsDataSegmentPusherConfig config, + @Hdfs Configuration hadoopConfig, + ObjectMapper jsonMapper + ) { this.hadoopConfig = hadoopConfig; this.jsonMapper = jsonMapper; diff --git a/extensions-core/hdfs-storage/src/main/java/org/apache/druid/storage/hdfs/HdfsFileTimestampVersionFinder.java b/extensions-core/hdfs-storage/src/main/java/org/apache/druid/storage/hdfs/HdfsFileTimestampVersionFinder.java index d0530d246ba9..4daa94a6b02d 100644 --- a/extensions-core/hdfs-storage/src/main/java/org/apache/druid/storage/hdfs/HdfsFileTimestampVersionFinder.java +++ b/extensions-core/hdfs-storage/src/main/java/org/apache/druid/storage/hdfs/HdfsFileTimestampVersionFinder.java @@ -21,6 +21,7 @@ import com.google.inject.Inject; import org.apache.druid.data.SearchableVersionedDataFinder; +import org.apache.druid.guice.Hdfs; import org.apache.druid.java.util.common.RetryUtils; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileStatus; @@ -36,7 +37,7 @@ public class HdfsFileTimestampVersionFinder extends HdfsDataSegmentPuller implements SearchableVersionedDataFinder { @Inject - public HdfsFileTimestampVersionFinder(Configuration config) + public HdfsFileTimestampVersionFinder(@Hdfs Configuration config) { super(config); } diff --git a/extensions-core/hdfs-storage/src/main/java/org/apache/druid/storage/hdfs/HdfsStorageAuthentication.java b/extensions-core/hdfs-storage/src/main/java/org/apache/druid/storage/hdfs/HdfsStorageAuthentication.java index 93fbf48efa2f..096875fdaf44 100644 --- a/extensions-core/hdfs-storage/src/main/java/org/apache/druid/storage/hdfs/HdfsStorageAuthentication.java +++ b/extensions-core/hdfs-storage/src/main/java/org/apache/druid/storage/hdfs/HdfsStorageAuthentication.java @@ -22,6 +22,7 @@ import com.google.common.base.Strings; import com.google.inject.Inject; +import org.apache.druid.guice.Hdfs; import org.apache.druid.guice.ManageLifecycle; import org.apache.druid.java.util.common.ISE; import org.apache.druid.java.util.common.lifecycle.LifecycleStart; @@ -40,7 +41,7 @@ public class HdfsStorageAuthentication private final Configuration hadoopConf; @Inject - public HdfsStorageAuthentication(HdfsKerberosConfig hdfsKerberosConfig, Configuration hadoopConf) + public HdfsStorageAuthentication(HdfsKerberosConfig hdfsKerberosConfig, @Hdfs Configuration hadoopConf) { this.hdfsKerberosConfig = hdfsKerberosConfig; this.hadoopConf = hadoopConf; diff --git a/extensions-core/hdfs-storage/src/main/java/org/apache/druid/storage/hdfs/HdfsStorageDruidModule.java b/extensions-core/hdfs-storage/src/main/java/org/apache/druid/storage/hdfs/HdfsStorageDruidModule.java index 6e88f8ee38b7..d4242bcddcfc 100644 --- a/extensions-core/hdfs-storage/src/main/java/org/apache/druid/storage/hdfs/HdfsStorageDruidModule.java +++ b/extensions-core/hdfs-storage/src/main/java/org/apache/druid/storage/hdfs/HdfsStorageDruidModule.java @@ -28,6 +28,7 @@ import org.apache.druid.data.SearchableVersionedDataFinder; import org.apache.druid.firehose.hdfs.HdfsFirehoseFactory; import org.apache.druid.guice.Binders; +import org.apache.druid.guice.Hdfs; import org.apache.druid.guice.JsonConfigProvider; import org.apache.druid.guice.LazySingleton; import org.apache.druid.guice.LifecycleModule; @@ -108,7 +109,7 @@ public void configure(Binder binder) } } - binder.bind(Configuration.class).toInstance(conf); + binder.bind(Configuration.class).annotatedWith(Hdfs.class).toInstance(conf); JsonConfigProvider.bind(binder, "druid.storage", HdfsDataSegmentPusherConfig.class); Binders.taskLogsBinder(binder).addBinding("hdfs").to(HdfsTaskLogs.class); @@ -117,6 +118,5 @@ public void configure(Binder binder) JsonConfigProvider.bind(binder, "druid.hadoop.security.kerberos", HdfsKerberosConfig.class); binder.bind(HdfsStorageAuthentication.class).in(ManageLifecycle.class); LifecycleModule.register(binder, HdfsStorageAuthentication.class); - } } diff --git a/extensions-core/hdfs-storage/src/main/java/org/apache/druid/storage/hdfs/tasklog/HdfsTaskLogs.java b/extensions-core/hdfs-storage/src/main/java/org/apache/druid/storage/hdfs/tasklog/HdfsTaskLogs.java index 4e2b68fc67ec..7289901823b6 100644 --- a/extensions-core/hdfs-storage/src/main/java/org/apache/druid/storage/hdfs/tasklog/HdfsTaskLogs.java +++ b/extensions-core/hdfs-storage/src/main/java/org/apache/druid/storage/hdfs/tasklog/HdfsTaskLogs.java @@ -23,6 +23,7 @@ import com.google.common.io.ByteSource; import com.google.common.io.ByteStreams; import com.google.inject.Inject; +import org.apache.druid.guice.Hdfs; import org.apache.druid.java.util.common.IOE; import org.apache.druid.java.util.common.logger.Logger; import org.apache.druid.tasklogs.TaskLogs; @@ -51,7 +52,7 @@ public class HdfsTaskLogs implements TaskLogs private final Configuration hadoopConfig; @Inject - public HdfsTaskLogs(HdfsTaskLogsConfig config, Configuration hadoopConfig) + public HdfsTaskLogs(HdfsTaskLogsConfig config, @Hdfs Configuration hadoopConfig) { this.config = config; this.hadoopConfig = hadoopConfig; diff --git a/extensions-core/orc-extensions/src/main/java/org/apache/druid/data/input/orc/OrcExtensionsModule.java b/extensions-core/orc-extensions/src/main/java/org/apache/druid/data/input/orc/OrcExtensionsModule.java index e57995bd26f4..78082cba5a82 100644 --- a/extensions-core/orc-extensions/src/main/java/org/apache/druid/data/input/orc/OrcExtensionsModule.java +++ b/extensions-core/orc-extensions/src/main/java/org/apache/druid/data/input/orc/OrcExtensionsModule.java @@ -24,6 +24,7 @@ import com.fasterxml.jackson.databind.module.SimpleModule; import com.google.inject.Binder; import com.google.inject.Inject; +import org.apache.druid.data.input.orc.guice.Orc; import org.apache.druid.initialization.DruidModule; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; @@ -89,6 +90,6 @@ public void configure(Binder binder) } } - binder.bind(Configuration.class).toInstance(conf); + binder.bind(Configuration.class).annotatedWith(Orc.class).toInstance(conf); } } diff --git a/extensions-core/orc-extensions/src/main/java/org/apache/druid/data/input/orc/OrcInputFormat.java b/extensions-core/orc-extensions/src/main/java/org/apache/druid/data/input/orc/OrcInputFormat.java index bf60fb958966..1744fef9f6c2 100644 --- a/extensions-core/orc-extensions/src/main/java/org/apache/druid/data/input/orc/OrcInputFormat.java +++ b/extensions-core/orc-extensions/src/main/java/org/apache/druid/data/input/orc/OrcInputFormat.java @@ -26,6 +26,7 @@ import org.apache.druid.data.input.InputEntityReader; import org.apache.druid.data.input.InputRowSchema; import org.apache.druid.data.input.impl.NestedInputFormat; +import org.apache.druid.data.input.orc.guice.Orc; import org.apache.druid.java.util.common.parsers.JSONPathSpec; import org.apache.hadoop.conf.Configuration; @@ -42,7 +43,7 @@ public class OrcInputFormat extends NestedInputFormat public OrcInputFormat( @JsonProperty("flattenSpec") @Nullable JSONPathSpec flattenSpec, @JsonProperty("binaryAsString") @Nullable Boolean binaryAsString, - @JacksonInject Configuration conf + @JacksonInject @Orc Configuration conf ) { super(flattenSpec); diff --git a/extensions-core/orc-extensions/src/main/java/org/apache/druid/data/input/orc/guice/Orc.java b/extensions-core/orc-extensions/src/main/java/org/apache/druid/data/input/orc/guice/Orc.java new file mode 100644 index 000000000000..ff1be32b6037 --- /dev/null +++ b/extensions-core/orc-extensions/src/main/java/org/apache/druid/data/input/orc/guice/Orc.java @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.data.input.orc.guice; + +import com.google.inject.BindingAnnotation; + +import java.lang.annotation.ElementType; +import java.lang.annotation.Retention; +import java.lang.annotation.RetentionPolicy; +import java.lang.annotation.Target; + +/** + * Each extension module needs to properly bind whatever it will use, but sometimes different modules need to bind the + * same class which will lead to the duplicate injection error. To avoid this problem, each module is supposed to bind + * different instances. This is a binding annotation for druid-orc-extensions extension. Any binding for the same type + * should be distinguished by using this annotation. + */ +@Target({ElementType.FIELD, ElementType.PARAMETER, ElementType.METHOD}) +@Retention(RetentionPolicy.RUNTIME) +@BindingAnnotation +public @interface Orc +{ +} diff --git a/extensions-core/parquet-extensions/src/main/java/org/apache/druid/data/input/parquet/ParquetExtensionsModule.java b/extensions-core/parquet-extensions/src/main/java/org/apache/druid/data/input/parquet/ParquetExtensionsModule.java index dd2cf4c34f82..17fe50d42870 100644 --- a/extensions-core/parquet-extensions/src/main/java/org/apache/druid/data/input/parquet/ParquetExtensionsModule.java +++ b/extensions-core/parquet-extensions/src/main/java/org/apache/druid/data/input/parquet/ParquetExtensionsModule.java @@ -25,6 +25,7 @@ import com.google.inject.Binder; import com.google.inject.Inject; import org.apache.druid.data.input.parquet.avro.ParquetAvroHadoopInputRowParser; +import org.apache.druid.data.input.parquet.guice.Parquet; import org.apache.druid.data.input.parquet.simple.ParquetHadoopInputRowParser; import org.apache.druid.data.input.parquet.simple.ParquetParseSpec; import org.apache.druid.initialization.DruidModule; @@ -98,6 +99,6 @@ public void configure(Binder binder) } } - binder.bind(Configuration.class).toInstance(conf); + binder.bind(Configuration.class).annotatedWith(Parquet.class).toInstance(conf); } } diff --git a/extensions-core/parquet-extensions/src/main/java/org/apache/druid/data/input/parquet/ParquetInputFormat.java b/extensions-core/parquet-extensions/src/main/java/org/apache/druid/data/input/parquet/ParquetInputFormat.java index fbd93414a4a1..c49581a9b72a 100644 --- a/extensions-core/parquet-extensions/src/main/java/org/apache/druid/data/input/parquet/ParquetInputFormat.java +++ b/extensions-core/parquet-extensions/src/main/java/org/apache/druid/data/input/parquet/ParquetInputFormat.java @@ -19,13 +19,16 @@ package org.apache.druid.data.input.parquet; +import com.fasterxml.jackson.annotation.JacksonInject; import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; import org.apache.druid.data.input.InputEntity; import org.apache.druid.data.input.InputEntityReader; import org.apache.druid.data.input.InputRowSchema; import org.apache.druid.data.input.impl.NestedInputFormat; +import org.apache.druid.data.input.parquet.guice.Parquet; import org.apache.druid.java.util.common.parsers.JSONPathSpec; +import org.apache.hadoop.conf.Configuration; import javax.annotation.Nullable; import java.io.File; @@ -35,15 +38,18 @@ public class ParquetInputFormat extends NestedInputFormat { private final boolean binaryAsString; + private final Configuration conf; @JsonCreator public ParquetInputFormat( @JsonProperty("flattenSpec") @Nullable JSONPathSpec flattenSpec, - @JsonProperty("binaryAsString") @Nullable Boolean binaryAsString + @JsonProperty("binaryAsString") @Nullable Boolean binaryAsString, + @JacksonInject @Parquet Configuration conf ) { super(flattenSpec); this.binaryAsString = binaryAsString == null ? false : binaryAsString; + this.conf = conf; } @JsonProperty @@ -65,7 +71,7 @@ public InputEntityReader createReader( File temporaryDirectory ) throws IOException { - return new ParquetReader(inputRowSchema, source, temporaryDirectory, getFlattenSpec(), binaryAsString); + return new ParquetReader(conf, inputRowSchema, source, temporaryDirectory, getFlattenSpec(), binaryAsString); } @Override diff --git a/extensions-core/parquet-extensions/src/main/java/org/apache/druid/data/input/parquet/ParquetReader.java b/extensions-core/parquet-extensions/src/main/java/org/apache/druid/data/input/parquet/ParquetReader.java index a98f929a1c03..2c76253982f2 100644 --- a/extensions-core/parquet-extensions/src/main/java/org/apache/druid/data/input/parquet/ParquetReader.java +++ b/extensions-core/parquet-extensions/src/main/java/org/apache/druid/data/input/parquet/ParquetReader.java @@ -31,6 +31,7 @@ import org.apache.druid.java.util.common.parsers.ObjectFlattener; import org.apache.druid.java.util.common.parsers.ObjectFlatteners; import org.apache.druid.java.util.common.parsers.ParseException; +import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.parquet.example.data.Group; import org.apache.parquet.hadoop.example.GroupReadSupport; @@ -51,6 +52,7 @@ public class ParquetReader extends IntermediateRowParsingReader private final Closer closer; ParquetReader( + Configuration conf, InputRowSchema inputRowSchema, InputEntity source, File temporaryDirectory, @@ -69,7 +71,9 @@ public class ParquetReader extends IntermediateRowParsingReader final ClassLoader currentClassLoader = Thread.currentThread().getContextClassLoader(); try { Thread.currentThread().setContextClassLoader(getClass().getClassLoader()); - reader = closer.register(org.apache.parquet.hadoop.ParquetReader.builder(new GroupReadSupport(), path).build()); + reader = closer.register(org.apache.parquet.hadoop.ParquetReader.builder(new GroupReadSupport(), path) + .withConf(conf) + .build()); } finally { Thread.currentThread().setContextClassLoader(currentClassLoader); diff --git a/extensions-core/parquet-extensions/src/main/java/org/apache/druid/data/input/parquet/guice/Parquet.java b/extensions-core/parquet-extensions/src/main/java/org/apache/druid/data/input/parquet/guice/Parquet.java new file mode 100644 index 000000000000..a6f46c6f1ee1 --- /dev/null +++ b/extensions-core/parquet-extensions/src/main/java/org/apache/druid/data/input/parquet/guice/Parquet.java @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.data.input.parquet.guice; + +import com.google.inject.BindingAnnotation; + +import java.lang.annotation.ElementType; +import java.lang.annotation.Retention; +import java.lang.annotation.RetentionPolicy; +import java.lang.annotation.Target; + +/** + * Each extension module needs to properly bind whatever it will use, but sometimes different modules need to bind the + * same class which will lead to the duplicate injection error. To avoid this problem, each module is supposed to bind + * different instances. This is a binding annotation for druid-parquet-extensions extension. Any binding for the same + * type should be distinguished by using this annotation. + */ +@Target({ElementType.FIELD, ElementType.PARAMETER, ElementType.METHOD}) +@Retention(RetentionPolicy.RUNTIME) +@BindingAnnotation +public @interface Parquet +{ +} diff --git a/extensions-core/parquet-extensions/src/test/java/org/apache/druid/data/input/parquet/BaseParquetReaderTest.java b/extensions-core/parquet-extensions/src/test/java/org/apache/druid/data/input/parquet/BaseParquetReaderTest.java index ac22f77f5f49..87f9229b3137 100644 --- a/extensions-core/parquet-extensions/src/test/java/org/apache/druid/data/input/parquet/BaseParquetReaderTest.java +++ b/extensions-core/parquet-extensions/src/test/java/org/apache/druid/data/input/parquet/BaseParquetReaderTest.java @@ -28,6 +28,7 @@ import org.apache.druid.data.input.impl.FileEntity; import org.apache.druid.java.util.common.parsers.CloseableIterator; import org.apache.druid.java.util.common.parsers.JSONPathSpec; +import org.apache.hadoop.conf.Configuration; import java.io.File; import java.io.IOException; @@ -51,7 +52,7 @@ InputEntityReader createReader( ) throws IOException { FileEntity entity = new FileEntity(new File(parquetFile)); - ParquetInputFormat parquet = new ParquetInputFormat(flattenSpec, binaryAsString); + ParquetInputFormat parquet = new ParquetInputFormat(flattenSpec, binaryAsString, new Configuration()); return parquet.createReader(schema, entity, null); }