From 25d8de088762b98f58a39d48f8e38b94d98d20ad Mon Sep 17 00:00:00 2001 From: Jihoon Son Date: Mon, 1 Feb 2021 17:59:32 -0800 Subject: [PATCH 01/13] Allow only HTTP and HTTPS protocols for the HTTP inputSource --- .../data/input/impl/HttpInputSource.java | 8 +++++ .../data/input/impl/HttpInputSourceTest.java | 32 +++++++++++++++++++ 2 files changed, 40 insertions(+) diff --git a/core/src/main/java/org/apache/druid/data/input/impl/HttpInputSource.java b/core/src/main/java/org/apache/druid/data/input/impl/HttpInputSource.java index 36d6b97a40e1..d6eff733c3e5 100644 --- a/core/src/main/java/org/apache/druid/data/input/impl/HttpInputSource.java +++ b/core/src/main/java/org/apache/druid/data/input/impl/HttpInputSource.java @@ -59,6 +59,7 @@ public HttpInputSource( ) { Preconditions.checkArgument(uris != null && !uris.isEmpty(), "Empty URIs"); + throwForInvalidProtocols(uris); uris.forEach(uri -> Preconditions.checkArgument( config.isURIAllowed(uri), StringUtils.format("Access to [%s] DENIED!", uri) @@ -69,6 +70,13 @@ public HttpInputSource( this.config = config; } + private static void throwForInvalidProtocols(List uris) + { + if (uris.stream().anyMatch(uri -> !"http".equals(uri.getScheme()) && !"https".equals(uri.getScheme()))) { + throw new IllegalArgumentException("Only HTTP or HTTPS are allowed"); + } + } + @JsonProperty public List getUris() { diff --git a/core/src/test/java/org/apache/druid/data/input/impl/HttpInputSourceTest.java b/core/src/test/java/org/apache/druid/data/input/impl/HttpInputSourceTest.java index 892fb38b1d81..a279a7c18043 100644 --- a/core/src/test/java/org/apache/druid/data/input/impl/HttpInputSourceTest.java +++ b/core/src/test/java/org/apache/druid/data/input/impl/HttpInputSourceTest.java @@ -25,7 +25,9 @@ import org.apache.druid.data.input.InputSource; import org.apache.druid.metadata.DefaultPasswordProvider; import org.junit.Assert; +import org.junit.Rule; import org.junit.Test; +import org.junit.rules.ExpectedException; import java.io.IOException; import java.net.URI; @@ -33,6 +35,9 @@ public class HttpInputSourceTest { + @Rule + public ExpectedException expectedException = ExpectedException.none(); + @Test public void testSerde() throws IOException { @@ -78,6 +83,33 @@ public void testDenyListDomainNoMatch() ); } + @Test + public void testConstructorAllowsOnlyHttpAndHttpsProtocols() + { + new HttpInputSource( + ImmutableList.of(URI.create("http:///")), + "myName", + new DefaultPasswordProvider("myPassword"), + new HttpInputSourceConfig(null, null) + ); + + new HttpInputSource( + ImmutableList.of(URI.create("https:///")), + "myName", + new DefaultPasswordProvider("myPassword"), + new HttpInputSourceConfig(null, null) + ); + + expectedException.expect(IllegalArgumentException.class); + expectedException.expectMessage("Only HTTP or HTTPS are allowed"); + new HttpInputSource( + ImmutableList.of(URI.create("my-protocol:///")), + "myName", + new DefaultPasswordProvider("myPassword"), + new HttpInputSourceConfig(null, null) + ); + } + @Test(expected = IllegalArgumentException.class) public void testAllowListDomainThrowsException() { From 87a586a24ef55857d08273af41dadc65c37c0ef2 Mon Sep 17 00:00:00 2001 From: Jihoon Son Date: Mon, 1 Feb 2021 18:02:55 -0800 Subject: [PATCH 02/13] rename --- .../org/apache/druid/data/input/impl/HttpInputSource.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/core/src/main/java/org/apache/druid/data/input/impl/HttpInputSource.java b/core/src/main/java/org/apache/druid/data/input/impl/HttpInputSource.java index 
d6eff733c3e5..3fec11575230 100644 --- a/core/src/main/java/org/apache/druid/data/input/impl/HttpInputSource.java +++ b/core/src/main/java/org/apache/druid/data/input/impl/HttpInputSource.java @@ -59,7 +59,7 @@ public HttpInputSource( ) { Preconditions.checkArgument(uris != null && !uris.isEmpty(), "Empty URIs"); - throwForInvalidProtocols(uris); + throwIfInvalidProtocols(uris); uris.forEach(uri -> Preconditions.checkArgument( config.isURIAllowed(uri), StringUtils.format("Access to [%s] DENIED!", uri) @@ -70,7 +70,7 @@ public HttpInputSource( this.config = config; } - private static void throwForInvalidProtocols(List uris) + private static void throwIfInvalidProtocols(List uris) { if (uris.stream().anyMatch(uri -> !"http".equals(uri.getScheme()) && !"https".equals(uri.getScheme()))) { throw new IllegalArgumentException("Only HTTP or HTTPS are allowed"); From e4e0ced457b4d838057d2ce9ceebb1c33dd8c920 Mon Sep 17 00:00:00 2001 From: Jihoon Son Date: Tue, 2 Feb 2021 01:33:35 -0800 Subject: [PATCH 03/13] Update core/src/main/java/org/apache/druid/data/input/impl/HttpInputSource.java Co-authored-by: Abhishek Agarwal <1477457+abhishekagarwal87@users.noreply.github.com> --- .../java/org/apache/druid/data/input/impl/HttpInputSource.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/main/java/org/apache/druid/data/input/impl/HttpInputSource.java b/core/src/main/java/org/apache/druid/data/input/impl/HttpInputSource.java index 3fec11575230..85ab576f7308 100644 --- a/core/src/main/java/org/apache/druid/data/input/impl/HttpInputSource.java +++ b/core/src/main/java/org/apache/druid/data/input/impl/HttpInputSource.java @@ -72,7 +72,7 @@ public HttpInputSource( private static void throwIfInvalidProtocols(List uris) { - if (uris.stream().anyMatch(uri -> !"http".equals(uri.getScheme()) && !"https".equals(uri.getScheme()))) { + if (uris.stream().anyMatch(uri -> !"http".equalsIgnoreCase(uri.getScheme()) && !"https".equalsIgnoreCase(uri.getScheme()))) { throw new IllegalArgumentException("Only HTTP or HTTPS are allowed"); } } From 6039aff6b31f337969ac40ce1dae00a0af22cbf8 Mon Sep 17 00:00:00 2001 From: Jihoon Son Date: Tue, 2 Feb 2021 10:34:36 -0800 Subject: [PATCH 04/13] fix http firehose and update doc --- .../data/input/impl/HttpInputSource.java | 2 +- docs/ingestion/native-batch.md | 3 +- .../firehose/HttpFirehoseFactory.java | 2 + .../firehose/HttpFirehoseFactoryTest.java | 46 +++++++++++++++++++ 4 files changed, 51 insertions(+), 2 deletions(-) diff --git a/core/src/main/java/org/apache/druid/data/input/impl/HttpInputSource.java b/core/src/main/java/org/apache/druid/data/input/impl/HttpInputSource.java index 3fec11575230..cbfe5a6c6c6c 100644 --- a/core/src/main/java/org/apache/druid/data/input/impl/HttpInputSource.java +++ b/core/src/main/java/org/apache/druid/data/input/impl/HttpInputSource.java @@ -70,7 +70,7 @@ public HttpInputSource( this.config = config; } - private static void throwIfInvalidProtocols(List uris) + public static void throwIfInvalidProtocols(List uris) { if (uris.stream().anyMatch(uri -> !"http".equals(uri.getScheme()) && !"https".equals(uri.getScheme()))) { throw new IllegalArgumentException("Only HTTP or HTTPS are allowed"); diff --git a/docs/ingestion/native-batch.md b/docs/ingestion/native-batch.md index 6e5620d4cc8d..29bb2d1e69b7 100644 --- a/docs/ingestion/native-batch.md +++ b/docs/ingestion/native-batch.md @@ -1203,7 +1203,7 @@ You can also use the other existing Druid PasswordProviders. 
Here is an example |property|description|default|required?| |--------|-----------|-------|---------| |type|This should be `http`|None|yes| -|uris|URIs of the input files.|None|yes| +|uris|URIs of the input files. Only `http` and `https` schemes are allowed.|None|yes| |httpAuthenticationUsername|Username to use for authentication with specified URIs. Can be optionally used if the URIs specified in the spec require a Basic Authentication Header.|None|no| |httpAuthenticationPassword|PasswordProvider to use with specified URIs. Can be optionally used if the URIs specified in the spec require a Basic Authentication Header.|None|no| @@ -1590,6 +1590,7 @@ A sample HTTP Firehose spec is shown below: } ``` +URIs must have a scheme of either `http` or `https`. The below configurations can be optionally used if the URIs specified in the spec require a Basic Authentication Header. Omitting these fields from your spec will result in HTTP requests with no Basic Authentication Header. diff --git a/server/src/main/java/org/apache/druid/segment/realtime/firehose/HttpFirehoseFactory.java b/server/src/main/java/org/apache/druid/segment/realtime/firehose/HttpFirehoseFactory.java index 7ff5e957d66a..7aca7631f5d1 100644 --- a/server/src/main/java/org/apache/druid/segment/realtime/firehose/HttpFirehoseFactory.java +++ b/server/src/main/java/org/apache/druid/segment/realtime/firehose/HttpFirehoseFactory.java @@ -26,6 +26,7 @@ import org.apache.druid.data.input.FiniteFirehoseFactory; import org.apache.druid.data.input.InputSplit; import org.apache.druid.data.input.impl.HttpEntity; +import org.apache.druid.data.input.impl.HttpInputSource; import org.apache.druid.data.input.impl.StringInputRowParser; import org.apache.druid.data.input.impl.prefetch.PrefetchableTextFilesFirehoseFactory; import org.apache.druid.java.util.common.logger.Logger; @@ -64,6 +65,7 @@ public HttpFirehoseFactory( { super(maxCacheCapacityBytes, maxFetchCapacityBytes, prefetchTriggerBytes, fetchTimeout, maxFetchRetry); Preconditions.checkArgument(uris.size() > 0, "Empty URIs"); + HttpInputSource.throwIfInvalidProtocols(uris); this.uris = uris; this.httpAuthenticationUsername = httpAuthenticationUsername; this.httpAuthenticationPasswordProvider = httpAuthenticationPasswordProvider; diff --git a/server/src/test/java/org/apache/druid/segment/realtime/firehose/HttpFirehoseFactoryTest.java b/server/src/test/java/org/apache/druid/segment/realtime/firehose/HttpFirehoseFactoryTest.java index e44ff7f6391c..dec069ce0e5e 100644 --- a/server/src/test/java/org/apache/druid/segment/realtime/firehose/HttpFirehoseFactoryTest.java +++ b/server/src/test/java/org/apache/druid/segment/realtime/firehose/HttpFirehoseFactoryTest.java @@ -21,16 +21,23 @@ import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.collect.ImmutableList; +import org.apache.druid.data.input.impl.HttpInputSource; +import org.apache.druid.data.input.impl.HttpInputSourceConfig; import org.apache.druid.jackson.DefaultObjectMapper; import org.apache.druid.metadata.DefaultPasswordProvider; import org.junit.Assert; +import org.junit.Rule; import org.junit.Test; +import org.junit.rules.ExpectedException; import java.io.IOException; import java.net.URI; public class HttpFirehoseFactoryTest { + @Rule + public ExpectedException expectedException = ExpectedException.none(); + @Test public void testSerde() throws IOException { @@ -54,4 +61,43 @@ public void testSerde() throws IOException Assert.assertEquals(factory, outputFact); } + + @Test + public void 
testConstructorAllowsOnlyHttpAndHttpsProtocols() + { + new HttpFirehoseFactory( + ImmutableList.of(URI.create("http:///")), + null, + null, + null, + null, + null, + null, + null + ); + + new HttpFirehoseFactory( + ImmutableList.of(URI.create("https:///")), + null, + null, + null, + null, + null, + null, + null + ); + + expectedException.expect(IllegalArgumentException.class); + expectedException.expectMessage("Only HTTP or HTTPS are allowed"); + new HttpFirehoseFactory( + ImmutableList.of(URI.create("my-protocol:///")), + null, + null, + null, + null, + null, + null, + null + ); + } } From 9b3973788bef5413656305d0d071a449984218aa Mon Sep 17 00:00:00 2001 From: Jihoon Son Date: Tue, 2 Feb 2021 19:55:55 -0800 Subject: [PATCH 05/13] HDFS inputSource --- .../druid/inputsource/hdfs/HdfsInputSource.java | 12 +++++++++--- .../inputsource/hdfs/HdfsInputSourceTest.java | 15 +++++++++++++++ 2 files changed, 24 insertions(+), 3 deletions(-) diff --git a/extensions-core/hdfs-storage/src/main/java/org/apache/druid/inputsource/hdfs/HdfsInputSource.java b/extensions-core/hdfs-storage/src/main/java/org/apache/druid/inputsource/hdfs/HdfsInputSource.java index b8c798a77b5c..65d374f6d081 100644 --- a/extensions-core/hdfs-storage/src/main/java/org/apache/druid/inputsource/hdfs/HdfsInputSource.java +++ b/extensions-core/hdfs-storage/src/main/java/org/apache/druid/inputsource/hdfs/HdfsInputSource.java @@ -51,6 +51,7 @@ import java.io.File; import java.io.IOException; import java.io.UncheckedIOException; +import java.util.Arrays; import java.util.Collection; import java.util.Collections; import java.util.Iterator; @@ -101,9 +102,9 @@ public static List coerceInputPathsToList(Object inputPaths, String prop return paths; } - public static Collection getPaths(List inputPaths, Configuration configuration) throws IOException + public static Collection getPaths(List inputPathStrings, Configuration configuration) throws IOException { - if (inputPaths.isEmpty()) { + if (inputPathStrings.isEmpty()) { return Collections.emptySet(); } @@ -111,10 +112,15 @@ public static Collection getPaths(List inputPaths, Configuration c Job job = Job.getInstance(configuration); // Add paths to the fake JobContext. 
- for (String inputPath : inputPaths) { + for (String inputPath : inputPathStrings) { FileInputFormat.addInputPaths(job, inputPath); } + final Path[] inputPaths = FileInputFormat.getInputPaths(job); + if (Arrays.stream(inputPaths).anyMatch(path -> !"hdfs".equalsIgnoreCase(path.toUri().getScheme()))) { + throw new IllegalArgumentException("Input paths must be the HDFS path"); + } + return new HdfsFileInputFormat().getSplits(job) .stream() .filter(split -> ((FileSplit) split).getLength() > 0) diff --git a/extensions-core/hdfs-storage/src/test/java/org/apache/druid/inputsource/hdfs/HdfsInputSourceTest.java b/extensions-core/hdfs-storage/src/test/java/org/apache/druid/inputsource/hdfs/HdfsInputSourceTest.java index 170791e7d16c..fa0f49867b6a 100644 --- a/extensions-core/hdfs-storage/src/test/java/org/apache/druid/inputsource/hdfs/HdfsInputSourceTest.java +++ b/extensions-core/hdfs-storage/src/test/java/org/apache/druid/inputsource/hdfs/HdfsInputSourceTest.java @@ -169,6 +169,9 @@ public static class ReaderTest private static final String KEY_VALUE_SEPARATOR = ","; private static final String ALPHABET = "abcdefghijklmnopqrstuvwxyz"; + @Rule + public ExpectedException expectedException = ExpectedException.none(); + @Rule public TemporaryFolder temporaryFolder = new TemporaryFolder(); @@ -292,6 +295,18 @@ public void createCorrectInputSourceWithSplit() throws Exception Assert.assertEquals(expectedPath, actualPath); } } + + @Test + public void testNonHdfsPathThrowException() + { + target = HdfsInputSource.builder() + .paths("file:///" + PATH + "*") + .configuration(CONFIGURATION) + .build(); + expectedException.expect(IllegalArgumentException.class); + expectedException.expectMessage("Input paths must be the HDFS path"); + target.formattableReader(INPUT_ROW_SCHEMA, INPUT_FORMAT, null); + } } public static class EmptyPathsTest From 661e6f7d89855756d54ec95536265b68cda2de39 Mon Sep 17 00:00:00 2001 From: Jihoon Son Date: Fri, 5 Feb 2021 15:11:21 -0800 Subject: [PATCH 06/13] add configs for allowed protocols --- .../data/input/impl/HttpInputSource.java | 23 +-- .../input/impl/HttpInputSourceConfig.java | 62 +++++--- .../input/impl/HttpInputSourceConfigTest.java | 56 +++++++ .../data/input/impl/HttpInputSourceTest.java | 49 ++++-- docs/configuration/index.md | 38 ++++- docs/ingestion/native-batch.md | 29 ++-- .../firehose/hdfs/HdfsFirehoseFactory.java | 26 ++-- .../inputsource/hdfs/HdfsInputSource.java | 63 +++++--- .../hdfs/HdfsInputSourceConfig.java | 52 +++++++ .../storage/hdfs/HdfsStorageDruidModule.java | 3 + .../hdfs/HdfsFirehoseFactoryTest.java | 147 +++++++++++++++++- .../hdfs/HdfsInputSourceConfigTest.java | 48 ++++++ .../inputsource/hdfs/HdfsInputSourceTest.java | 106 ++++++++++--- .../hdfs/HdfsStorageDruidModuleTest.java | 80 ++++++++++ .../firehose/HttpFirehoseFactory.java | 43 ++--- .../metadata/input/InputSourceModuleTest.java | 6 +- .../firehose/HttpFirehoseFactoryTest.java | 55 ++++++- 17 files changed, 733 insertions(+), 153 deletions(-) create mode 100644 core/src/test/java/org/apache/druid/data/input/impl/HttpInputSourceConfigTest.java create mode 100644 extensions-core/hdfs-storage/src/main/java/org/apache/druid/inputsource/hdfs/HdfsInputSourceConfig.java create mode 100644 extensions-core/hdfs-storage/src/test/java/org/apache/druid/inputsource/hdfs/HdfsInputSourceConfigTest.java create mode 100644 extensions-core/hdfs-storage/src/test/java/org/apache/druid/storage/hdfs/HdfsStorageDruidModuleTest.java diff --git 
a/core/src/main/java/org/apache/druid/data/input/impl/HttpInputSource.java b/core/src/main/java/org/apache/druid/data/input/impl/HttpInputSource.java index 096c4d65e478..f6a0a2cb02a0 100644 --- a/core/src/main/java/org/apache/druid/data/input/impl/HttpInputSource.java +++ b/core/src/main/java/org/apache/druid/data/input/impl/HttpInputSource.java @@ -29,6 +29,7 @@ import org.apache.druid.data.input.InputSourceReader; import org.apache.druid.data.input.InputSplit; import org.apache.druid.data.input.SplitHintSpec; +import org.apache.druid.java.util.common.IAE; import org.apache.druid.java.util.common.StringUtils; import org.apache.druid.metadata.PasswordProvider; @@ -47,7 +48,6 @@ public class HttpInputSource extends AbstractInputSource implements SplittableIn private final String httpAuthenticationUsername; @Nullable private final PasswordProvider httpAuthenticationPasswordProvider; - private final HttpInputSourceConfig config; @JsonCreator @@ -59,7 +59,7 @@ public HttpInputSource( ) { Preconditions.checkArgument(uris != null && !uris.isEmpty(), "Empty URIs"); - throwIfInvalidProtocols(uris); + throwIfInvalidProtocols(config, uris); uris.forEach(uri -> Preconditions.checkArgument( config.isURIAllowed(uri), StringUtils.format("Access to [%s] DENIED!", uri) @@ -70,10 +70,12 @@ public HttpInputSource( this.config = config; } - public static void throwIfInvalidProtocols(List uris) + public static void throwIfInvalidProtocols(HttpInputSourceConfig config, List uris) { - if (uris.stream().anyMatch(uri -> !"http".equalsIgnoreCase(uri.getScheme()) && !"https".equalsIgnoreCase(uri.getScheme()))) { - throw new IllegalArgumentException("Only HTTP or HTTPS are allowed"); + for (URI uri : uris) { + if (!config.getAllowedProtocols().contains(StringUtils.toLowerCase(uri.getScheme()))) { + throw new IAE("Only %s protocols are allowed", config.getAllowedProtocols()); + } } } @@ -148,16 +150,17 @@ public boolean equals(Object o) if (o == null || getClass() != o.getClass()) { return false; } - HttpInputSource source = (HttpInputSource) o; - return Objects.equals(uris, source.uris) && - Objects.equals(httpAuthenticationUsername, source.httpAuthenticationUsername) && - Objects.equals(httpAuthenticationPasswordProvider, source.httpAuthenticationPasswordProvider); + HttpInputSource that = (HttpInputSource) o; + return Objects.equals(uris, that.uris) && + Objects.equals(httpAuthenticationUsername, that.httpAuthenticationUsername) && + Objects.equals(httpAuthenticationPasswordProvider, that.httpAuthenticationPasswordProvider) && + Objects.equals(config, that.config); } @Override public int hashCode() { - return Objects.hash(uris, httpAuthenticationUsername, httpAuthenticationPasswordProvider); + return Objects.hash(uris, httpAuthenticationUsername, httpAuthenticationPasswordProvider, config); } @Override diff --git a/core/src/main/java/org/apache/druid/data/input/impl/HttpInputSourceConfig.java b/core/src/main/java/org/apache/druid/data/input/impl/HttpInputSourceConfig.java index a84fb31e439b..b602520597c9 100644 --- a/core/src/main/java/org/apache/druid/data/input/impl/HttpInputSourceConfig.java +++ b/core/src/main/java/org/apache/druid/data/input/impl/HttpInputSourceConfig.java @@ -21,43 +21,67 @@ import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; +import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; +import com.google.common.collect.ImmutableSet; +import org.apache.druid.java.util.common.StringUtils; +import 
javax.annotation.Nullable; import java.net.URI; import java.util.List; import java.util.Objects; +import java.util.Set; +import java.util.stream.Collectors; public class HttpInputSourceConfig { + @VisibleForTesting + static final Set DEFAULT_ALLOWED_PROTOCOLS = ImmutableSet.of("http", "https"); + + @Nullable @JsonProperty private final List allowListDomains; + @Nullable @JsonProperty private final List denyListDomains; + @JsonProperty + private final Set allowedProtocols; @JsonCreator public HttpInputSourceConfig( - @JsonProperty("allowListDomains") List allowListDomains, - @JsonProperty("denyListDomains") List denyListDomains + @JsonProperty("allowListDomains") @Nullable List allowListDomains, + @JsonProperty("denyListDomains") @Nullable List denyListDomains, + @JsonProperty("allowedProtocols") @Nullable Set allowedProtocols ) { - this.allowListDomains = allowListDomains; - this.denyListDomains = denyListDomains; Preconditions.checkArgument( - this.denyListDomains == null || this.allowListDomains == null, + denyListDomains == null || allowListDomains == null, "Can only use one of allowList or blackList" ); + this.allowListDomains = allowListDomains; + this.denyListDomains = denyListDomains; + this.allowedProtocols = allowedProtocols == null || allowedProtocols.isEmpty() + ? DEFAULT_ALLOWED_PROTOCOLS + : allowedProtocols.stream().map(StringUtils::toLowerCase).collect(Collectors.toSet()); } + @Nullable public List getAllowListDomains() { return allowListDomains; } + @Nullable public List getDenyListDomains() { return denyListDomains; } + public Set getAllowedProtocols() + { + return allowedProtocols; + } + private static boolean matchesAny(List domains, URI uri) { for (String domain : domains) { @@ -80,15 +104,6 @@ public boolean isURIAllowed(URI uri) return true; } - @Override - public String toString() - { - return "HttpInputSourceConfig{" + - "allowListDomains=" + allowListDomains + - ", denyListDomains=" + denyListDomains + - '}'; - } - @Override public boolean equals(Object o) { @@ -98,15 +113,26 @@ public boolean equals(Object o) if (o == null || getClass() != o.getClass()) { return false; } - HttpInputSourceConfig config = (HttpInputSourceConfig) o; - return Objects.equals(allowListDomains, config.allowListDomains) && - Objects.equals(denyListDomains, config.denyListDomains); + HttpInputSourceConfig that = (HttpInputSourceConfig) o; + return Objects.equals(allowListDomains, that.allowListDomains) && + Objects.equals(denyListDomains, that.denyListDomains) && + Objects.equals(allowedProtocols, that.allowedProtocols); } @Override public int hashCode() { - return Objects.hash(allowListDomains, denyListDomains); + return Objects.hash(allowListDomains, denyListDomains, allowedProtocols); + } + + @Override + public String toString() + { + return "HttpInputSourceConfig{" + + "allowListDomains=" + allowListDomains + + ", denyListDomains=" + denyListDomains + + ", allowedProtocols=" + allowedProtocols + + '}'; } } diff --git a/core/src/test/java/org/apache/druid/data/input/impl/HttpInputSourceConfigTest.java b/core/src/test/java/org/apache/druid/data/input/impl/HttpInputSourceConfigTest.java new file mode 100644 index 000000000000..c8dd15c4e483 --- /dev/null +++ b/core/src/test/java/org/apache/druid/data/input/impl/HttpInputSourceConfigTest.java @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.data.input.impl; + +import com.google.common.collect.ImmutableSet; +import nl.jqno.equalsverifier.EqualsVerifier; +import org.junit.Assert; +import org.junit.Test; + +public class HttpInputSourceConfigTest +{ + + @Test + public void testEquals() + { + EqualsVerifier.forClass(HttpInputSourceConfig.class).usingGetClass().verify(); + } + + @Test + public void testNullAllowedProtocolsUseDefault() + { + HttpInputSourceConfig config = new HttpInputSourceConfig(null, null, null); + Assert.assertEquals(HttpInputSourceConfig.DEFAULT_ALLOWED_PROTOCOLS, config.getAllowedProtocols()); + } + + @Test + public void testEmptyAllowedProtocolsUseDefault() + { + HttpInputSourceConfig config = new HttpInputSourceConfig(null, null, ImmutableSet.of()); + Assert.assertEquals(HttpInputSourceConfig.DEFAULT_ALLOWED_PROTOCOLS, config.getAllowedProtocols()); + } + + @Test + public void testCustomAllowedProtocols() + { + HttpInputSourceConfig config = new HttpInputSourceConfig(null, null, ImmutableSet.of("druid")); + Assert.assertEquals(ImmutableSet.of("druid"), config.getAllowedProtocols()); + } +} diff --git a/core/src/test/java/org/apache/druid/data/input/impl/HttpInputSourceTest.java b/core/src/test/java/org/apache/druid/data/input/impl/HttpInputSourceTest.java index a279a7c18043..c2db5c957a70 100644 --- a/core/src/test/java/org/apache/druid/data/input/impl/HttpInputSourceTest.java +++ b/core/src/test/java/org/apache/druid/data/input/impl/HttpInputSourceTest.java @@ -22,6 +22,7 @@ import com.fasterxml.jackson.databind.InjectableValues; import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableSet; import org.apache.druid.data.input.InputSource; import org.apache.druid.metadata.DefaultPasswordProvider; import org.junit.Assert; @@ -41,10 +42,7 @@ public class HttpInputSourceTest @Test public void testSerde() throws IOException { - HttpInputSourceConfig httpInputSourceConfig = new HttpInputSourceConfig( - null, - null - ); + HttpInputSourceConfig httpInputSourceConfig = new HttpInputSourceConfig(null, null, null); final ObjectMapper mapper = new ObjectMapper(); mapper.setInjectableValues(new InjectableValues.Std().addValue( HttpInputSourceConfig.class, @@ -68,7 +66,7 @@ public void testDenyListDomainThrowsException() ImmutableList.of(URI.create("http://deny.com/http-test")), "myName", new DefaultPasswordProvider("myPassword"), - new HttpInputSourceConfig(null, Collections.singletonList("deny.com")) + new HttpInputSourceConfig(null, Collections.singletonList("deny.com"), null) ); } @@ -79,34 +77,55 @@ public void testDenyListDomainNoMatch() ImmutableList.of(URI.create("http://allow.com/http-test")), "myName", new DefaultPasswordProvider("myPassword"), - new HttpInputSourceConfig(null, Collections.singletonList("deny.com")) + new HttpInputSourceConfig(null, Collections.singletonList("deny.com"), null) ); } @Test - public 
void testConstructorAllowsOnlyHttpAndHttpsProtocols() + public void testConstructorAllowsOnlyDefaultProtocols() { new HttpInputSource( ImmutableList.of(URI.create("http:///")), "myName", new DefaultPasswordProvider("myPassword"), - new HttpInputSourceConfig(null, null) + new HttpInputSourceConfig(null, null, null) ); new HttpInputSource( ImmutableList.of(URI.create("https:///")), "myName", new DefaultPasswordProvider("myPassword"), - new HttpInputSourceConfig(null, null) + new HttpInputSourceConfig(null, null, null) ); expectedException.expect(IllegalArgumentException.class); - expectedException.expectMessage("Only HTTP or HTTPS are allowed"); + expectedException.expectMessage("Only [http, https] protocols are allowed"); new HttpInputSource( ImmutableList.of(URI.create("my-protocol:///")), "myName", new DefaultPasswordProvider("myPassword"), - new HttpInputSourceConfig(null, null) + new HttpInputSourceConfig(null, null, null) + ); + } + + @Test + public void testConstructorAllowsOnlyCustomProtocols() + { + final HttpInputSourceConfig customConfig = new HttpInputSourceConfig(null, null, ImmutableSet.of("druid")); + new HttpInputSource( + ImmutableList.of(URI.create("druid:///")), + "myName", + new DefaultPasswordProvider("myPassword"), + customConfig + ); + + expectedException.expect(IllegalArgumentException.class); + expectedException.expectMessage("Only [druid] protocols are allowed"); + new HttpInputSource( + ImmutableList.of(URI.create("https:///")), + "myName", + new DefaultPasswordProvider("myPassword"), + customConfig ); } @@ -117,7 +136,7 @@ public void testAllowListDomainThrowsException() ImmutableList.of(URI.create("http://deny.com/http-test")), "myName", new DefaultPasswordProvider("myPassword"), - new HttpInputSourceConfig(Collections.singletonList("allow.com"), null) + new HttpInputSourceConfig(Collections.singletonList("allow.com"), null, null) ); } @@ -128,7 +147,7 @@ public void testAllowListDomainMatch() ImmutableList.of(URI.create("http://allow.com/http-test")), "myName", new DefaultPasswordProvider("myPassword"), - new HttpInputSourceConfig(Collections.singletonList("allow.com"), null) + new HttpInputSourceConfig(Collections.singletonList("allow.com"), null, null) ); } @@ -139,13 +158,13 @@ public void testEmptyAllowListDomainMatch() ImmutableList.of(URI.create("http://allow.com/http-test")), "myName", new DefaultPasswordProvider("myPassword"), - new HttpInputSourceConfig(Collections.emptyList(), null) + new HttpInputSourceConfig(Collections.emptyList(), null, null) ); } @Test(expected = IllegalArgumentException.class) public void testCannotSetBothAllowAndDenyList() { - new HttpInputSourceConfig(Collections.singletonList("allow.com"), Collections.singletonList("deny.com")); + new HttpInputSourceConfig(Collections.singletonList("allow.com"), Collections.singletonList("deny.com"), null); } } diff --git a/docs/configuration/index.md b/docs/configuration/index.md index fbad3865bfb5..b8ef554d9120 100644 --- a/docs/configuration/index.md +++ b/docs/configuration/index.md @@ -515,6 +515,36 @@ This deep storage is used to interface with Cassandra. Note that the `druid-cas |`druid.storage.keyspace`|Cassandra key space.|none| +### Ingestion Security Configuration + +#### HDFS input source + +You can set the following property to control what protocols are allowed for +the [HDFS input source](../ingestion/native-batch.md#hdfs-input-source) and the [HDFS firehose](../ingestion/native-batch.md#hdfsfirehose). 
+
+|Property|Possible Values|Description|Default|
+|--------|---------------|-----------|-------|
+|`druid.ingestion.hdfs.allowedProtocols`|List of protocols|Allowed protocols that HDFS inputSource and HDFS firehose can use.|["hdfs"]|
+
+#### HTTP input source
+
+You can set the following property to control what protocols are allowed for
+the [HTTP input source](../ingestion/native-batch.md#http-input-source) and the [HTTP firehose](../ingestion/native-batch.md#httpfirehose).
+
+|Property|Possible Values|Description|Default|
+|--------|---------------|-----------|-------|
+|`druid.ingestion.http.allowedProtocols`|List of protocols|Allowed protocols that HTTP inputSource and HTTP firehose can use.|["http", "https"]|
+
+The following properties control which domains native batch tasks can access using
+the [HTTP input source](../ingestion/native-batch.md#http-input-source).
+
+|Property|Possible Values|Description|Default|
+|--------|---------------|-----------|-------|
+|`druid.ingestion.http.allowListDomains`|List of domains|Allowed domains from which ingestion will be allowed. Only one of allowList or denyList can be set.|empty list|
+|`druid.ingestion.http.denyListDomains`|List of domains|Blacklisted domains from which ingestion will NOT be allowed. Only one of allowList or denyList can be set.|empty list|
+
+
 ### Task Logging
 
 If you are running the indexing service in remote mode, the task logs must be stored in S3, Azure Blob Store, Google Cloud Storage or HDFS.
@@ -1355,14 +1385,6 @@ The amount of direct memory needed by Druid is at least
 ensure at least this amount of direct memory is available by providing `-XX:MaxDirectMemorySize=` at the command line.
 
-#### Indexer Security Configuration
-You can optionally configure following additional configs to restrict druid ingestion
-
-|Property|Possible Values|Description|Default|
-|--------|---------------|-----------|-------|
-|`druid.ingestion.http.allowListDomains`|List of domains|Allowed domains from which ingestion will be allowed. Only one of allowList or denyList can be set.|empty list|
-|`druid.ingestion.http.denyListDomains`|List of domains|Blacklisted domains from which ingestion will NOT be allowed. Only one of allowList or denyList can be set.|empty list|
-
 #### Query Configurations
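To make the new configuration concrete, the standalone sketch below (not part of the patch) condenses the protocol handling that `HttpInputSourceConfig` and `HttpInputSource.throwIfInvalidProtocols` gain earlier in this patch. The wrapper class, method names, and `main` method are illustrative assumptions; the default set, the lowercasing, and the error message mirror the diff above.

```java
import java.net.URI;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Locale;
import java.util.Set;
import java.util.stream.Collectors;

// Illustrative sketch only; it mirrors the constructor logic of HttpInputSourceConfig
// and HttpInputSource.throwIfInvalidProtocols from this patch.
public class AllowedProtocolsSketch
{
  // Mirrors HttpInputSourceConfig.DEFAULT_ALLOWED_PROTOCOLS.
  private static final Set<String> DEFAULT_ALLOWED_PROTOCOLS =
      new HashSet<>(Arrays.asList("http", "https"));

  // A null or empty "druid.ingestion.http.allowedProtocols" falls back to the default set;
  // otherwise every configured protocol is lowercased once, up front.
  static Set<String> resolveAllowedProtocols(Set<String> configured)
  {
    return configured == null || configured.isEmpty()
           ? DEFAULT_ALLOWED_PROTOCOLS
           : configured.stream().map(p -> p.toLowerCase(Locale.ENGLISH)).collect(Collectors.toSet());
  }

  // The URI scheme is lowercased before the membership test, so "HTTPS://..." passes
  // while "ftp://..." is rejected with the message the new tests expect.
  static void throwIfInvalidProtocols(Set<String> allowed, List<URI> uris)
  {
    for (URI uri : uris) {
      if (uri.getScheme() == null || !allowed.contains(uri.getScheme().toLowerCase(Locale.ENGLISH))) {
        throw new IllegalArgumentException("Only " + allowed + " protocols are allowed");
      }
    }
  }

  public static void main(String[] args)
  {
    Set<String> allowed = resolveAllowedProtocols(null); // -> [http, https]
    throwIfInvalidProtocols(allowed, Arrays.asList(URI.create("HTTPS://example.com/a.json"))); // passes
    throwIfInvalidProtocols(allowed, Arrays.asList(URI.create("ftp://example.com/a.json")));   // throws
  }
}
```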
diff --git a/docs/ingestion/native-batch.md b/docs/ingestion/native-batch.md
index 29bb2d1e69b7..4999eeb776ba 100644
--- a/docs/ingestion/native-batch.md
+++ b/docs/ingestion/native-batch.md
@@ -1064,7 +1064,7 @@ Sample specs:
     "type": "index_parallel",
     "inputSource": {
       "type": "hdfs",
-      "paths": "hdfs://foo/bar/", "hdfs://bar/foo"
+      "paths": "hdfs:/foo/bar/", "hdfs:/bar/foo"
     },
     "inputFormat": {
       "type": "json"
@@ -1080,7 +1080,7 @@ Sample specs:
     "type": "index_parallel",
     "inputSource": {
       "type": "hdfs",
-      "paths": ["hdfs://foo/bar", "hdfs://bar/foo"]
+      "paths": ["hdfs:/foo/bar", "hdfs:/bar/foo"]
     },
     "inputFormat": {
       "type": "json"
@@ -1096,7 +1096,7 @@ Sample specs:
     "type": "index_parallel",
     "inputSource": {
       "type": "hdfs",
-      "paths": "hdfs://foo/bar/file.json", "hdfs://bar/foo/file2.json"
+      "paths": "hdfs:/foo/bar/file.json", "hdfs:/bar/foo/file2.json"
     },
     "inputFormat": {
       "type": "json"
@@ -1112,7 +1112,7 @@ Sample specs:
     "type": "index_parallel",
     "inputSource": {
       "type": "hdfs",
-      "paths": ["hdfs://foo/bar/file.json", "hdfs://bar/foo/file2.json"]
+      "paths": ["hdfs:/foo/bar/file.json", "hdfs:/bar/foo/file2.json"]
     },
     "inputFormat": {
       "type": "json"
@@ -1127,9 +1127,10 @@ Sample specs:
 |type|This should be `hdfs`.|None|yes|
 |paths|HDFS paths. Can be either a JSON array or comma-separated string of paths. Wildcards like `*` are supported in these paths. Empty files located under one of the given paths will be skipped.|None|yes|
 
-You can also ingest from cloud storage using the HDFS input source.
-However, if you want to read from AWS S3 or Google Cloud Storage, consider using
-the [S3 input source](#s3-input-source) or the [Google Cloud Storage input source](#google-cloud-storage-input-source) instead.
+You can also ingest from other storage using the HDFS input source if the HDFS client supports that storage.
+However, if you want to ingest from cloud storage, consider using the input sources designed for them.
+If you want to use a non-HDFS protocol with the HDFS input source, you need to include the protocol you want
+in `druid.ingestion.hdfs.allowedProtocols`. See [HDFS input source security configuration](../configuration/index.md#hdfs-input-source) for more details.
 
 ### HTTP Input Source
 
@@ -1203,10 +1204,13 @@ You can also use the other existing Druid PasswordProviders. Here is an example
 |property|description|default|required?|
 |--------|-----------|-------|---------|
 |type|This should be `http`|None|yes|
-|uris|URIs of the input files. Only `http` and `https` schemes are allowed.|None|yes|
+|uris|URIs of the input files. See below for the protocols allowed for URIs.|None|yes|
 |httpAuthenticationUsername|Username to use for authentication with specified URIs. Can be optionally used if the URIs specified in the spec require a Basic Authentication Header.|None|no|
 |httpAuthenticationPassword|PasswordProvider to use with specified URIs. Can be optionally used if the URIs specified in the spec require a Basic Authentication Header.|None|no|
 
+The protocols that the HTTP input source can use are restricted by `druid.ingestion.http.allowedProtocols`.
+The `http` and `https` protocols are allowed by default. See [HTTP input source security configuration](../configuration/index.md#http-input-source) for more details.
+
 ### Inline Input Source
 
 The Inline input source can be used to read the data inlined in its own spec.
@@ -1553,6 +1557,11 @@ Note that prefetching or caching isn't that useful in the Parallel task.
 |fetchTimeout|Timeout for fetching each file.|60000|
 |maxFetchRetry|Maximum number of retries for fetching each file.|3|
 
+You can also ingest from other storage using the HDFS firehose if the HDFS client supports that storage.
+However, if you want to ingest from cloud storage, consider using the input sources designed for them.
+If you want to use a non-HDFS protocol with the HDFS firehose, you need to include the protocol you want
+in `druid.ingestion.hdfs.allowedProtocols`. See [HDFS firehose security configuration](../configuration/index.md#hdfs-input-source) for more details.
+
 ### LocalFirehose
 
 This Firehose can be used to read the data from files on local disk, and is mainly intended for proof-of-concept testing, and works with `string` typed parsers.
@@ -1590,7 +1599,9 @@ A sample HTTP Firehose spec is shown below:
 }
 ```
 
-URIs must have a scheme of either `http` or `https`.
+The protocols that the HTTP firehose can use are restricted by `druid.ingestion.http.allowedProtocols`.
+The `http` and `https` protocols are allowed by default. See [HTTP firehose security configuration](../configuration/index.md#http-input-source) for more details.
+
 The below configurations can be optionally used if the URIs specified in the spec require a Basic Authentication Header.
 Omitting these fields from your spec will result in HTTP requests with no Basic Authentication Header.
 
diff --git a/extensions-core/hdfs-storage/src/main/java/org/apache/druid/firehose/hdfs/HdfsFirehoseFactory.java b/extensions-core/hdfs-storage/src/main/java/org/apache/druid/firehose/hdfs/HdfsFirehoseFactory.java
index ee5bf032b4d0..9a3312a8903f 100644
--- a/extensions-core/hdfs-storage/src/main/java/org/apache/druid/firehose/hdfs/HdfsFirehoseFactory.java
+++ b/extensions-core/hdfs-storage/src/main/java/org/apache/druid/firehose/hdfs/HdfsFirehoseFactory.java
@@ -29,6 +29,7 @@
 import org.apache.druid.data.input.impl.prefetch.PrefetchableTextFilesFirehoseFactory;
 import org.apache.druid.guice.Hdfs;
 import org.apache.druid.inputsource.hdfs.HdfsInputSource;
+import org.apache.druid.inputsource.hdfs.HdfsInputSourceConfig;
 import org.apache.druid.storage.hdfs.HdfsDataSegmentPuller;
 import org.apache.druid.utils.CompressionUtils;
 import org.apache.hadoop.conf.Configuration;
@@ -44,21 +45,25 @@ public class HdfsFirehoseFactory extends PrefetchableTextFilesFirehoseFactory<URI>
   private final List<String> inputPaths;
   private final Configuration conf;
+  private final HdfsInputSourceConfig inputSourceConfig;
 
   @JsonCreator
   public HdfsFirehoseFactory(
-      @JacksonInject @Hdfs Configuration conf,
       @JsonProperty("paths") Object inputPaths,
       @JsonProperty("maxCacheCapacityBytes") Long maxCacheCapacityBytes,
       @JsonProperty("maxFetchCapacityBytes") Long maxFetchCapacityBytes,
       @JsonProperty("prefetchTriggerBytes") Long prefetchTriggerBytes,
       @JsonProperty("fetchTimeout") Long fetchTimeout,
-      @JsonProperty("maxFetchRetry") Integer maxFetchRetry
-  )
+      @JsonProperty("maxFetchRetry") Integer maxFetchRetry,
+      @JacksonInject @Hdfs Configuration conf,
+      @JacksonInject HdfsInputSourceConfig inputSourceConfig
+  )
   {
     super(maxCacheCapacityBytes, maxFetchCapacityBytes, prefetchTriggerBytes, fetchTimeout, maxFetchRetry);
-    this.inputPaths = HdfsInputSource.coerceInputPathsToList(inputPaths, "inputPaths");
+    this.inputPaths = HdfsInputSource.coerceInputPathsToList(inputPaths, "paths");
     this.conf = conf;
+    this.inputSourceConfig = inputSourceConfig;
+    this.inputPaths.forEach(p -> HdfsInputSource.verifyProtocol(conf, inputSourceConfig, p));
   }
 
   @JsonProperty("paths")
@@ -109,21 +114,14 @@ public boolean
isSplittable() public FiniteFirehoseFactory withSplit(InputSplit split) { return new HdfsFirehoseFactory( - conf, split.get().toString(), getMaxCacheCapacityBytes(), getMaxFetchCapacityBytes(), getPrefetchTriggerBytes(), getFetchTimeout(), - getMaxFetchRetry() + getMaxFetchRetry(), + conf, + inputSourceConfig ); } - - @Override - public String toString() - { - return "HdfsFirehoseFactory{" + - "inputPaths=" + inputPaths + - '}'; - } } diff --git a/extensions-core/hdfs-storage/src/main/java/org/apache/druid/inputsource/hdfs/HdfsInputSource.java b/extensions-core/hdfs-storage/src/main/java/org/apache/druid/inputsource/hdfs/HdfsInputSource.java index 65d374f6d081..911cd09cbec2 100644 --- a/extensions-core/hdfs-storage/src/main/java/org/apache/druid/inputsource/hdfs/HdfsInputSource.java +++ b/extensions-core/hdfs-storage/src/main/java/org/apache/druid/inputsource/hdfs/HdfsInputSource.java @@ -37,6 +37,7 @@ import org.apache.druid.data.input.impl.SplittableInputSource; import org.apache.druid.guice.Hdfs; import org.apache.druid.java.util.common.IAE; +import org.apache.druid.java.util.common.StringUtils; import org.apache.druid.utils.Streams; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; @@ -51,7 +52,6 @@ import java.io.File; import java.io.IOException; import java.io.UncheckedIOException; -import java.util.Arrays; import java.util.Collection; import java.util.Collections; import java.util.Iterator; @@ -65,6 +65,7 @@ public class HdfsInputSource extends AbstractInputSource implements SplittableIn private final List inputPaths; private final Configuration configuration; + private final HdfsInputSourceConfig inputSourceConfig; // Although the javadocs for SplittableInputSource say to avoid caching splits to reduce memory, HdfsInputSource // *does* cache the splits for the following reasons: @@ -74,37 +75,54 @@ public class HdfsInputSource extends AbstractInputSource implements SplittableIn // // 2) The index_hadoop task allocates splits eagerly, so the memory usage should not be a problem for anyone // migrating from Hadoop. 
- private List cachedPaths; + @Nullable + private List cachedPaths = null; @JsonCreator public HdfsInputSource( @JsonProperty(PROP_PATHS) Object inputPaths, - @JacksonInject @Hdfs Configuration configuration + @JacksonInject @Hdfs Configuration configuration, + @JacksonInject HdfsInputSourceConfig inputSourceConfig ) { this.inputPaths = coerceInputPathsToList(inputPaths, PROP_PATHS); this.configuration = configuration; - this.cachedPaths = null; + this.inputSourceConfig = inputSourceConfig; + this.inputPaths.forEach(p -> verifyProtocol(configuration, inputSourceConfig, p)); } public static List coerceInputPathsToList(Object inputPaths, String propertyName) { - final List paths; - if (inputPaths instanceof String) { - paths = Collections.singletonList((String) inputPaths); + return Collections.singletonList((String) inputPaths); } else if (inputPaths instanceof List && ((List) inputPaths).stream().allMatch(x -> x instanceof String)) { - paths = ((List) inputPaths).stream().map(x -> (String) x).collect(Collectors.toList()); + return ((List) inputPaths).stream().map(x -> (String) x).collect(Collectors.toList()); } else { throw new IAE("'%s' must be a string or an array of strings", propertyName); } + } + + public static void verifyProtocol(Configuration conf, HdfsInputSourceConfig config, String pathString) + { + Path path = new Path(pathString); + try { + throwIfInvalidProtocol(config, path.getFileSystem(conf).getScheme()); + } + catch (IOException e) { + throw new RuntimeException(e); + } + } - return paths; + private static void throwIfInvalidProtocol(HdfsInputSourceConfig config, String scheme) + { + if (!config.getAllowedProtocols().contains(StringUtils.toLowerCase(scheme))) { + throw new IAE("Only %s protocols are allowed", config.getAllowedProtocols()); + } } - public static Collection getPaths(List inputPathStrings, Configuration configuration) throws IOException + public static Collection getPaths(List inputPaths, Configuration configuration) throws IOException { - if (inputPathStrings.isEmpty()) { + if (inputPaths.isEmpty()) { return Collections.emptySet(); } @@ -112,15 +130,10 @@ public static Collection getPaths(List inputPathStrings, Configura Job job = Job.getInstance(configuration); // Add paths to the fake JobContext. 
- for (String inputPath : inputPathStrings) { + for (String inputPath : inputPaths) { FileInputFormat.addInputPaths(job, inputPath); } - final Path[] inputPaths = FileInputFormat.getInputPaths(job); - if (Arrays.stream(inputPaths).anyMatch(path -> !"hdfs".equalsIgnoreCase(path.toUri().getScheme()))) { - throw new IllegalArgumentException("Input paths must be the HDFS path"); - } - return new HdfsFileInputFormat().getSplits(job) .stream() .filter(split -> ((FileSplit) split).getLength() > 0) @@ -208,7 +221,7 @@ public int estimateNumSplits(InputFormat inputFormat, @Nullable SplitHintSpec sp public SplittableInputSource> withSplit(InputSplit> split) { List paths = split.get().stream().map(path -> path.toString()).collect(Collectors.toList()); - return new HdfsInputSource(paths, configuration); + return new HdfsInputSource(paths, configuration, inputSourceConfig); } @Override @@ -224,6 +237,7 @@ private void cachePathsIfNeeded() throws IOException } } + @VisibleForTesting static Builder builder() { return new Builder(); @@ -233,6 +247,7 @@ static final class Builder { private Object paths; private Configuration configuration; + private HdfsInputSourceConfig inputSourceConfig; private Builder() { @@ -250,9 +265,19 @@ Builder configuration(Configuration configuration) return this; } + Builder inputSourceConfig(HdfsInputSourceConfig inputSourceConfig) + { + this.inputSourceConfig = inputSourceConfig; + return this; + } + HdfsInputSource build() { - return new HdfsInputSource(paths, configuration); + return new HdfsInputSource( + Preconditions.checkNotNull(paths, "paths"), + Preconditions.checkNotNull(configuration, "configuration"), + Preconditions.checkNotNull(inputSourceConfig, "inputSourceConfig") + ); } } } diff --git a/extensions-core/hdfs-storage/src/main/java/org/apache/druid/inputsource/hdfs/HdfsInputSourceConfig.java b/extensions-core/hdfs-storage/src/main/java/org/apache/druid/inputsource/hdfs/HdfsInputSourceConfig.java new file mode 100644 index 000000000000..c7f43f8b6a99 --- /dev/null +++ b/extensions-core/hdfs-storage/src/main/java/org/apache/druid/inputsource/hdfs/HdfsInputSourceConfig.java @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.druid.inputsource.hdfs; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.google.common.collect.ImmutableSet; +import org.apache.druid.java.util.common.StringUtils; + +import javax.annotation.Nullable; +import java.util.Set; +import java.util.stream.Collectors; + +public class HdfsInputSourceConfig +{ + static final Set DEFAULT_ALLOWED_PROTOCOLS = ImmutableSet.of("hdfs"); + + @JsonProperty + private final Set allowedProtocols; + + @JsonCreator + public HdfsInputSourceConfig( + @JsonProperty("allowedProtocols") @Nullable Set allowedProtocols + ) + { + this.allowedProtocols = allowedProtocols == null || allowedProtocols.isEmpty() + ? DEFAULT_ALLOWED_PROTOCOLS + : allowedProtocols.stream().map(StringUtils::toLowerCase).collect(Collectors.toSet()); + } + + public Set getAllowedProtocols() + { + return allowedProtocols; + } +} diff --git a/extensions-core/hdfs-storage/src/main/java/org/apache/druid/storage/hdfs/HdfsStorageDruidModule.java b/extensions-core/hdfs-storage/src/main/java/org/apache/druid/storage/hdfs/HdfsStorageDruidModule.java index e89bb0d8000e..3ca8e23535e1 100644 --- a/extensions-core/hdfs-storage/src/main/java/org/apache/druid/storage/hdfs/HdfsStorageDruidModule.java +++ b/extensions-core/hdfs-storage/src/main/java/org/apache/druid/storage/hdfs/HdfsStorageDruidModule.java @@ -35,6 +35,7 @@ import org.apache.druid.guice.ManageLifecycle; import org.apache.druid.initialization.DruidModule; import org.apache.druid.inputsource.hdfs.HdfsInputSource; +import org.apache.druid.inputsource.hdfs.HdfsInputSourceConfig; import org.apache.druid.storage.hdfs.tasklog.HdfsTaskLogs; import org.apache.druid.storage.hdfs.tasklog.HdfsTaskLogsConfig; import org.apache.hadoop.conf.Configuration; @@ -118,5 +119,7 @@ public void configure(Binder binder) JsonConfigProvider.bind(binder, "druid.hadoop.security.kerberos", HdfsKerberosConfig.class); binder.bind(HdfsStorageAuthentication.class).in(ManageLifecycle.class); LifecycleModule.register(binder, HdfsStorageAuthentication.class); + + JsonConfigProvider.bind(binder, "druid.ingestion.hdfs", HdfsInputSourceConfig.class); } } diff --git a/extensions-core/hdfs-storage/src/test/java/org/apache/druid/firehose/hdfs/HdfsFirehoseFactoryTest.java b/extensions-core/hdfs-storage/src/test/java/org/apache/druid/firehose/hdfs/HdfsFirehoseFactoryTest.java index 88daed7a821d..e96a773c0cef 100644 --- a/extensions-core/hdfs-storage/src/test/java/org/apache/druid/firehose/hdfs/HdfsFirehoseFactoryTest.java +++ b/extensions-core/hdfs-storage/src/test/java/org/apache/druid/firehose/hdfs/HdfsFirehoseFactoryTest.java @@ -19,30 +19,48 @@ package org.apache.druid.firehose.hdfs; -import com.fasterxml.jackson.databind.InjectableValues; +import com.fasterxml.jackson.databind.InjectableValues.Std; import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.collect.ImmutableSet; import org.apache.druid.data.input.FirehoseFactory; +import org.apache.druid.inputsource.hdfs.HdfsInputSourceConfig; import org.apache.druid.storage.hdfs.HdfsStorageDruidModule; import org.apache.hadoop.conf.Configuration; import org.junit.Assert; +import org.junit.BeforeClass; +import org.junit.Rule; import org.junit.Test; +import org.junit.rules.ExpectedException; import java.io.IOException; import java.util.Collections; public class HdfsFirehoseFactoryTest { + private static final HdfsInputSourceConfig DEFAULT_INPUT_SOURCE_CONFIG = new HdfsInputSourceConfig(null); + private static final 
Configuration DEFAULT_CONFIGURATION = new Configuration(); + + @BeforeClass + public static void setup() + { + DEFAULT_CONFIGURATION.set("fs.default.name", "hdfs://localhost:7020"); + } + + @Rule + public ExpectedException expectedException = ExpectedException.none(); + @Test public void testArrayPaths() throws IOException { final HdfsFirehoseFactory firehoseFactory = new HdfsFirehoseFactory( - null, Collections.singletonList("/foo/bar"), null, null, null, null, - null + null, + DEFAULT_CONFIGURATION, + DEFAULT_INPUT_SOURCE_CONFIG ); final ObjectMapper mapper = createMapper(); @@ -59,7 +77,16 @@ public void testArrayPaths() throws IOException @Test public void testStringPaths() throws IOException { - final HdfsFirehoseFactory firehoseFactory = new HdfsFirehoseFactory(null, "/foo/bar", null, null, null, null, null); + final HdfsFirehoseFactory firehoseFactory = new HdfsFirehoseFactory( + "/foo/bar", + null, + null, + null, + null, + null, + DEFAULT_CONFIGURATION, + DEFAULT_INPUT_SOURCE_CONFIG + ); final ObjectMapper mapper = createMapper(); final HdfsFirehoseFactory firehoseFactory2 = (HdfsFirehoseFactory) @@ -71,11 +98,121 @@ public void testStringPaths() throws IOException ); } + @Test + public void testConstructorAllowsOnlyDefaultProtocol() + { + new HdfsFirehoseFactory( + "hdfs://localhost:7020/foo/bar", + null, + null, + null, + null, + null, + DEFAULT_CONFIGURATION, + DEFAULT_INPUT_SOURCE_CONFIG + ); + + expectedException.expect(IllegalArgumentException.class); + expectedException.expectMessage("Only [hdfs] protocols are allowed"); + new HdfsFirehoseFactory( + "file:/foo/bar", + null, + null, + null, + null, + null, + DEFAULT_CONFIGURATION, + DEFAULT_INPUT_SOURCE_CONFIG + ); + } + + @Test + public void testConstructorAllowsOnlyCustomProtocol() + { + final Configuration conf = new Configuration(); + conf.set("fs.ftp.impl", "org.apache.hadoop.fs.ftp.FTPFileSystem"); + new HdfsFirehoseFactory( + "ftp://localhost:21/foo/bar", + null, + null, + null, + null, + null, + DEFAULT_CONFIGURATION, + new HdfsInputSourceConfig(ImmutableSet.of("ftp")) + ); + + expectedException.expect(IllegalArgumentException.class); + expectedException.expectMessage("Only [druid] protocols are allowed"); + new HdfsFirehoseFactory( + "hdfs://localhost:7020/foo/bar", + null, + null, + null, + null, + null, + DEFAULT_CONFIGURATION, + new HdfsInputSourceConfig(ImmutableSet.of("druid")) + ); + } + + @Test + public void testConstructorWithDefaultHdfs() + { + new HdfsFirehoseFactory( + "/foo/bar*", + null, + null, + null, + null, + null, + DEFAULT_CONFIGURATION, + DEFAULT_INPUT_SOURCE_CONFIG + ); + + new HdfsFirehoseFactory( + "foo/bar*", + null, + null, + null, + null, + null, + DEFAULT_CONFIGURATION, + DEFAULT_INPUT_SOURCE_CONFIG + ); + + new HdfsFirehoseFactory( + "hdfs:///foo/bar*", + null, + null, + null, + null, + null, + DEFAULT_CONFIGURATION, + DEFAULT_INPUT_SOURCE_CONFIG + ); + + new HdfsFirehoseFactory( + "hdfs://localhost:10020/foo/bar*", // different hdfs + null, + null, + null, + null, + null, + DEFAULT_CONFIGURATION, + DEFAULT_INPUT_SOURCE_CONFIG + ); + } + private static ObjectMapper createMapper() { final ObjectMapper mapper = new ObjectMapper(); new HdfsStorageDruidModule().getJacksonModules().forEach(mapper::registerModule); - mapper.setInjectableValues(new InjectableValues.Std().addValue(Configuration.class, new Configuration())); + mapper.setInjectableValues( + new Std() + .addValue(Configuration.class, DEFAULT_CONFIGURATION) + .addValue(HdfsInputSourceConfig.class, DEFAULT_INPUT_SOURCE_CONFIG) + ); 
     return mapper;
   }
 }
diff --git a/extensions-core/hdfs-storage/src/test/java/org/apache/druid/inputsource/hdfs/HdfsInputSourceConfigTest.java b/extensions-core/hdfs-storage/src/test/java/org/apache/druid/inputsource/hdfs/HdfsInputSourceConfigTest.java
new file mode 100644
index 000000000000..2e73688cd18f
--- /dev/null
+++ b/extensions-core/hdfs-storage/src/test/java/org/apache/druid/inputsource/hdfs/HdfsInputSourceConfigTest.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.inputsource.hdfs;
+
+import com.google.common.collect.ImmutableSet;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class HdfsInputSourceConfigTest
+{
+  @Test
+  public void testNullAllowedProtocolsUseDefault()
+  {
+    HdfsInputSourceConfig config = new HdfsInputSourceConfig(null);
+    Assert.assertEquals(HdfsInputSourceConfig.DEFAULT_ALLOWED_PROTOCOLS, config.getAllowedProtocols());
+  }
+
+  @Test
+  public void testEmptyAllowedProtocolsUseDefault()
+  {
+    HdfsInputSourceConfig config = new HdfsInputSourceConfig(ImmutableSet.of());
+    Assert.assertEquals(HdfsInputSourceConfig.DEFAULT_ALLOWED_PROTOCOLS, config.getAllowedProtocols());
+  }
+
+  @Test
+  public void testCustomAllowedProtocols()
+  {
+    HdfsInputSourceConfig config = new HdfsInputSourceConfig(ImmutableSet.of("druid"));
+    Assert.assertEquals(ImmutableSet.of("druid"), config.getAllowedProtocols());
+  }
+}
diff --git a/extensions-core/hdfs-storage/src/test/java/org/apache/druid/inputsource/hdfs/HdfsInputSourceTest.java b/extensions-core/hdfs-storage/src/test/java/org/apache/druid/inputsource/hdfs/HdfsInputSourceTest.java
index fa0f49867b6a..a61a0c6de950 100644
--- a/extensions-core/hdfs-storage/src/test/java/org/apache/druid/inputsource/hdfs/HdfsInputSourceTest.java
+++ b/extensions-core/hdfs-storage/src/test/java/org/apache/druid/inputsource/hdfs/HdfsInputSourceTest.java
@@ -20,8 +20,9 @@
 package org.apache.druid.inputsource.hdfs;
 
 import com.fasterxml.jackson.annotation.JsonProperty;
-import com.fasterxml.jackson.databind.InjectableValues;
+import com.fasterxml.jackson.databind.InjectableValues.Std;
 import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.Iterables;
 import org.apache.druid.data.input.InputFormat;
 import org.apache.druid.data.input.InputRow;
@@ -68,8 +69,9 @@
 @RunWith(Enclosed.class)
 public class HdfsInputSourceTest extends InitializedNullHandlingTest
 {
-  private static final String PATH = "/foo/bar";
+  private static final String PATH = "hdfs://localhost:7020/foo/bar";
   private static final Configuration CONFIGURATION = new Configuration();
+  private static final HdfsInputSourceConfig DEFAULT_INPUT_SOURCE_CONFIG = new HdfsInputSourceConfig(null);
   private static final String COLUMN = "value";
   private static final InputRowSchema INPUT_ROW_SCHEMA = new InputRowSchema(
       new TimestampSpec(null, null, null),
@@ -84,6 +86,80 @@ public class HdfsInputSourceTest extends InitializedNullHandlingTest
       0
   );
 
+  public static class ConstructorTest
+  {
+    @Rule
+    public ExpectedException expectedException = ExpectedException.none();
+
+    @Test
+    public void testConstructorAllowsOnlyDefaultProtocol()
+    {
+      HdfsInputSource.builder()
+                     .paths(PATH + "*")
+                     .configuration(CONFIGURATION)
+                     .inputSourceConfig(DEFAULT_INPUT_SOURCE_CONFIG)
+                     .build();
+
+      expectedException.expect(IllegalArgumentException.class);
+      expectedException.expectMessage("Only [hdfs] protocols are allowed");
+      HdfsInputSource.builder()
+                     .paths("file:/foo/bar*")
+                     .configuration(CONFIGURATION)
+                     .inputSourceConfig(DEFAULT_INPUT_SOURCE_CONFIG)
+                     .build();
+    }
+
+    @Test
+    public void testConstructorAllowsOnlyCustomProtocol()
+    {
+      final Configuration conf = new Configuration();
+      conf.set("fs.ftp.impl", "org.apache.hadoop.fs.ftp.FTPFileSystem");
+      HdfsInputSource.builder()
+                     .paths("ftp://localhost:21/foo/bar")
+                     .configuration(conf)
+                     .inputSourceConfig(new HdfsInputSourceConfig(ImmutableSet.of("ftp")))
+                     .build();
+
+      expectedException.expect(IllegalArgumentException.class);
+      expectedException.expectMessage("Only [druid] protocols are allowed");
+      HdfsInputSource.builder()
+                     .paths(PATH + "*")
+                     .configuration(CONFIGURATION)
+                     .inputSourceConfig(new HdfsInputSourceConfig(ImmutableSet.of("druid")))
+                     .build();
+    }
+
+    @Test
+    public void testConstructorWithDefaultHdfs()
+    {
+      final Configuration conf = new Configuration();
+      conf.set("fs.default.name", "hdfs://localhost:7020");
+      HdfsInputSource.builder()
+                     .paths("/foo/bar*")
+                     .configuration(conf)
+                     .inputSourceConfig(DEFAULT_INPUT_SOURCE_CONFIG)
+                     .build();
+
+      HdfsInputSource.builder()
+                     .paths("foo/bar*")
+                     .configuration(conf)
+                     .inputSourceConfig(DEFAULT_INPUT_SOURCE_CONFIG)
+                     .build();
+
+      HdfsInputSource.builder()
+                     .paths("hdfs:///foo/bar*")
+                     .configuration(conf)
+                     .inputSourceConfig(DEFAULT_INPUT_SOURCE_CONFIG)
+                     .build();
+
+      HdfsInputSource.builder()
+                     .paths("hdfs://localhost:10020/foo/bar*") // different hdfs
+                     .configuration(conf)
+                     .inputSourceConfig(DEFAULT_INPUT_SOURCE_CONFIG)
+                     .build();
+    }
+  }
+
   public static class SerializeDeserializeTest
   {
     private static final ObjectMapper OBJECT_MAPPER = createObjectMapper();
@@ -98,7 +174,8 @@ public void setup()
     {
       hdfsInputSourceBuilder = HdfsInputSource.builder()
                                               .paths(PATH)
-                                              .configuration(CONFIGURATION);
+                                              .configuration(CONFIGURATION)
+                                              .inputSourceConfig(DEFAULT_INPUT_SOURCE_CONFIG);
     }
 
     @Test
@@ -139,7 +216,11 @@ private static void testSerializesDeserializes(Wrapper hdfsInputSourceWrapper)
     private static ObjectMapper createObjectMapper()
     {
       final ObjectMapper mapper = new ObjectMapper();
-      mapper.setInjectableValues(new InjectableValues.Std().addValue(Configuration.class, new Configuration()));
+      mapper.setInjectableValues(
+          new Std()
+              .addValue(Configuration.class, new Configuration())
+              .addValue(HdfsInputSourceConfig.class, DEFAULT_INPUT_SOURCE_CONFIG)
+      );
       new HdfsStorageDruidModule().getJacksonModules().forEach(mapper::registerModule);
       return mapper;
     }
@@ -169,9 +250,6 @@ public static class ReaderTest
     private static final String KEY_VALUE_SEPARATOR = ",";
     private static final String ALPHABET = "abcdefghijklmnopqrstuvwxyz";
 
-    @Rule
-    public ExpectedException expectedException = ExpectedException.none();
-
     @Rule
     public TemporaryFolder temporaryFolder = new TemporaryFolder();
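Taken together, the tests above pin down the contract of `HdfsInputSourceConfig`: a single `allowedProtocols` set, with null or empty falling back to a `hdfs`-only default. A minimal sketch consistent with those tests follows; the field, annotation, and constructor choices here are inferred from the tests and the Guice wiring, not copied from the patch:

```java
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.collect.ImmutableSet;

import javax.annotation.Nullable;
import java.util.Set;

public class HdfsInputSourceConfig
{
  public static final Set<String> DEFAULT_ALLOWED_PROTOCOLS = ImmutableSet.of("hdfs");

  private final Set<String> allowedProtocols;

  @JsonCreator
  public HdfsInputSourceConfig(
      @JsonProperty("allowedProtocols") @Nullable Set<String> allowedProtocols
  )
  {
    // Null or empty falls back to the default, as testNullAllowedProtocolsUseDefault
    // and testEmptyAllowedProtocolsUseDefault require.
    this.allowedProtocols = allowedProtocols == null || allowedProtocols.isEmpty()
                            ? DEFAULT_ALLOWED_PROTOCOLS
                            : ImmutableSet.copyOf(allowedProtocols);
  }

  @JsonProperty
  public Set<String> getAllowedProtocols()
  {
    return allowedProtocols;
  }
}
```

The `"allowedProtocols"` property name matches the `druid.ingestion.hdfs.allowedProtocols` runtime property exercised by the module test later in this series.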
@@ -207,6 +285,7 @@ public void setup() throws IOException
       target = HdfsInputSource.builder()
                               .paths(dfsCluster.getURI() + PATH + "*")
                               .configuration(CONFIGURATION)
+                              .inputSourceConfig(DEFAULT_INPUT_SOURCE_CONFIG)
                               .build();
     }
 
@@ -295,18 +374,6 @@ public void createCorrectInputSourceWithSplit() throws Exception
         Assert.assertEquals(expectedPath, actualPath);
       }
     }
-
-    @Test
-    public void testNonHdfsPathThrowException()
-    {
-      target = HdfsInputSource.builder()
-                              .paths("file:///" + PATH + "*")
-                              .configuration(CONFIGURATION)
-                              .build();
-      expectedException.expect(IllegalArgumentException.class);
-      expectedException.expectMessage("Input paths must be the HDFS path");
-      target.formattableReader(INPUT_ROW_SCHEMA, INPUT_FORMAT, null);
-    }
   }
 
   public static class EmptyPathsTest
@@ -319,6 +386,7 @@ public void setup()
       target = HdfsInputSource.builder()
                               .paths(Collections.emptyList())
                               .configuration(CONFIGURATION)
+                              .inputSourceConfig(DEFAULT_INPUT_SOURCE_CONFIG)
                               .build();
     }
 
diff --git a/extensions-core/hdfs-storage/src/test/java/org/apache/druid/storage/hdfs/HdfsStorageDruidModuleTest.java b/extensions-core/hdfs-storage/src/test/java/org/apache/druid/storage/hdfs/HdfsStorageDruidModuleTest.java
new file mode 100644
index 000000000000..31badedc7e66
--- /dev/null
+++ b/extensions-core/hdfs-storage/src/test/java/org/apache/druid/storage/hdfs/HdfsStorageDruidModuleTest.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.storage.hdfs;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableSet;
+import com.google.inject.Guice;
+import com.google.inject.Injector;
+import org.apache.druid.guice.DruidGuiceExtensions;
+import org.apache.druid.guice.JsonConfigurator;
+import org.apache.druid.guice.LazySingleton;
+import org.apache.druid.guice.LifecycleModule;
+import org.apache.druid.inputsource.hdfs.HdfsInputSourceConfig;
+import org.junit.Assert;
+import org.junit.Test;
+
+import javax.validation.Validation;
+import javax.validation.Validator;
+import java.util.Properties;
+
+public class HdfsStorageDruidModuleTest
+{
+  @Test
+  public void testHdfsInputSourceConfigDefaultAllowedProtocols()
+  {
+    Properties props = new Properties();
+    Injector injector = makeInjectorWithProperties(props);
+    HdfsInputSourceConfig instance = injector.getInstance(HdfsInputSourceConfig.class);
+    Assert.assertEquals(
+        ImmutableSet.of("hdfs"),
+        instance.getAllowedProtocols()
+    );
+  }
+
+  @Test
+  public void testHdfsInputSourceConfigCustomAllowedProtocols()
+  {
+    Properties props = new Properties();
+    props.setProperty("druid.ingestion.hdfs.allowedProtocols", "[\"webhdfs\"]");
+    Injector injector = makeInjectorWithProperties(props);
+    HdfsInputSourceConfig instance = injector.getInstance(HdfsInputSourceConfig.class);
+    Assert.assertEquals(
+        ImmutableSet.of("webhdfs"),
+        instance.getAllowedProtocols()
+    );
+  }
+
+  private Injector makeInjectorWithProperties(final Properties props)
+  {
+    return Guice.createInjector(
+        ImmutableList.of(
+            new DruidGuiceExtensions(),
+            new LifecycleModule(),
+            binder -> {
+              binder.bind(Validator.class).toInstance(Validation.buildDefaultValidatorFactory().getValidator());
+              binder.bind(JsonConfigurator.class).in(LazySingleton.class);
+              binder.bind(Properties.class).toInstance(props);
+            },
+            new HdfsStorageDruidModule()
+        )
+    );
+  }
+}
diff --git a/server/src/main/java/org/apache/druid/segment/realtime/firehose/HttpFirehoseFactory.java b/server/src/main/java/org/apache/druid/segment/realtime/firehose/HttpFirehoseFactory.java
index 7aca7631f5d1..30f89b745087 100644
--- a/server/src/main/java/org/apache/druid/segment/realtime/firehose/HttpFirehoseFactory.java
+++ b/server/src/main/java/org/apache/druid/segment/realtime/firehose/HttpFirehoseFactory.java
@@ -19,6 +19,7 @@
 
 package org.apache.druid.segment.realtime.firehose;
 
+import com.fasterxml.jackson.annotation.JacksonInject;
 import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonProperty;
 import com.google.common.base.Preconditions;
@@ -27,9 +28,9 @@
 import org.apache.druid.data.input.InputSplit;
 import org.apache.druid.data.input.impl.HttpEntity;
 import org.apache.druid.data.input.impl.HttpInputSource;
+import org.apache.druid.data.input.impl.HttpInputSourceConfig;
 import org.apache.druid.data.input.impl.StringInputRowParser;
 import org.apache.druid.data.input.impl.prefetch.PrefetchableTextFilesFirehoseFactory;
-import org.apache.druid.java.util.common.logger.Logger;
 import org.apache.druid.metadata.PasswordProvider;
 import org.apache.druid.utils.CompressionUtils;
 
@@ -44,12 +45,12 @@ public class HttpFirehoseFactory extends PrefetchableTextFilesFirehoseFactory
 {
-  private static final Logger log = new Logger(HttpFirehoseFactory.class);
   private final List<URI> uris;
   @Nullable
   private final String httpAuthenticationUsername;
   @Nullable
   private final PasswordProvider httpAuthenticationPasswordProvider;
+  private final HttpInputSourceConfig inputSourceConfig;
 
   @JsonCreator
   public HttpFirehoseFactory(
@@ -60,15 +61,17 @@ public HttpFirehoseFactory(
       @JsonProperty("fetchTimeout") Long fetchTimeout,
       @JsonProperty("maxFetchRetry") Integer maxFetchRetry,
       @JsonProperty("httpAuthenticationUsername") @Nullable String httpAuthenticationUsername,
-      @JsonProperty("httpAuthenticationPassword") @Nullable PasswordProvider httpAuthenticationPasswordProvider
-  )
+      @JsonProperty("httpAuthenticationPassword") @Nullable PasswordProvider httpAuthenticationPasswordProvider,
+      @JacksonInject HttpInputSourceConfig inputSourceConfig
+      )
   {
     super(maxCacheCapacityBytes, maxFetchCapacityBytes, prefetchTriggerBytes, fetchTimeout, maxFetchRetry);
     Preconditions.checkArgument(uris.size() > 0, "Empty URIs");
-    HttpInputSource.throwIfInvalidProtocols(uris);
+    HttpInputSource.throwIfInvalidProtocols(inputSourceConfig, uris);
     this.uris = uris;
     this.httpAuthenticationUsername = httpAuthenticationUsername;
     this.httpAuthenticationPasswordProvider = httpAuthenticationPasswordProvider;
+    this.inputSourceConfig = inputSourceConfig;
   }
 
   @Nullable
@@ -122,35 +125,20 @@ public boolean equals(Object o)
     if (this == o) {
       return true;
     }
-
     if (o == null || getClass() != o.getClass()) {
       return false;
     }
-
-    final HttpFirehoseFactory that = (HttpFirehoseFactory) o;
-    return Objects.equals(uris, that.uris) &&
-           getMaxCacheCapacityBytes() == that.getMaxCacheCapacityBytes() &&
-           getMaxFetchCapacityBytes() == that.getMaxFetchCapacityBytes() &&
-           getPrefetchTriggerBytes() == that.getPrefetchTriggerBytes() &&
-           getFetchTimeout() == that.getFetchTimeout() &&
-           getMaxFetchRetry() == that.getMaxFetchRetry() &&
-           Objects.equals(httpAuthenticationUsername, that.getHttpAuthenticationUsername()) &&
-           Objects.equals(httpAuthenticationPasswordProvider, that.getHttpAuthenticationPasswordProvider());
+    HttpFirehoseFactory that = (HttpFirehoseFactory) o;
+    return uris.equals(that.uris) &&
+           Objects.equals(httpAuthenticationUsername, that.httpAuthenticationUsername) &&
+           Objects.equals(httpAuthenticationPasswordProvider, that.httpAuthenticationPasswordProvider) &&
+           inputSourceConfig.equals(that.inputSourceConfig);
   }
 
   @Override
   public int hashCode()
   {
-    return Objects.hash(
-        uris,
-        getMaxCacheCapacityBytes(),
-        getMaxFetchCapacityBytes(),
-        getPrefetchTriggerBytes(),
-        getFetchTimeout(),
-        getMaxFetchRetry(),
-        httpAuthenticationUsername,
-        httpAuthenticationPasswordProvider
-    );
+    return Objects.hash(uris, httpAuthenticationUsername, httpAuthenticationPasswordProvider, inputSourceConfig);
   }
 
   @Override
@@ -170,7 +158,8 @@ public FiniteFirehoseFactory withSplit(InputSplit
         getMaxFetchRetry(),
         httpAuthenticationUsername,
-        httpAuthenticationPasswordProvider
+        httpAuthenticationPasswordProvider,
+        inputSourceConfig
     );
   }
 }

From: Jihoon Son
Date: Fri, 5 Feb 2021 17:43:36 -0800
Subject: [PATCH 07/13] fix checkstyle and doc

---
 docs/configuration/index.md                                   | 4 ++--
 .../druid/segment/realtime/firehose/HttpFirehoseFactory.java | 2 +-
 .../segment/realtime/firehose/HttpFirehoseFactoryTest.java   | 1 -
 3 files changed, 3 insertions(+), 4 deletions(-)

diff --git a/docs/configuration/index.md b/docs/configuration/index.md
index b8ef554d9120..3351b3e01cce 100644
--- a/docs/configuration/index.md
+++ b/docs/configuration/index.md
@@ -524,7 +524,7 @@ the [HDFS input source](../ingestion/native-batch.md#hdfs-input-source) and the
 |Property|Possible Values|Description|Default|
 |--------|---------------|-----------|-------|
-|`druid.ingestion.hdfs.allowedProtocols`|List of protocols|Allowed protocols that HDFS inputSource and HDFS firehose can use.|["hdfs"]|
+|`druid.ingestion.hdfs.allowedProtocols`|List of protocols|Allowed protocols that HDFS input source and HDFS firehose can use.|["hdfs"]|
 
 #### HTTP input source
 
@@ -534,7 +534,7 @@ the [HTTP input source](../ingestion/native-batch.md#http-input-source) and the
 |Property|Possible Values|Description|Default|
 |--------|---------------|-----------|-------|
-|`druid.ingestion.http.allowedProtocols`|List of protocols|Allowed protocols that HTTP inputSource and HTTP firehose can use.|["http", "https"]|
+|`druid.ingestion.http.allowedProtocols`|List of protocols|Allowed protocols that HTTP input source and HTTP firehose can use.|["http", "https"]|
 
 The following properties are to control what domains native batch tasks can access to using
 the [HTTP input source](../ingestion/native-batch.md#http-input-source).
diff --git a/server/src/main/java/org/apache/druid/segment/realtime/firehose/HttpFirehoseFactory.java b/server/src/main/java/org/apache/druid/segment/realtime/firehose/HttpFirehoseFactory.java
index 30f89b745087..bbd797f2d3d1 100644
--- a/server/src/main/java/org/apache/druid/segment/realtime/firehose/HttpFirehoseFactory.java
+++ b/server/src/main/java/org/apache/druid/segment/realtime/firehose/HttpFirehoseFactory.java
@@ -63,7 +63,7 @@ public HttpFirehoseFactory(
       @JsonProperty("httpAuthenticationUsername") @Nullable String httpAuthenticationUsername,
       @JsonProperty("httpAuthenticationPassword") @Nullable PasswordProvider httpAuthenticationPasswordProvider,
       @JacksonInject HttpInputSourceConfig inputSourceConfig
-      )
+  )
   {
     super(maxCacheCapacityBytes, maxFetchCapacityBytes, prefetchTriggerBytes, fetchTimeout, maxFetchRetry);
     Preconditions.checkArgument(uris.size() > 0, "Empty URIs");
diff --git a/server/src/test/java/org/apache/druid/segment/realtime/firehose/HttpFirehoseFactoryTest.java b/server/src/test/java/org/apache/druid/segment/realtime/firehose/HttpFirehoseFactoryTest.java
index 89d0252a17a3..5d8a44d8dccb 100644
--- a/server/src/test/java/org/apache/druid/segment/realtime/firehose/HttpFirehoseFactoryTest.java
+++ b/server/src/test/java/org/apache/druid/segment/realtime/firehose/HttpFirehoseFactoryTest.java
@@ -23,7 +23,6 @@
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableSet;
-import org.apache.druid.data.input.impl.HttpInputSource;
 import org.apache.druid.data.input.impl.HttpInputSourceConfig;
 import org.apache.druid.jackson.DefaultObjectMapper;
 import org.apache.druid.metadata.DefaultPasswordProvider;

From a79f43134489486a2b9d935e9f4a59d93930ea47 Mon Sep 17 00:00:00 2001
From: Jihoon Son
Date: Fri, 5 Feb 2021 21:33:00 -0800
Subject: [PATCH 08/13] more checkstyle

---
 .../org/apache/druid/firehose/hdfs/HdfsFirehoseFactory.java     | 2 +-
 .../java/org/apache/druid/inputsource/hdfs/HdfsInputSource.java | 2 +-
 2 files changed, 2 insertions(+), 2 deletions(-)

diff --git a/extensions-core/hdfs-storage/src/main/java/org/apache/druid/firehose/hdfs/HdfsFirehoseFactory.java b/extensions-core/hdfs-storage/src/main/java/org/apache/druid/firehose/hdfs/HdfsFirehoseFactory.java
index 9a3312a8903f..f7fac9f3b479 100644
--- a/extensions-core/hdfs-storage/src/main/java/org/apache/druid/firehose/hdfs/HdfsFirehoseFactory.java
+++ b/extensions-core/hdfs-storage/src/main/java/org/apache/druid/firehose/hdfs/HdfsFirehoseFactory.java
@@ -57,7 +57,7 @@ public HdfsFirehoseFactory(
       @JsonProperty("maxFetchRetry") Integer maxFetchRetry,
       @JacksonInject @Hdfs Configuration conf,
       @JacksonInject HdfsInputSourceConfig inputSourceConfig
-      )
+  )
   {
     super(maxCacheCapacityBytes, maxFetchCapacityBytes, prefetchTriggerBytes, fetchTimeout, maxFetchRetry);
     this.inputPaths = HdfsInputSource.coerceInputPathsToList(inputPaths, "paths");
diff --git a/extensions-core/hdfs-storage/src/main/java/org/apache/druid/inputsource/hdfs/HdfsInputSource.java b/extensions-core/hdfs-storage/src/main/java/org/apache/druid/inputsource/hdfs/HdfsInputSource.java
index 911cd09cbec2..7faebccf268b 100644
--- a/extensions-core/hdfs-storage/src/main/java/org/apache/druid/inputsource/hdfs/HdfsInputSource.java
+++ b/extensions-core/hdfs-storage/src/main/java/org/apache/druid/inputsource/hdfs/HdfsInputSource.java
@@ -96,7 +96,7 @@ public static List<String> coerceInputPathsToList(Object inputPaths, String prop
     if (inputPaths instanceof String) {
       return Collections.singletonList((String) inputPaths);
     } else if (inputPaths instanceof List && ((List<?>) inputPaths).stream().allMatch(x -> x instanceof String)) {
-    return ((List<?>) inputPaths).stream().map(x -> (String) x).collect(Collectors.toList());
+      return ((List<?>) inputPaths).stream().map(x -> (String) x).collect(Collectors.toList());
     } else {
       throw new IAE("'%s' must be a string or an array of strings", propertyName);
     }

From d4b6cbd35155ac5a251a464001a42a2461c93467 Mon Sep 17 00:00:00 2001
From: Jihoon Son
Date: Thu, 11 Feb 2021 20:45:25 -0800
Subject: [PATCH 09/13] remove stale doc

---
 docs/configuration/index.md    | 16 ++++++++--------
 docs/ingestion/native-batch.md |  4 ++--
 2 files changed, 10 insertions(+), 10 deletions(-)

diff --git a/docs/configuration/index.md b/docs/configuration/index.md
index 3351b3e01cce..f7d828833b7a 100644
--- a/docs/configuration/index.md
+++ b/docs/configuration/index.md
@@ -536,14 +536,6 @@ the [HTTP input source](../ingestion/native-batch.md#http-input-source) and the
 |--------|---------------|-----------|-------|
 |`druid.ingestion.http.allowedProtocols`|List of protocols|Allowed protocols that HTTP input source and HTTP firehose can use.|["http", "https"]|
 
-The following properties are to control what domains native batch tasks can access to using
-the [HTTP input source](../ingestion/native-batch.md#http-input-source).
-
-|Property|Possible Values|Description|Default|
-|--------|---------------|-----------|-------|
-|`druid.ingestion.http.allowListDomains`|List of domains|Allowed domains from which ingestion will be allowed. Only one of allowList or denyList can be set.|empty list|
-|`druid.ingestion.http.denyListDomains`|List of domains|Blacklisted domains from which ingestion will NOT be allowed. Only one of allowList or denyList can be set. |empty list|
-
 
 ### Task Logging
@@ -1385,6 +1377,14 @@ The amount of direct memory needed by Druid is at least
 ensure at least this amount of direct memory is available by providing `-XX:MaxDirectMemorySize=` at the
 command line.
 
+#### Indexer Security Configuration
+You can optionally configure following additional configs to restrict druid ingestion
+
+|Property|Possible Values|Description|Default|
+|--------|---------------|-----------|-------|
+|`druid.ingestion.http.allowListDomains`|List of domains|Allowed domains from which ingestion will be allowed. Only one of allowList or denyList can be set.|empty list|
+|`druid.ingestion.http.denyListDomains`|List of domains|Blacklisted domains from which ingestion will NOT be allowed. Only one of allowList or denyList can be set. |empty list|
+
 
 #### Query Configurations
diff --git a/docs/ingestion/native-batch.md b/docs/ingestion/native-batch.md
index 7d003f436cf9..ebc44a93f031 100644
--- a/docs/ingestion/native-batch.md
+++ b/docs/ingestion/native-batch.md
@@ -1128,7 +1128,7 @@ Sample specs:
 |paths|HDFS paths. Can be either a JSON array or comma-separated string of paths. Wildcards like `*` are supported in these paths. Empty files located under one of the given paths will be skipped.|None|yes|
 
 You can also ingest from other storage using the HDFS input source if the HDFS client supports that storage.
-However, if you want to ingest from cloud storage, consider using the proper input sources for them.
+However, if you want to ingest from cloud storage, consider using the service-specific input source for your data storage.
 If you want to use a non-hdfs protocol with the HDFS input source, you need to include the protocol you want
 in `druid.ingestion.hdfs.allowedProtocols`. See [HDFS input source security configuration](../configuration/index.md#hdfs-input-source) for more details.
@@ -1558,7 +1558,7 @@ Note that prefetching or caching isn't that useful in the Parallel task.
 |maxFetchRetry|Maximum number of retries for fetching each file.|3|
 
 You can also ingest from other storage using the HDFS firehose if the HDFS client supports that storage.
-However, if you want to ingest from cloud storage, consider using the proper input sources for them.
+However, if you want to ingest from cloud storage, consider using the service-specific input source for your data storage.
 If you want to use a non-hdfs protocol with the HDFS firehose, you need to include the protocol you want
 in `druid.ingestion.hdfs.allowedProtocols`. See [HDFS firehose security configuration](../configuration/index.md#hdfs-input-source) for more details.

From f70f1ca045b6c5eaf718558c1662a8627a91adca Mon Sep 17 00:00:00 2001
From: Jihoon Son
Date: Thu, 11 Feb 2021 20:46:41 -0800
Subject: [PATCH 10/13] remove more doc

---
 docs/configuration/index.md | 8 --------
 1 file changed, 8 deletions(-)

diff --git a/docs/configuration/index.md b/docs/configuration/index.md
index f7d828833b7a..51f678bcaac3 100644
--- a/docs/configuration/index.md
+++ b/docs/configuration/index.md
@@ -1377,14 +1377,6 @@ The amount of direct memory needed by Druid is at least
 ensure at least this amount of direct memory is available by providing `-XX:MaxDirectMemorySize=` at the
 command line.
 
-#### Indexer Security Configuration
-You can optionally configure following additional configs to restrict druid ingestion
-
-|Property|Possible Values|Description|Default|
-|--------|---------------|-----------|-------|
-|`druid.ingestion.http.allowListDomains`|List of domains|Allowed domains from which ingestion will be allowed. Only one of allowList or denyList can be set.|empty list|
-|`druid.ingestion.http.denyListDomains`|List of domains|Blacklisted domains from which ingestion will NOT be allowed. Only one of allowList or denyList can be set. |empty list|
-
 
 #### Query Configurations

From 11ca3de5ea3fcda3281e4f008a085879c81a8865 Mon Sep 17 00:00:00 2001
From: Jihoon Son
Date: Thu, 11 Feb 2021 20:47:13 -0800
Subject: [PATCH 11/13] Apply doc suggestions from code review

Co-authored-by: Charles Smith <38529548+techdocsmith@users.noreply.github.com>
---
 docs/configuration/index.md    | 8 ++++----
 docs/ingestion/native-batch.md | 6 +++---
 2 files changed, 7 insertions(+), 7 deletions(-)

diff --git a/docs/configuration/index.md b/docs/configuration/index.md
index f7d828833b7a..93962d19d0a7 100644
--- a/docs/configuration/index.md
+++ b/docs/configuration/index.md
@@ -519,22 +519,22 @@ This deep storage is used to interface with Cassandra. Note that the `druid-cas
Note that the `druid-cas #### HDFS input source -You can set the following property to control what protocols are allowed for +You can set the following property to specify permissible protocols for the [HDFS input source](../ingestion/native-batch.md#hdfs-input-source) and the [HDFS firehose](../ingestion/native-batch.md#hdfsfirehose). |Property|Possible Values|Description|Default| |--------|---------------|-----------|-------| -|`druid.ingestion.hdfs.allowedProtocols`|List of protocols|Allowed protocols that HDFS input source and HDFS firehose can use.|["hdfs"]| +|`druid.ingestion.hdfs.allowedProtocols`|List of protocols|Allowed protocols for the HDFS input source and HDFS firehose.|["hdfs"]| #### HTTP input source -You can set the following property to control what protocols are allowed for +You can set the following property to specify permissible protocols for the [HTTP input source](../ingestion/native-batch.md#http-input-source) and the [HTTP firehose](../ingestion/native-batch.md#httpfirehose). |Property|Possible Values|Description|Default| |--------|---------------|-----------|-------| -|`druid.ingestion.http.allowedProtocols`|List of protocols|Allowed protocols that HTTP input source and HTTP firehose can use.|["http", "https"]| +|`druid.ingestion.http.allowedProtocols`|List of protocols|Allowed protocols for the HTTP input source and HTTP firehose.|["http", "https"]| ### Task Logging diff --git a/docs/ingestion/native-batch.md b/docs/ingestion/native-batch.md index ebc44a93f031..588ef41506f0 100644 --- a/docs/ingestion/native-batch.md +++ b/docs/ingestion/native-batch.md @@ -1129,7 +1129,7 @@ Sample specs: You can also ingest from other storage using the HDFS input source if the HDFS client supports that storage. However, if you want to ingest from cloud storage, consider using the service-specific input source for your data storage. -If you want to use a non-hdfs protocol with the HDFS input source, you need to include the protocol you want +If you want to use a non-hdfs protocol with the HDFS input source, include the protocol in `druid.ingestion.hdfs.allowedProtocols`. See [HDFS input source security configuration](../configuration/index.md#hdfs-input-source) for more details. ### HTTP Input Source @@ -1208,7 +1208,7 @@ You can also use the other existing Druid PasswordProviders. Here is an example |httpAuthenticationUsername|Username to use for authentication with specified URIs. Can be optionally used if the URIs specified in the spec require a Basic Authentication Header.|None|no| |httpAuthenticationPassword|PasswordProvider to use with specified URIs. Can be optionally used if the URIs specified in the spec require a Basic Authentication Header.|None|no| -The protocols that the HTTP input source can use is restricted by `druid.ingestion.http.allowedProtocols`. +You can only use protocols listed in the `druid.ingestion.http.allowedProtocols` property as HTTP input sources. The `http` and `https` protocols are allowed by default. See [HTTP input source security configuration](../configuration/index.md#http-input-source) for more details. ### Inline Input Source @@ -1599,7 +1599,7 @@ A sample HTTP Firehose spec is shown below: } ``` -The protocols that the HTTP firehose can use is restricted by `druid.ingestion.http.allowedProtocols`. +You can only use protocols listed in the `druid.ingestion.http.allowedProtocols` property as HTTP firehose input sources. The `http` and `https` protocols are allowed by default. 
 See [HTTP firehose security configuration](../configuration/index.md#http-input-source) for more details.
 
 The below configurations can be optionally used if the URIs specified in the spec require a Basic Authentication Header.

From fe1b474d521c60a2a796b2975d0ac42315871ad7 Mon Sep 17 00:00:00 2001
From: Jihoon Son
Date: Tue, 2 Mar 2021 22:30:30 -0800
Subject: [PATCH 12/13] update hdfs address in docs

---
 docs/ingestion/native-batch.md | 8 ++++----
 1 file changed, 4 insertions(+), 4 deletions(-)

diff --git a/docs/ingestion/native-batch.md b/docs/ingestion/native-batch.md
index 588ef41506f0..7d020cd127a8 100644
--- a/docs/ingestion/native-batch.md
+++ b/docs/ingestion/native-batch.md
@@ -1064,7 +1064,7 @@ Sample specs:
       "type": "index_parallel",
       "inputSource": {
         "type": "hdfs",
-        "paths": "hdfs:/foo/bar/", "hdfs:/bar/foo"
+        "paths": "hdfs://namenode_host/foo/bar/", "hdfs://namenode_host/bar/foo"
       },
       "inputFormat": {
         "type": "json"
@@ -1080,7 +1080,7 @@ Sample specs:
       "type": "index_parallel",
       "inputSource": {
         "type": "hdfs",
-        "paths": ["hdfs:/foo/bar", "hdfs:/bar/foo"]
+        "paths": ["hdfs://namenode_host/foo/bar", "hdfs://namenode_host/bar/foo"]
       },
       "inputFormat": {
         "type": "json"
@@ -1096,7 +1096,7 @@ Sample specs:
       "type": "index_parallel",
       "inputSource": {
         "type": "hdfs",
-        "paths": "hdfs:/foo/bar/file.json", "hdfs:/bar/foo/file2.json"
+        "paths": "hdfs://namenode_host/foo/bar/file.json", "hdfs://namenode_host/bar/foo/file2.json"
       },
       "inputFormat": {
         "type": "json"
@@ -1112,7 +1112,7 @@ Sample specs:
       "type": "index_parallel",
      "inputSource": {
        "type": "hdfs",
-        "paths": ["hdfs:/foo/bar/file.json", "hdfs:/bar/foo/file2.json"]
+        "paths": ["hdfs://namenode_host/foo/bar/file.json", "hdfs://namenode_host/bar/foo/file2.json"]
       },
       "inputFormat": {
         "type": "json"

From c270ed66c408329ca2541555d650eca973145f81 Mon Sep 17 00:00:00 2001
From: Jihoon Son
Date: Thu, 4 Mar 2021 10:07:53 -0800
Subject: [PATCH 13/13] fix test

---
 .../org/apache/druid/data/input/impl/HttpInputSourceTest.java | 2 ++
 1 file changed, 2 insertions(+)

diff --git a/core/src/test/java/org/apache/druid/data/input/impl/HttpInputSourceTest.java b/core/src/test/java/org/apache/druid/data/input/impl/HttpInputSourceTest.java
index 984889020f7a..9c17b57d21eb 100644
--- a/core/src/test/java/org/apache/druid/data/input/impl/HttpInputSourceTest.java
+++ b/core/src/test/java/org/apache/druid/data/input/impl/HttpInputSourceTest.java
@@ -19,6 +19,7 @@
 
 package org.apache.druid.data.input.impl;
 
+import com.fasterxml.jackson.databind.InjectableValues.Std;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableSet;
@@ -42,6 +43,7 @@ public void testSerde() throws IOException
   {
     HttpInputSourceConfig httpInputSourceConfig = new HttpInputSourceConfig(null);
     final ObjectMapper mapper = new ObjectMapper();
+    mapper.setInjectableValues(new Std().addValue(HttpInputSourceConfig.class, httpInputSourceConfig));
     final HttpInputSource source = new HttpInputSource(
         ImmutableList.of(URI.create("http://test.com/http-test")),
         "myName",
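For reference, the shape of the config-driven check that this series threads through both factories, invoked above as `HttpInputSource.throwIfInvalidProtocols(inputSourceConfig, uris)` and surfaced in the tests as messages like `Only [hdfs] protocols are allowed`, can be sketched as follows. This is an illustration under an assumed accessor (`getAllowedProtocols()` on the config object), not the literal Druid implementation:

```java
import java.net.URI;
import java.util.List;
import java.util.Locale;
import java.util.Set;

final class AllowedProtocolsCheck
{
  private AllowedProtocolsCheck()
  {
  }

  // Rejects any URI whose scheme is not in the configured allow-list.
  // Schemes are lowercased first so the comparison is case-insensitive.
  static void throwIfInvalidProtocols(Set<String> allowedProtocols, List<URI> uris)
  {
    for (URI uri : uris) {
      final String scheme = uri.getScheme() == null
                            ? null
                            : uri.getScheme().toLowerCase(Locale.ENGLISH);
      if (scheme == null || !allowedProtocols.contains(scheme)) {
        // Set#toString renders as "[hdfs]" or "[http, https]", which yields the
        // "Only [...] protocols are allowed" messages asserted in the tests above.
        throw new IllegalArgumentException(
            String.format("Only %s protocols are allowed", allowedProtocols)
        );
      }
    }
  }
}
```

With an allow-list of just `hdfs`, a `file:/foo/bar` path fails with `Only [hdfs] protocols are allowed`, which is exactly the behavior `ConstructorTest` asserts earlier in this series.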