Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import org.apache.druid.data.input.InputSplit;
import org.apache.druid.data.input.impl.StringInputRowParser;
import org.apache.druid.data.input.impl.prefetch.PrefetchableTextFilesFirehoseFactory;
import org.apache.druid.guice.Hdfs;
import org.apache.druid.inputsource.hdfs.HdfsInputSource;
import org.apache.druid.storage.hdfs.HdfsDataSegmentPuller;
import org.apache.druid.utils.CompressionUtils;
Expand All @@ -46,7 +47,7 @@ public class HdfsFirehoseFactory extends PrefetchableTextFilesFirehoseFactory<Pa

@JsonCreator
public HdfsFirehoseFactory(
@JacksonInject Configuration conf,
@JacksonInject @Hdfs Configuration conf,
@JsonProperty("paths") Object inputPaths,
@JsonProperty("maxCacheCapacityBytes") Long maxCacheCapacityBytes,
@JsonProperty("maxFetchCapacityBytes") Long maxFetchCapacityBytes,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.druid.guice;

import com.google.inject.BindingAnnotation;

import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;

/**
 * Guice binding annotation (qualifier) for the druid-hdfs-storage extension.
 * <p>
 * Each extension module must bind whatever it uses, but different modules sometimes need to bind
 * the same type — for example Hadoop's {@code Configuration} — and unqualified bindings of the
 * same type from multiple modules cause a duplicate-binding error in Guice. To avoid this, each
 * module binds its own qualified instance. Any binding made by druid-hdfs-storage for a type that
 * other extensions may also bind should be qualified with this annotation, both at the bind site
 * ({@code annotatedWith(Hdfs.class)}) and at the injection point ({@code @Hdfs} on the field,
 * parameter, or method).
 */
@Target({ElementType.FIELD, ElementType.PARAMETER, ElementType.METHOD})
@Retention(RetentionPolicy.RUNTIME)
@BindingAnnotation
public @interface Hdfs
{
}
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import org.apache.druid.data.input.SplitHintSpec;
import org.apache.druid.data.input.impl.InputEntityIteratingReader;
import org.apache.druid.data.input.impl.SplittableInputSource;
import org.apache.druid.guice.Hdfs;
import org.apache.druid.java.util.common.IAE;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
Expand Down Expand Up @@ -72,7 +73,7 @@ public class HdfsInputSource extends AbstractInputSource implements SplittableIn
@JsonCreator
public HdfsInputSource(
@JsonProperty(PROP_PATHS) Object inputPaths,
@JacksonInject Configuration configuration
@JacksonInject @Hdfs Configuration configuration
)
{
this.inputPaths = coerceInputPathsToList(inputPaths, PROP_PATHS);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import com.google.common.base.Preconditions;
import com.google.inject.Inject;
import org.apache.commons.lang.StringUtils;
import org.apache.druid.guice.Hdfs;
import org.apache.druid.java.util.emitter.EmittingLogger;
import org.apache.druid.segment.loading.DataSegmentKiller;
import org.apache.druid.segment.loading.SegmentLoadingException;
Expand All @@ -43,7 +44,7 @@ public class HdfsDataSegmentKiller implements DataSegmentKiller
private final Path storageDirectory;

@Inject
public HdfsDataSegmentKiller(final Configuration config, final HdfsDataSegmentPusherConfig pusherConfig)
public HdfsDataSegmentKiller(@Hdfs final Configuration config, final HdfsDataSegmentPusherConfig pusherConfig)
{
this.config = config;
this.storageDirectory = new Path(pusherConfig.getStorageDirectory());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import com.google.common.base.Predicate;
import com.google.common.io.ByteSource;
import com.google.inject.Inject;
import org.apache.druid.guice.Hdfs;
import org.apache.druid.java.util.common.FileUtils;
import org.apache.druid.java.util.common.IAE;
import org.apache.druid.java.util.common.RetryUtils;
Expand Down Expand Up @@ -178,7 +179,7 @@ public boolean delete()
protected final Configuration config;

@Inject
public HdfsDataSegmentPuller(final Configuration config)
public HdfsDataSegmentPuller(@Hdfs final Configuration config)
{
this.config = config;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import com.google.common.collect.ImmutableMap;
import com.google.inject.Inject;
import org.apache.druid.common.utils.UUIDUtils;
import org.apache.druid.guice.Hdfs;
import org.apache.druid.java.util.common.IOE;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.logger.Logger;
Expand Down Expand Up @@ -60,7 +61,11 @@ public class HdfsDataSegmentPusher implements DataSegmentPusher
private final Supplier<String> fullyQualifiedStorageDirectory;

@Inject
public HdfsDataSegmentPusher(HdfsDataSegmentPusherConfig config, Configuration hadoopConfig, ObjectMapper jsonMapper)
public HdfsDataSegmentPusher(
HdfsDataSegmentPusherConfig config,
@Hdfs Configuration hadoopConfig,
ObjectMapper jsonMapper
)
{
this.hadoopConfig = hadoopConfig;
this.jsonMapper = jsonMapper;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@

import com.google.inject.Inject;
import org.apache.druid.data.SearchableVersionedDataFinder;
import org.apache.druid.guice.Hdfs;
import org.apache.druid.java.util.common.RetryUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
Expand All @@ -36,7 +37,7 @@
public class HdfsFileTimestampVersionFinder extends HdfsDataSegmentPuller implements SearchableVersionedDataFinder<URI>
{
@Inject
public HdfsFileTimestampVersionFinder(Configuration config)
public HdfsFileTimestampVersionFinder(@Hdfs Configuration config)
{
super(config);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@

import com.google.common.base.Strings;
import com.google.inject.Inject;
import org.apache.druid.guice.Hdfs;
import org.apache.druid.guice.ManageLifecycle;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.lifecycle.LifecycleStart;
Expand All @@ -40,7 +41,7 @@ public class HdfsStorageAuthentication
private final Configuration hadoopConf;

@Inject
public HdfsStorageAuthentication(HdfsKerberosConfig hdfsKerberosConfig, Configuration hadoopConf)
public HdfsStorageAuthentication(HdfsKerberosConfig hdfsKerberosConfig, @Hdfs Configuration hadoopConf)
{
this.hdfsKerberosConfig = hdfsKerberosConfig;
this.hadoopConf = hadoopConf;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import org.apache.druid.data.SearchableVersionedDataFinder;
import org.apache.druid.firehose.hdfs.HdfsFirehoseFactory;
import org.apache.druid.guice.Binders;
import org.apache.druid.guice.Hdfs;
import org.apache.druid.guice.JsonConfigProvider;
import org.apache.druid.guice.LazySingleton;
import org.apache.druid.guice.LifecycleModule;
Expand Down Expand Up @@ -108,7 +109,7 @@ public void configure(Binder binder)
}
}

binder.bind(Configuration.class).toInstance(conf);
binder.bind(Configuration.class).annotatedWith(Hdfs.class).toInstance(conf);
JsonConfigProvider.bind(binder, "druid.storage", HdfsDataSegmentPusherConfig.class);

Binders.taskLogsBinder(binder).addBinding("hdfs").to(HdfsTaskLogs.class);
Expand All @@ -117,6 +118,5 @@ public void configure(Binder binder)
JsonConfigProvider.bind(binder, "druid.hadoop.security.kerberos", HdfsKerberosConfig.class);
binder.bind(HdfsStorageAuthentication.class).in(ManageLifecycle.class);
LifecycleModule.register(binder, HdfsStorageAuthentication.class);

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import com.google.common.io.ByteSource;
import com.google.common.io.ByteStreams;
import com.google.inject.Inject;
import org.apache.druid.guice.Hdfs;
import org.apache.druid.java.util.common.IOE;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.tasklogs.TaskLogs;
Expand Down Expand Up @@ -51,7 +52,7 @@ public class HdfsTaskLogs implements TaskLogs
private final Configuration hadoopConfig;

@Inject
public HdfsTaskLogs(HdfsTaskLogsConfig config, Configuration hadoopConfig)
public HdfsTaskLogs(HdfsTaskLogsConfig config, @Hdfs Configuration hadoopConfig)
{
this.config = config;
this.hadoopConfig = hadoopConfig;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import com.fasterxml.jackson.databind.module.SimpleModule;
import com.google.inject.Binder;
import com.google.inject.Inject;
import org.apache.druid.data.input.orc.guice.Orc;
import org.apache.druid.initialization.DruidModule;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
Expand Down Expand Up @@ -89,6 +90,6 @@ public void configure(Binder binder)
}
}

binder.bind(Configuration.class).toInstance(conf);
binder.bind(Configuration.class).annotatedWith(Orc.class).toInstance(conf);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import org.apache.druid.data.input.InputEntityReader;
import org.apache.druid.data.input.InputRowSchema;
import org.apache.druid.data.input.impl.NestedInputFormat;
import org.apache.druid.data.input.orc.guice.Orc;
import org.apache.druid.java.util.common.parsers.JSONPathSpec;
import org.apache.hadoop.conf.Configuration;

Expand All @@ -42,7 +43,7 @@ public class OrcInputFormat extends NestedInputFormat
public OrcInputFormat(
@JsonProperty("flattenSpec") @Nullable JSONPathSpec flattenSpec,
@JsonProperty("binaryAsString") @Nullable Boolean binaryAsString,
@JacksonInject Configuration conf
@JacksonInject @Orc Configuration conf
)
{
super(flattenSpec);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.druid.data.input.orc.guice;

import com.google.inject.BindingAnnotation;

import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;

/**
 * Guice binding annotation (qualifier) for the druid-orc-extensions extension.
 * <p>
 * Each extension module must bind whatever it uses, but different modules sometimes need to bind
 * the same type — for example Hadoop's {@code Configuration} — and unqualified bindings of the
 * same type from multiple modules cause a duplicate-binding error in Guice. To avoid this, each
 * module binds its own qualified instance. Any binding made by druid-orc-extensions for a type
 * that other extensions may also bind should be qualified with this annotation, both at the bind
 * site ({@code annotatedWith(Orc.class)}) and at the injection point ({@code @Orc} on the field,
 * parameter, or method).
 */
@Target({ElementType.FIELD, ElementType.PARAMETER, ElementType.METHOD})
@Retention(RetentionPolicy.RUNTIME)
@BindingAnnotation
public @interface Orc
{
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import com.google.inject.Binder;
import com.google.inject.Inject;
import org.apache.druid.data.input.parquet.avro.ParquetAvroHadoopInputRowParser;
import org.apache.druid.data.input.parquet.guice.Parquet;
import org.apache.druid.data.input.parquet.simple.ParquetHadoopInputRowParser;
import org.apache.druid.data.input.parquet.simple.ParquetParseSpec;
import org.apache.druid.initialization.DruidModule;
Expand Down Expand Up @@ -98,6 +99,6 @@ public void configure(Binder binder)
}
}

binder.bind(Configuration.class).toInstance(conf);
binder.bind(Configuration.class).annotatedWith(Parquet.class).toInstance(conf);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,16 @@

package org.apache.druid.data.input.parquet;

import com.fasterxml.jackson.annotation.JacksonInject;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import org.apache.druid.data.input.InputEntity;
import org.apache.druid.data.input.InputEntityReader;
import org.apache.druid.data.input.InputRowSchema;
import org.apache.druid.data.input.impl.NestedInputFormat;
import org.apache.druid.data.input.parquet.guice.Parquet;
import org.apache.druid.java.util.common.parsers.JSONPathSpec;
import org.apache.hadoop.conf.Configuration;

import javax.annotation.Nullable;
import java.io.File;
Expand All @@ -35,15 +38,18 @@
public class ParquetInputFormat extends NestedInputFormat
{
private final boolean binaryAsString;
private final Configuration conf;

@JsonCreator
public ParquetInputFormat(
@JsonProperty("flattenSpec") @Nullable JSONPathSpec flattenSpec,
@JsonProperty("binaryAsString") @Nullable Boolean binaryAsString
@JsonProperty("binaryAsString") @Nullable Boolean binaryAsString,
@JacksonInject @Parquet Configuration conf
)
{
super(flattenSpec);
this.binaryAsString = binaryAsString == null ? false : binaryAsString;
this.conf = conf;
}

@JsonProperty
Expand All @@ -65,7 +71,7 @@ public InputEntityReader createReader(
File temporaryDirectory
) throws IOException
{
return new ParquetReader(inputRowSchema, source, temporaryDirectory, getFlattenSpec(), binaryAsString);
return new ParquetReader(conf, inputRowSchema, source, temporaryDirectory, getFlattenSpec(), binaryAsString);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import org.apache.druid.java.util.common.parsers.ObjectFlattener;
import org.apache.druid.java.util.common.parsers.ObjectFlatteners;
import org.apache.druid.java.util.common.parsers.ParseException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.example.data.Group;
import org.apache.parquet.hadoop.example.GroupReadSupport;
Expand All @@ -51,6 +52,7 @@ public class ParquetReader extends IntermediateRowParsingReader<Group>
private final Closer closer;

ParquetReader(
Configuration conf,
InputRowSchema inputRowSchema,
InputEntity source,
File temporaryDirectory,
Expand All @@ -69,7 +71,9 @@ public class ParquetReader extends IntermediateRowParsingReader<Group>
final ClassLoader currentClassLoader = Thread.currentThread().getContextClassLoader();
try {
Thread.currentThread().setContextClassLoader(getClass().getClassLoader());
reader = closer.register(org.apache.parquet.hadoop.ParquetReader.builder(new GroupReadSupport(), path).build());
reader = closer.register(org.apache.parquet.hadoop.ParquetReader.builder(new GroupReadSupport(), path)
.withConf(conf)
.build());
}
finally {
Thread.currentThread().setContextClassLoader(currentClassLoader);
Expand Down
Loading