Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions docs/operations/api-reference.md
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ An endpoint that always returns a boolean "true" value with a 200 OK response, u

Returns the current configuration properties of the process.

* `/status/selfDiscoveredStatus`
* `/status/selfDiscovered/status`

Returns a JSON map of the form `{"selfDiscovered": true/false}`, indicating whether the node has received a confirmation
from the central node discovery mechanism (currently ZooKeeper) of the Druid cluster that the node has been added to the
Expand All @@ -60,7 +60,7 @@ nodes will be discovered by this node in a timely manner from this point.

* `/status/selfDiscovered`

Similar to `/status/selfDiscoveredStatus`, but returns 200 OK response with empty body if the node has discovered itself
Similar to `/status/selfDiscovered/status`, but returns 200 OK response with empty body if the node has discovered itself
and 503 SERVICE UNAVAILABLE if the node hasn't discovered itself yet. This endpoint might be useful because some
monitoring checks such as AWS load balancer health checks are not able to look at the response body.

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import org.apache.druid.data.input.InputSplit;
import org.apache.druid.data.input.impl.StringInputRowParser;
import org.apache.druid.data.input.impl.prefetch.PrefetchableTextFilesFirehoseFactory;
import org.apache.druid.guice.Hdfs;
import org.apache.druid.inputsource.hdfs.HdfsInputSource;
import org.apache.druid.storage.hdfs.HdfsDataSegmentPuller;
import org.apache.druid.utils.CompressionUtils;
Expand All @@ -46,7 +47,7 @@ public class HdfsFirehoseFactory extends PrefetchableTextFilesFirehoseFactory<Pa

@JsonCreator
public HdfsFirehoseFactory(
@JacksonInject Configuration conf,
@JacksonInject @Hdfs Configuration conf,
@JsonProperty("paths") Object inputPaths,
@JsonProperty("maxCacheCapacityBytes") Long maxCacheCapacityBytes,
@JsonProperty("maxFetchCapacityBytes") Long maxFetchCapacityBytes,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.druid.guice;

import com.google.inject.BindingAnnotation;

import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;

/**
 * Guice binding annotation for the druid-hdfs-storage extension.
 *
 * <p>Each extension module needs to bind whatever it will use, but sometimes different modules need to bind the
 * same type (for example Hadoop's {@code Configuration}), which leads to a duplicate-binding error when both
 * extensions are loaded. To avoid this collision, each module is supposed to bind a distinct, qualified instance.
 * Any binding of such a shared type made by the druid-hdfs-storage extension should be distinguished with this
 * annotation, and injection points must carry the same {@code @Hdfs} qualifier to receive it.
 */
@Target({ElementType.FIELD, ElementType.PARAMETER, ElementType.METHOD})
@Retention(RetentionPolicy.RUNTIME)
@BindingAnnotation
public @interface Hdfs
{
}
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import org.apache.druid.data.input.SplitHintSpec;
import org.apache.druid.data.input.impl.InputEntityIteratingReader;
import org.apache.druid.data.input.impl.SplittableInputSource;
import org.apache.druid.guice.Hdfs;
import org.apache.druid.java.util.common.IAE;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
Expand Down Expand Up @@ -72,7 +73,7 @@ public class HdfsInputSource extends AbstractInputSource implements SplittableIn
@JsonCreator
public HdfsInputSource(
@JsonProperty(PROP_PATHS) Object inputPaths,
@JacksonInject Configuration configuration
@JacksonInject @Hdfs Configuration configuration
)
{
this.inputPaths = coerceInputPathsToList(inputPaths, PROP_PATHS);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import com.google.common.base.Preconditions;
import com.google.inject.Inject;
import org.apache.commons.lang.StringUtils;
import org.apache.druid.guice.Hdfs;
import org.apache.druid.java.util.emitter.EmittingLogger;
import org.apache.druid.segment.loading.DataSegmentKiller;
import org.apache.druid.segment.loading.SegmentLoadingException;
Expand All @@ -43,7 +44,7 @@ public class HdfsDataSegmentKiller implements DataSegmentKiller
private final Path storageDirectory;

@Inject
public HdfsDataSegmentKiller(final Configuration config, final HdfsDataSegmentPusherConfig pusherConfig)
public HdfsDataSegmentKiller(@Hdfs final Configuration config, final HdfsDataSegmentPusherConfig pusherConfig)
{
this.config = config;
this.storageDirectory = new Path(pusherConfig.getStorageDirectory());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import com.google.common.base.Predicate;
import com.google.common.io.ByteSource;
import com.google.inject.Inject;
import org.apache.druid.guice.Hdfs;
import org.apache.druid.java.util.common.FileUtils;
import org.apache.druid.java.util.common.IAE;
import org.apache.druid.java.util.common.RetryUtils;
Expand Down Expand Up @@ -178,7 +179,7 @@ public boolean delete()
protected final Configuration config;

@Inject
public HdfsDataSegmentPuller(final Configuration config)
public HdfsDataSegmentPuller(@Hdfs final Configuration config)
{
this.config = config;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import com.google.common.collect.ImmutableMap;
import com.google.inject.Inject;
import org.apache.druid.common.utils.UUIDUtils;
import org.apache.druid.guice.Hdfs;
import org.apache.druid.java.util.common.IOE;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.logger.Logger;
Expand Down Expand Up @@ -60,7 +61,11 @@ public class HdfsDataSegmentPusher implements DataSegmentPusher
private final Supplier<String> fullyQualifiedStorageDirectory;

@Inject
public HdfsDataSegmentPusher(HdfsDataSegmentPusherConfig config, Configuration hadoopConfig, ObjectMapper jsonMapper)
public HdfsDataSegmentPusher(
HdfsDataSegmentPusherConfig config,
@Hdfs Configuration hadoopConfig,
ObjectMapper jsonMapper
)
{
this.hadoopConfig = hadoopConfig;
this.jsonMapper = jsonMapper;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@

import com.google.inject.Inject;
import org.apache.druid.data.SearchableVersionedDataFinder;
import org.apache.druid.guice.Hdfs;
import org.apache.druid.java.util.common.RetryUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
Expand All @@ -36,7 +37,7 @@
public class HdfsFileTimestampVersionFinder extends HdfsDataSegmentPuller implements SearchableVersionedDataFinder<URI>
{
@Inject
public HdfsFileTimestampVersionFinder(Configuration config)
public HdfsFileTimestampVersionFinder(@Hdfs Configuration config)
{
super(config);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@

import com.google.common.base.Strings;
import com.google.inject.Inject;
import org.apache.druid.guice.Hdfs;
import org.apache.druid.guice.ManageLifecycle;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.lifecycle.LifecycleStart;
Expand All @@ -40,7 +41,7 @@ public class HdfsStorageAuthentication
private final Configuration hadoopConf;

@Inject
public HdfsStorageAuthentication(HdfsKerberosConfig hdfsKerberosConfig, Configuration hadoopConf)
public HdfsStorageAuthentication(HdfsKerberosConfig hdfsKerberosConfig, @Hdfs Configuration hadoopConf)
{
this.hdfsKerberosConfig = hdfsKerberosConfig;
this.hadoopConf = hadoopConf;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import org.apache.druid.data.SearchableVersionedDataFinder;
import org.apache.druid.firehose.hdfs.HdfsFirehoseFactory;
import org.apache.druid.guice.Binders;
import org.apache.druid.guice.Hdfs;
import org.apache.druid.guice.JsonConfigProvider;
import org.apache.druid.guice.LazySingleton;
import org.apache.druid.guice.LifecycleModule;
Expand Down Expand Up @@ -108,7 +109,7 @@ public void configure(Binder binder)
}
}

binder.bind(Configuration.class).toInstance(conf);
binder.bind(Configuration.class).annotatedWith(Hdfs.class).toInstance(conf);
JsonConfigProvider.bind(binder, "druid.storage", HdfsDataSegmentPusherConfig.class);

Binders.taskLogsBinder(binder).addBinding("hdfs").to(HdfsTaskLogs.class);
Expand All @@ -117,6 +118,5 @@ public void configure(Binder binder)
JsonConfigProvider.bind(binder, "druid.hadoop.security.kerberos", HdfsKerberosConfig.class);
binder.bind(HdfsStorageAuthentication.class).in(ManageLifecycle.class);
LifecycleModule.register(binder, HdfsStorageAuthentication.class);

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import com.google.common.io.ByteSource;
import com.google.common.io.ByteStreams;
import com.google.inject.Inject;
import org.apache.druid.guice.Hdfs;
import org.apache.druid.java.util.common.IOE;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.tasklogs.TaskLogs;
Expand Down Expand Up @@ -51,7 +52,7 @@ public class HdfsTaskLogs implements TaskLogs
private final Configuration hadoopConfig;

@Inject
public HdfsTaskLogs(HdfsTaskLogsConfig config, Configuration hadoopConfig)
public HdfsTaskLogs(HdfsTaskLogsConfig config, @Hdfs Configuration hadoopConfig)
{
this.config = config;
this.hadoopConfig = hadoopConfig;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import com.fasterxml.jackson.databind.module.SimpleModule;
import com.google.inject.Binder;
import com.google.inject.Inject;
import org.apache.druid.data.input.orc.guice.Orc;
import org.apache.druid.initialization.DruidModule;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
Expand Down Expand Up @@ -89,6 +90,6 @@ public void configure(Binder binder)
}
}

binder.bind(Configuration.class).toInstance(conf);
binder.bind(Configuration.class).annotatedWith(Orc.class).toInstance(conf);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import org.apache.druid.data.input.InputEntityReader;
import org.apache.druid.data.input.InputRowSchema;
import org.apache.druid.data.input.impl.NestedInputFormat;
import org.apache.druid.data.input.orc.guice.Orc;
import org.apache.druid.java.util.common.parsers.JSONPathSpec;
import org.apache.hadoop.conf.Configuration;

Expand All @@ -42,7 +43,7 @@ public class OrcInputFormat extends NestedInputFormat
public OrcInputFormat(
@JsonProperty("flattenSpec") @Nullable JSONPathSpec flattenSpec,
@JsonProperty("binaryAsString") @Nullable Boolean binaryAsString,
@JacksonInject Configuration conf
@JacksonInject @Orc Configuration conf
)
{
super(flattenSpec);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.druid.data.input.orc.guice;

import com.google.inject.BindingAnnotation;

import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;

/**
 * Guice binding annotation for the druid-orc-extensions extension.
 *
 * <p>Each extension module needs to bind whatever it will use, but sometimes different modules need to bind the
 * same type (for example Hadoop's {@code Configuration}), which leads to a duplicate-binding error when both
 * extensions are loaded. To avoid this collision, each module is supposed to bind a distinct, qualified instance.
 * Any binding of such a shared type made by the druid-orc-extensions extension should be distinguished with this
 * annotation, and injection points must carry the same {@code @Orc} qualifier to receive it.
 */
@Target({ElementType.FIELD, ElementType.PARAMETER, ElementType.METHOD})
@Retention(RetentionPolicy.RUNTIME)
@BindingAnnotation
public @interface Orc
{
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import com.google.inject.Binder;
import com.google.inject.Inject;
import org.apache.druid.data.input.parquet.avro.ParquetAvroHadoopInputRowParser;
import org.apache.druid.data.input.parquet.guice.Parquet;
import org.apache.druid.data.input.parquet.simple.ParquetHadoopInputRowParser;
import org.apache.druid.data.input.parquet.simple.ParquetParseSpec;
import org.apache.druid.initialization.DruidModule;
Expand Down Expand Up @@ -98,6 +99,6 @@ public void configure(Binder binder)
}
}

binder.bind(Configuration.class).toInstance(conf);
binder.bind(Configuration.class).annotatedWith(Parquet.class).toInstance(conf);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,16 @@

package org.apache.druid.data.input.parquet;

import com.fasterxml.jackson.annotation.JacksonInject;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import org.apache.druid.data.input.InputEntity;
import org.apache.druid.data.input.InputEntityReader;
import org.apache.druid.data.input.InputRowSchema;
import org.apache.druid.data.input.impl.NestedInputFormat;
import org.apache.druid.data.input.parquet.guice.Parquet;
import org.apache.druid.java.util.common.parsers.JSONPathSpec;
import org.apache.hadoop.conf.Configuration;

import javax.annotation.Nullable;
import java.io.File;
Expand All @@ -35,15 +38,18 @@
public class ParquetInputFormat extends NestedInputFormat
{
private final boolean binaryAsString;
private final Configuration conf;

@JsonCreator
public ParquetInputFormat(
@JsonProperty("flattenSpec") @Nullable JSONPathSpec flattenSpec,
@JsonProperty("binaryAsString") @Nullable Boolean binaryAsString
@JsonProperty("binaryAsString") @Nullable Boolean binaryAsString,
@JacksonInject @Parquet Configuration conf
)
{
super(flattenSpec);
this.binaryAsString = binaryAsString == null ? false : binaryAsString;
this.conf = conf;
}

@JsonProperty
Expand All @@ -65,7 +71,7 @@ public InputEntityReader createReader(
File temporaryDirectory
) throws IOException
{
return new ParquetReader(inputRowSchema, source, temporaryDirectory, getFlattenSpec(), binaryAsString);
return new ParquetReader(conf, inputRowSchema, source, temporaryDirectory, getFlattenSpec(), binaryAsString);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import org.apache.druid.java.util.common.parsers.ObjectFlattener;
import org.apache.druid.java.util.common.parsers.ObjectFlatteners;
import org.apache.druid.java.util.common.parsers.ParseException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.example.data.Group;
import org.apache.parquet.hadoop.example.GroupReadSupport;
Expand All @@ -51,6 +52,7 @@ public class ParquetReader extends IntermediateRowParsingReader<Group>
private final Closer closer;

ParquetReader(
Configuration conf,
InputRowSchema inputRowSchema,
InputEntity source,
File temporaryDirectory,
Expand All @@ -69,7 +71,9 @@ public class ParquetReader extends IntermediateRowParsingReader<Group>
final ClassLoader currentClassLoader = Thread.currentThread().getContextClassLoader();
try {
Thread.currentThread().setContextClassLoader(getClass().getClassLoader());
reader = closer.register(org.apache.parquet.hadoop.ParquetReader.builder(new GroupReadSupport(), path).build());
reader = closer.register(org.apache.parquet.hadoop.ParquetReader.builder(new GroupReadSupport(), path)
.withConf(conf)
.build());
}
finally {
Thread.currentThread().setContextClassLoader(currentClassLoader);
Expand Down
Loading