From 759eb4547dd9f4df3d042f941cea197316cfc711 Mon Sep 17 00:00:00 2001 From: Pranav Bhole Date: Wed, 28 Aug 2024 14:28:39 -0700 Subject: [PATCH 01/12] Additional headers in HttpInputSource in native and MSQ --- .../indexing/overlord/TaskQueueTest.java | 1 + .../druid/data/input/impl/HttpEntity.java | 4 +- .../data/input/impl/HttpInputSource.java | 44 ++++++++++++++----- .../data/input/impl/HttpInputSourceTest.java | 9 +++- .../impl/InputEntityIteratingReaderTest.java | 2 +- .../model/table/HttpInputSourceDefn.java | 40 ++++++++++++++++- .../model/table/ExternalTableTest.java | 2 + .../model/table/HttpInputSourceDefnTest.java | 9 +++- .../sql/calcite/IngestTableFunctionTest.java | 7 ++- 9 files changed, 102 insertions(+), 16 deletions(-) diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskQueueTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskQueueTest.java index ee90a3335a17..85cd042efbe7 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskQueueTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskQueueTest.java @@ -562,6 +562,7 @@ public void testGetActiveTaskRedactsPassword() throws JsonProcessingException "user", new DefaultPasswordProvider(password), null, + null, httpInputSourceConfig), new NoopInputFormat(), null, diff --git a/processing/src/main/java/org/apache/druid/data/input/impl/HttpEntity.java b/processing/src/main/java/org/apache/druid/data/input/impl/HttpEntity.java index e03495ce02aa..333ad108704d 100644 --- a/processing/src/main/java/org/apache/druid/data/input/impl/HttpEntity.java +++ b/processing/src/main/java/org/apache/druid/data/input/impl/HttpEntity.java @@ -34,6 +34,7 @@ import java.net.URI; import java.net.URLConnection; import java.util.Base64; +import java.util.Map; public class HttpEntity extends RetryingInputEntity { @@ -48,7 +49,8 @@ public class HttpEntity extends RetryingInputEntity HttpEntity( URI uri, 
@Nullable String httpAuthenticationUsername, - @Nullable PasswordProvider httpAuthenticationPasswordProvider + @Nullable PasswordProvider httpAuthenticationPasswordProvider, + @Nullable Map additionalHeaders ) { this.uri = uri; diff --git a/processing/src/main/java/org/apache/druid/data/input/impl/HttpInputSource.java b/processing/src/main/java/org/apache/druid/data/input/impl/HttpInputSource.java index 0ef8194f1e13..dbfa76d6b516 100644 --- a/processing/src/main/java/org/apache/druid/data/input/impl/HttpInputSource.java +++ b/processing/src/main/java/org/apache/druid/data/input/impl/HttpInputSource.java @@ -25,6 +25,7 @@ import com.fasterxml.jackson.annotation.JsonInclude; import com.fasterxml.jackson.annotation.JsonProperty; import com.google.common.base.Preconditions; +import com.google.common.collect.Maps; import org.apache.druid.data.input.AbstractInputSource; import org.apache.druid.data.input.InputEntity; import org.apache.druid.data.input.InputFormat; @@ -47,6 +48,7 @@ import java.net.URI; import java.util.Collections; import java.util.List; +import java.util.Map; import java.util.Objects; import java.util.Set; import java.util.stream.Stream; @@ -64,6 +66,7 @@ public class HttpInputSource private final PasswordProvider httpAuthenticationPasswordProvider; private final SystemFields systemFields; private final HttpInputSourceConfig config; + private final Map headersMap; @JsonCreator public HttpInputSource( @@ -71,6 +74,7 @@ public HttpInputSource( @JsonProperty("httpAuthenticationUsername") @Nullable String httpAuthenticationUsername, @JsonProperty("httpAuthenticationPassword") @Nullable PasswordProvider httpAuthenticationPasswordProvider, @JsonProperty(SYSTEM_FIELDS_PROPERTY) @Nullable SystemFields systemFields, + @JsonProperty("additionalHeaders") @Nullable Map headersMap, @JacksonInject HttpInputSourceConfig config ) { @@ -80,17 +84,10 @@ public HttpInputSource( this.httpAuthenticationUsername = httpAuthenticationUsername; 
this.httpAuthenticationPasswordProvider = httpAuthenticationPasswordProvider; this.systemFields = systemFields == null ? SystemFields.none() : systemFields; + this.headersMap = headersMap == null ? Maps.newHashMap() : headersMap; this.config = config; } - @JsonIgnore - @Nonnull - @Override - public Set getTypes() - { - return Collections.singleton(TYPE_KEY); - } - public static void throwIfInvalidProtocols(HttpInputSourceConfig config, List uris) { for (URI uri : uris) { @@ -100,6 +97,14 @@ public static void throwIfInvalidProtocols(HttpInputSourceConfig config, List getTypes() + { + return Collections.singleton(TYPE_KEY); + } + @JsonProperty public List getUris() { @@ -128,6 +133,14 @@ public PasswordProvider getHttpAuthenticationPasswordProvider() return httpAuthenticationPasswordProvider; } + @Nullable + @JsonProperty("additionalHeaders") + @JsonInclude(JsonInclude.Include.NON_NULL) + public Map getAdditionalHeaders() + { + return headersMap; + } + @Override public Stream> createSplits(InputFormat inputFormat, @Nullable SplitHintSpec splitHintSpec) { @@ -148,6 +161,7 @@ public SplittableInputSource withSplit(InputSplit split) httpAuthenticationUsername, httpAuthenticationPasswordProvider, systemFields, + headersMap, config ); } @@ -181,7 +195,8 @@ protected InputSourceReader formattableReader( createSplits(inputFormat, null).map(split -> new HttpEntity( split.get(), httpAuthenticationUsername, - httpAuthenticationPasswordProvider + httpAuthenticationPasswordProvider, + headersMap )).iterator() ), SystemFieldDecoratorFactory.fromInputSource(this), @@ -203,13 +218,21 @@ public boolean equals(Object o) && Objects.equals(httpAuthenticationUsername, that.httpAuthenticationUsername) && Objects.equals(httpAuthenticationPasswordProvider, that.httpAuthenticationPasswordProvider) && Objects.equals(systemFields, that.systemFields) + && Objects.equals(headersMap, that.headersMap) && Objects.equals(config, that.config); } @Override public int hashCode() { - return 
Objects.hash(uris, httpAuthenticationUsername, httpAuthenticationPasswordProvider, systemFields, config); + return Objects.hash( + uris, + httpAuthenticationUsername, + httpAuthenticationPasswordProvider, + systemFields, + headersMap, + config + ); } @Override @@ -226,6 +249,7 @@ public String toString() ", httpAuthenticationUsername=" + httpAuthenticationUsername + ", httpAuthenticationPasswordProvider=" + httpAuthenticationPasswordProvider + (systemFields.getFields().isEmpty() ? "" : ", systemFields=" + systemFields) + + ", additionalHeaders = " + headersMap.toString() + "}"; } } diff --git a/processing/src/test/java/org/apache/druid/data/input/impl/HttpInputSourceTest.java b/processing/src/test/java/org/apache/druid/data/input/impl/HttpInputSourceTest.java index bcd6152f05dd..57ce431f45a3 100644 --- a/processing/src/test/java/org/apache/druid/data/input/impl/HttpInputSourceTest.java +++ b/processing/src/test/java/org/apache/druid/data/input/impl/HttpInputSourceTest.java @@ -53,6 +53,7 @@ public void testSerde() throws IOException "myName", new DefaultPasswordProvider("myPassword"), new SystemFields(EnumSet.of(SystemField.URI)), + null, httpInputSourceConfig ); final byte[] json = mapper.writeValueAsBytes(source); @@ -68,6 +69,7 @@ public void testConstructorAllowsOnlyDefaultProtocols() "myName", new DefaultPasswordProvider("myPassword"), null, + null, new HttpInputSourceConfig(null) ); @@ -76,6 +78,7 @@ public void testConstructorAllowsOnlyDefaultProtocols() "myName", new DefaultPasswordProvider("myPassword"), null, + null, new HttpInputSourceConfig(null) ); @@ -86,6 +89,7 @@ public void testConstructorAllowsOnlyDefaultProtocols() "myName", new DefaultPasswordProvider("myPassword"), null, + null, new HttpInputSourceConfig(null) ); } @@ -99,6 +103,7 @@ public void testConstructorAllowsOnlyCustomProtocols() "myName", new DefaultPasswordProvider("myPassword"), null, + null, customConfig ); @@ -109,6 +114,7 @@ public void testConstructorAllowsOnlyCustomProtocols() 
"myName", new DefaultPasswordProvider("myPassword"), null, + null, customConfig ); } @@ -122,6 +128,7 @@ public void testSystemFields() "myName", new DefaultPasswordProvider("myPassword"), new SystemFields(EnumSet.of(SystemField.URI, SystemField.PATH)), + null, httpInputSourceConfig ); @@ -130,7 +137,7 @@ public void testSystemFields() inputSource.getConfiguredSystemFields() ); - final HttpEntity entity = new HttpEntity(URI.create("https://example.com/foo"), null, null); + final HttpEntity entity = new HttpEntity(URI.create("https://example.com/foo"), null, null, null); Assert.assertEquals("https://example.com/foo", inputSource.getSystemFieldValue(entity, SystemField.URI)); Assert.assertEquals("/foo", inputSource.getSystemFieldValue(entity, SystemField.PATH)); diff --git a/processing/src/test/java/org/apache/druid/data/input/impl/InputEntityIteratingReaderTest.java b/processing/src/test/java/org/apache/druid/data/input/impl/InputEntityIteratingReaderTest.java index 80602d0508ad..5f1b5f365fb8 100644 --- a/processing/src/test/java/org/apache/druid/data/input/impl/InputEntityIteratingReaderTest.java +++ b/processing/src/test/java/org/apache/druid/data/input/impl/InputEntityIteratingReaderTest.java @@ -204,7 +204,7 @@ public void testIncorrectURI() throws IOException, URISyntaxException ), CloseableIterators.withEmptyBaggage( ImmutableList.of( - new HttpEntity(new URI("testscheme://some/path"), null, null) + new HttpEntity(new URI("testscheme://some/path"), null, null, null) { @Override protected int getMaxRetries() diff --git a/server/src/main/java/org/apache/druid/catalog/model/table/HttpInputSourceDefn.java b/server/src/main/java/org/apache/druid/catalog/model/table/HttpInputSourceDefn.java index b4a3cf3425a2..a79c2dbc73a7 100644 --- a/server/src/main/java/org/apache/druid/catalog/model/table/HttpInputSourceDefn.java +++ b/server/src/main/java/org/apache/druid/catalog/model/table/HttpInputSourceDefn.java @@ -19,6 +19,9 @@ package 
org.apache.druid.catalog.model.table; +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.core.type.TypeReference; +import com.fasterxml.jackson.databind.JsonMappingException; import com.google.common.collect.ImmutableMap; import org.apache.druid.catalog.model.CatalogUtils; import org.apache.druid.catalog.model.ColumnSpec; @@ -27,6 +30,7 @@ import org.apache.druid.catalog.model.table.TableFunction.ParameterType; import org.apache.druid.data.input.InputSource; import org.apache.druid.data.input.impl.HttpInputSource; +import org.apache.druid.jackson.DefaultObjectMapper; import org.apache.druid.java.util.common.IAE; import org.apache.druid.java.util.common.ISE; import org.apache.druid.metadata.DefaultPasswordProvider; @@ -93,6 +97,7 @@ public class HttpInputSourceDefn extends FormattedInputSourceDefn public static final String PASSWORD_PARAMETER = "password"; public static final String PASSWORD_ENV_VAR_PARAMETER = "passwordEnvVar"; + public static final String HEADERS = "headers"; private static final List URI_PARAMS = Collections.singletonList( new Parameter(URIS_PARAMETER, ParameterType.VARCHAR_ARRAY, true) ); @@ -103,10 +108,15 @@ public class HttpInputSourceDefn extends FormattedInputSourceDefn new Parameter(PASSWORD_ENV_VAR_PARAMETER, ParameterType.VARCHAR, true) ); + private static final List HEADERS_PARAMS = Collections.singletonList( + new Parameter(HEADERS, ParameterType.VARCHAR, true) + ); + // Field names in the HttpInputSource protected static final String URIS_FIELD = "uris"; protected static final String PASSWORD_FIELD = "httpAuthenticationPassword"; protected static final String USERNAME_FIELD = "httpAuthenticationUsername"; + protected static final String HEADERS_FIELD = "additionalHeaders"; @Override public String typeValue() @@ -201,7 +211,7 @@ private Matcher templateMatcher(String uriTemplate) @Override protected List adHocTableFnParameters() { - return CatalogUtils.concatLists(URI_PARAMS, USER_PWD_PARAMS); + 
return CatalogUtils.concatLists(URI_PARAMS, CatalogUtils.concatLists(USER_PWD_PARAMS, HEADERS_PARAMS)); } @Override @@ -210,6 +220,7 @@ protected void convertArgsToSourceMap(Map jsonMap, Map jsonMap, Map args } } + /** + * Headers in SQL are given as a single string containing a JSON object that maps + * header names to values; it is parsed and stored in the additionalHeaders field. + */ + private void convertHeaderArg(Map jsonMap, Map args) + { + String additionalHeaders = CatalogUtils.getString(args, HEADERS); + Map headersMap; + if (additionalHeaders != null) { + try { + headersMap = DefaultObjectMapper.INSTANCE.readValue(additionalHeaders, new TypeReference>(){}); + } + catch (JsonProcessingException e) { + throw new ISE("Failed read map from headers json"); + } + jsonMap.put(HEADERS_FIELD, headersMap); + } + + } + /** * Convert the user name and password. All are SQL strings. Passwords must be in * the form of a password provider, so do the needed conversion. HTTP provides diff --git a/server/src/test/java/org/apache/druid/catalog/model/table/ExternalTableTest.java b/server/src/test/java/org/apache/druid/catalog/model/table/ExternalTableTest.java index 8c8129bf0fcc..62ff690bd5ca 100644 --- a/server/src/test/java/org/apache/druid/catalog/model/table/ExternalTableTest.java +++ b/server/src/test/java/org/apache/druid/catalog/model/table/ExternalTableTest.java @@ -170,6 +170,7 @@ public void httpDocExample() throws URISyntaxException "bob", new DefaultPasswordProvider("secret"), null, + null, new HttpInputSourceConfig(null) ); Map sourceMap = toMap(inputSource); @@ -195,6 +196,7 @@ public void httpConnDocExample() throws URISyntaxException "bob", new DefaultPasswordProvider("secret"), null, + null, new HttpInputSourceConfig(null) ); TableMetadata table = TableBuilder.external("koala") diff --git a/server/src/test/java/org/apache/druid/catalog/model/table/HttpInputSourceDefnTest.java b/server/src/test/java/org/apache/druid/catalog/model/table/HttpInputSourceDefnTest.java index 
8a6385db59ce..08b043bc3514 100644 --- a/server/src/test/java/org/apache/druid/catalog/model/table/HttpInputSourceDefnTest.java +++ b/server/src/test/java/org/apache/druid/catalog/model/table/HttpInputSourceDefnTest.java @@ -99,6 +99,7 @@ public void testNoFormatWithURI() throws URISyntaxException null, null, null, + null, new HttpInputSourceConfig(null) ); TableMetadata table = TableBuilder.external("foo") @@ -119,6 +120,7 @@ public void testNoColumnsWithUri() throws URISyntaxException null, null, null, + null, new HttpInputSourceConfig(null) ); TableMetadata table = TableBuilder.external("foo") @@ -150,6 +152,7 @@ public void testURIAndTemplate() throws URISyntaxException null, null, null, + null, new HttpInputSourceConfig(null) ); TableMetadata table = TableBuilder.external("foo") @@ -216,6 +219,7 @@ public void testFullTableSpecHappyPath() throws URISyntaxException "bob", new DefaultPasswordProvider("secret"), null, + null, new HttpInputSourceConfig(null) ); TableMetadata table = TableBuilder.external("foo") @@ -344,6 +348,7 @@ public void testTemplateSpecWithoutFormatHappyPath() throws URISyntaxException "bob", new DefaultPasswordProvider("secret"), null, + null, new HttpInputSourceConfig(null) ); TableMetadata table = TableBuilder.external("foo") @@ -382,6 +387,7 @@ public void testMultipleURIsInTableSpec() throws URISyntaxException "bob", new EnvironmentVariablePasswordProvider("SECRET"), null, + null, new HttpInputSourceConfig(null) ); TableMetadata table = TableBuilder.external("foo") @@ -415,6 +421,7 @@ public void testMultipleURIsWithTemplate() throws URISyntaxException "bob", new DefaultPasswordProvider("secret"), null, + null, new HttpInputSourceConfig(null) ); TableMetadata table = TableBuilder.external("foo") @@ -484,6 +491,7 @@ public void testEnvPassword() throws URISyntaxException "bob", new EnvironmentVariablePasswordProvider("SECRET"), null, + null, new HttpInputSourceConfig(null) ); TableMetadata table = TableBuilder.external("foo") @@ -518,7 
+526,6 @@ private void validateHappyPath(ExternalTableSpec externSpec, boolean withUser) assertEquals("secret", ((DefaultPasswordProvider) sourceSpec.getHttpAuthenticationPasswordProvider()).getPassword()); } assertEquals("http://foo.com/my.csv", sourceSpec.getUris().get(0).toString()); - // Just a sanity check: details of CSV conversion are tested elsewhere. CsvInputFormat csvFormat = (CsvInputFormat) externSpec.inputFormat; assertEquals(Arrays.asList("x", "y"), csvFormat.getColumns()); diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/IngestTableFunctionTest.java b/sql/src/test/java/org/apache/druid/sql/calcite/IngestTableFunctionTest.java index 29200730fee3..3261d0e0cc13 100644 --- a/sql/src/test/java/org/apache/druid/sql/calcite/IngestTableFunctionTest.java +++ b/sql/src/test/java/org/apache/druid/sql/calcite/IngestTableFunctionTest.java @@ -22,6 +22,7 @@ import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; import org.apache.calcite.avatica.SqlType; import org.apache.druid.catalog.model.Columns; import org.apache.druid.data.input.impl.CsvInputFormat; @@ -86,6 +87,7 @@ protected static URI toURI(String uri) "bob", new DefaultPasswordProvider("secret"), SystemFields.none(), + null, new HttpInputSourceConfig(null) ), new CsvInputFormat(ImmutableList.of("x", "y", "z"), null, false, false, 0), @@ -259,6 +261,7 @@ public void testHttpFn2() "bob", new DefaultPasswordProvider("secret"), SystemFields.none(), + ImmutableMap.of("Accept", "application/ndjson", "a", "b"), new HttpInputSourceConfig(null) ), new CsvInputFormat(ImmutableList.of("timestamp", "isRobot"), null, false, false, 0), @@ -280,7 +283,8 @@ public void testHttpFn2() " userName => 'bob',\n" + " password => 'secret',\n" + " uris => ARRAY['http://example.com/foo.csv', 'http://example.com/bar.csv'],\n" + - " format => 'csv'\n" + + " format => 
'csv',\n" + + " headers=> '{\"Accept\":\"application/ndjson\", \"a\": \"b\" }'\n" + " )\n" + ") EXTEND (\"timestamp\" VARCHAR, isRobot VARCHAR)\n" + "PARTITIONED BY HOUR") @@ -390,6 +394,7 @@ public void testHttpJson() "bob", new DefaultPasswordProvider("secret"), SystemFields.none(), + null, new HttpInputSourceConfig(null) ), new JsonInputFormat(null, null, null, null, null), From aaea5d1257a9a260ff3af814bcd598539e196fbc Mon Sep 17 00:00:00 2001 From: Pranav Bhole Date: Tue, 10 Sep 2024 16:32:32 -0700 Subject: [PATCH 02/12] Adding allowedHeaders config --- .../indexing/overlord/TaskQueueTest.java | 2 +- .../data/input/impl/HttpInputSource.java | 18 ++++++ .../input/impl/HttpInputSourceConfig.java | 23 ++++++- .../input/impl/HttpInputSourceConfigTest.java | 21 +++++- .../data/input/impl/HttpInputSourceTest.java | 64 +++++++++++++++++-- .../model/table/HttpInputSourceDefn.java | 1 - .../model/table/ExternalTableTest.java | 4 +- .../model/table/HttpInputSourceDefnTest.java | 18 +++--- .../metadata/input/InputSourceModuleTest.java | 2 +- .../sql/calcite/IngestTableFunctionTest.java | 6 +- 10 files changed, 129 insertions(+), 30 deletions(-) diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskQueueTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskQueueTest.java index 85cd042efbe7..c7b7b13ef7ea 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskQueueTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskQueueTest.java @@ -519,7 +519,7 @@ public void testGetActiveTaskRedactsPassword() throws JsonProcessingException final String password = "AbCd_1234"; final ObjectMapper mapper = getObjectMapper(); - final HttpInputSourceConfig httpInputSourceConfig = new HttpInputSourceConfig(Collections.singleton("http")); + final HttpInputSourceConfig httpInputSourceConfig = new HttpInputSourceConfig(Collections.singleton("http"), null); 
mapper.setInjectableValues(new InjectableValues.Std() .addValue(HttpInputSourceConfig.class, httpInputSourceConfig) .addValue(ObjectMapper.class, new DefaultObjectMapper()) diff --git a/processing/src/main/java/org/apache/druid/data/input/impl/HttpInputSource.java b/processing/src/main/java/org/apache/druid/data/input/impl/HttpInputSource.java index dbfa76d6b516..92814eb14d96 100644 --- a/processing/src/main/java/org/apache/druid/data/input/impl/HttpInputSource.java +++ b/processing/src/main/java/org/apache/druid/data/input/impl/HttpInputSource.java @@ -46,11 +46,13 @@ import javax.annotation.Nullable; import java.io.File; import java.net.URI; +import java.util.Arrays; import java.util.Collections; import java.util.List; import java.util.Map; import java.util.Objects; import java.util.Set; +import java.util.stream.Collectors; import java.util.stream.Stream; public class HttpInputSource @@ -85,6 +87,7 @@ public HttpInputSource( this.httpAuthenticationPasswordProvider = httpAuthenticationPasswordProvider; this.systemFields = systemFields == null ? SystemFields.none() : systemFields; this.headersMap = headersMap == null ? 
Maps.newHashMap() : headersMap; + throwIfForbiddenHeaders(config, this.headersMap); this.config = config; } @@ -97,6 +100,21 @@ public static void throwIfInvalidProtocols(HttpInputSourceConfig config, List headersMap) + { + if (!config.getAllowedHeaders().isEmpty() && headersMap.size() > 0) { + Set forbiddenHeaderSet = headersMap.keySet() + .stream() + .map(StringUtils::toLowerCase) + .filter(h -> !config.getAllowedHeaders().contains(h)) + .collect(Collectors.toSet()); + if (!forbiddenHeaderSet.isEmpty()) { + throw new IAE("Got forbidden headers %s, allowed headers are only %s ", + forbiddenHeaderSet, config.getAllowedHeaders()); + } + } + } + @JsonIgnore @Nonnull @Override diff --git a/processing/src/main/java/org/apache/druid/data/input/impl/HttpInputSourceConfig.java b/processing/src/main/java/org/apache/druid/data/input/impl/HttpInputSourceConfig.java index 310c66904619..1299edbc5744 100644 --- a/processing/src/main/java/org/apache/druid/data/input/impl/HttpInputSourceConfig.java +++ b/processing/src/main/java/org/apache/druid/data/input/impl/HttpInputSourceConfig.java @@ -26,6 +26,7 @@ import org.apache.druid.java.util.common.StringUtils; import javax.annotation.Nullable; +import java.util.Collections; import java.util.Objects; import java.util.Set; import java.util.stream.Collectors; @@ -38,14 +39,21 @@ public class HttpInputSourceConfig @JsonProperty private final Set allowedProtocols; + @JsonProperty + private final Set allowedHeaders; + @JsonCreator public HttpInputSourceConfig( - @JsonProperty("allowedProtocols") @Nullable Set allowedProtocols + @JsonProperty("allowedProtocols") @Nullable Set allowedProtocols, + @JsonProperty("allowedHeaders") @Nullable Set allowedHeaders ) { this.allowedProtocols = allowedProtocols == null || allowedProtocols.isEmpty() ? DEFAULT_ALLOWED_PROTOCOLS : allowedProtocols.stream().map(StringUtils::toLowerCase).collect(Collectors.toSet()); + this.allowedHeaders = allowedHeaders == null + ? 
Collections.emptySet() + : allowedHeaders.stream().map(StringUtils::toLowerCase).collect(Collectors.toSet()); } public Set getAllowedProtocols() @@ -53,6 +61,11 @@ public Set getAllowedProtocols() return allowedProtocols; } + public Set getAllowedHeaders() + { + return allowedHeaders; + } + @Override public boolean equals(Object o) { @@ -63,13 +76,16 @@ public boolean equals(Object o) return false; } HttpInputSourceConfig that = (HttpInputSourceConfig) o; - return Objects.equals(allowedProtocols, that.allowedProtocols); + return Objects.equals(allowedProtocols, that.allowedProtocols) && Objects.equals( + allowedHeaders, + that.allowedHeaders + ); } @Override public int hashCode() { - return Objects.hash(allowedProtocols); + return Objects.hash(allowedProtocols, allowedHeaders); } @Override @@ -77,6 +93,7 @@ public String toString() { return "HttpInputSourceConfig{" + "allowedProtocols=" + allowedProtocols + + ", allowedHeaders=" + allowedHeaders + '}'; } } diff --git a/processing/src/test/java/org/apache/druid/data/input/impl/HttpInputSourceConfigTest.java b/processing/src/test/java/org/apache/druid/data/input/impl/HttpInputSourceConfigTest.java index 7f88fc7bf62d..8217bbf699f9 100644 --- a/processing/src/test/java/org/apache/druid/data/input/impl/HttpInputSourceConfigTest.java +++ b/processing/src/test/java/org/apache/druid/data/input/impl/HttpInputSourceConfigTest.java @@ -19,11 +19,14 @@ package org.apache.druid.data.input.impl; +import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableSet; import nl.jqno.equalsverifier.EqualsVerifier; import org.junit.Assert; import org.junit.Test; +import java.util.Collections; + public class HttpInputSourceConfigTest { @Test @@ -35,21 +38,33 @@ public void testEquals() @Test public void testNullAllowedProtocolsUseDefault() { - HttpInputSourceConfig config = new HttpInputSourceConfig(null); + HttpInputSourceConfig config = new HttpInputSourceConfig(null, null); 
Assert.assertEquals(HttpInputSourceConfig.DEFAULT_ALLOWED_PROTOCOLS, config.getAllowedProtocols()); + Assert.assertEquals(Collections.emptySet(), config.getAllowedHeaders()); } @Test public void testEmptyAllowedProtocolsUseDefault() { - HttpInputSourceConfig config = new HttpInputSourceConfig(ImmutableSet.of()); + HttpInputSourceConfig config = new HttpInputSourceConfig(ImmutableSet.of(), null); Assert.assertEquals(HttpInputSourceConfig.DEFAULT_ALLOWED_PROTOCOLS, config.getAllowedProtocols()); } @Test public void testCustomAllowedProtocols() { - HttpInputSourceConfig config = new HttpInputSourceConfig(ImmutableSet.of("druid")); + HttpInputSourceConfig config = new HttpInputSourceConfig(ImmutableSet.of("druid"), null); + Assert.assertEquals(ImmutableSet.of("druid"), config.getAllowedProtocols()); + } + + @Test + public void testAllowedHeaders() + { + HttpInputSourceConfig config = new HttpInputSourceConfig( + ImmutableSet.of("druid"), + ImmutableSet.of("Content-Type", "Referer") + ); Assert.assertEquals(ImmutableSet.of("druid"), config.getAllowedProtocols()); + Assert.assertEquals(ImmutableSet.of("content-type", "referer"), config.getAllowedHeaders()); } } diff --git a/processing/src/test/java/org/apache/druid/data/input/impl/HttpInputSourceTest.java b/processing/src/test/java/org/apache/druid/data/input/impl/HttpInputSourceTest.java index 57ce431f45a3..cc371b57322c 100644 --- a/processing/src/test/java/org/apache/druid/data/input/impl/HttpInputSourceTest.java +++ b/processing/src/test/java/org/apache/druid/data/input/impl/HttpInputSourceTest.java @@ -22,11 +22,14 @@ import com.fasterxml.jackson.databind.InjectableValues.Std; import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableSet; +import com.google.common.collect.Sets; import nl.jqno.equalsverifier.EqualsVerifier; import org.apache.druid.data.input.InputSource; import 
org.apache.druid.data.input.impl.systemfield.SystemField; import org.apache.druid.data.input.impl.systemfield.SystemFields; +import org.apache.druid.java.util.common.StringUtils; import org.apache.druid.metadata.DefaultPasswordProvider; import org.junit.Assert; import org.junit.Rule; @@ -35,7 +38,10 @@ import java.io.IOException; import java.net.URI; +import java.util.Collections; import java.util.EnumSet; +import java.util.Set; +import java.util.stream.Collectors; public class HttpInputSourceTest { @@ -45,7 +51,7 @@ public class HttpInputSourceTest @Test public void testSerde() throws IOException { - HttpInputSourceConfig httpInputSourceConfig = new HttpInputSourceConfig(null); + HttpInputSourceConfig httpInputSourceConfig = new HttpInputSourceConfig(null, null); final ObjectMapper mapper = new ObjectMapper(); mapper.setInjectableValues(new Std().addValue(HttpInputSourceConfig.class, httpInputSourceConfig)); final HttpInputSource source = new HttpInputSource( @@ -70,7 +76,7 @@ public void testConstructorAllowsOnlyDefaultProtocols() new DefaultPasswordProvider("myPassword"), null, null, - new HttpInputSourceConfig(null) + new HttpInputSourceConfig(null, null) ); new HttpInputSource( @@ -79,7 +85,7 @@ public void testConstructorAllowsOnlyDefaultProtocols() new DefaultPasswordProvider("myPassword"), null, null, - new HttpInputSourceConfig(null) + new HttpInputSourceConfig(null, null) ); expectedException.expect(IllegalArgumentException.class); @@ -90,20 +96,20 @@ public void testConstructorAllowsOnlyDefaultProtocols() new DefaultPasswordProvider("myPassword"), null, null, - new HttpInputSourceConfig(null) + new HttpInputSourceConfig(null, null) ); } @Test public void testConstructorAllowsOnlyCustomProtocols() { - final HttpInputSourceConfig customConfig = new HttpInputSourceConfig(ImmutableSet.of("druid")); + final HttpInputSourceConfig customConfig = new HttpInputSourceConfig(ImmutableSet.of("druid"), null); new HttpInputSource( 
ImmutableList.of(URI.create("druid:///")), "myName", new DefaultPasswordProvider("myPassword"), null, - null, + null, customConfig ); @@ -122,7 +128,7 @@ public void testConstructorAllowsOnlyCustomProtocols() @Test public void testSystemFields() { - HttpInputSourceConfig httpInputSourceConfig = new HttpInputSourceConfig(null); + HttpInputSourceConfig httpInputSourceConfig = new HttpInputSourceConfig(null, null); final HttpInputSource inputSource = new HttpInputSource( ImmutableList.of(URI.create("http://test.com/http-test")), "myName", @@ -141,6 +147,50 @@ public void testSystemFields() Assert.assertEquals("https://example.com/foo", inputSource.getSystemFieldValue(entity, SystemField.URI)); Assert.assertEquals("/foo", inputSource.getSystemFieldValue(entity, SystemField.PATH)); + Assert.assertEquals(inputSource.getAdditionalHeaders(), Collections.emptyMap()); + } + + @Test + public void testAllowedHeaders() + { + HttpInputSourceConfig httpInputSourceConfig = new HttpInputSourceConfig( + null, + Sets.newHashSet("R-cookie", "Content-type") + ); + final HttpInputSource inputSource = new HttpInputSource( + ImmutableList.of(URI.create("http://test.com/http-test")), + "myName", + new DefaultPasswordProvider("myPassword"), + new SystemFields(EnumSet.of(SystemField.URI, SystemField.PATH)), + ImmutableMap.of("r-Cookie", "test", "Content-Type", "application/json"), + httpInputSourceConfig + ); + Set expectedSet = inputSource.getAdditionalHeaders() + .keySet() + .stream() + .map(StringUtils::toLowerCase) + .collect(Collectors.toSet()); + Assert.assertEquals(expectedSet, httpInputSourceConfig.getAllowedHeaders()); + } + + @Test + public void shouldFailOnForbiddenHeaders() + { + HttpInputSourceConfig httpInputSourceConfig = new HttpInputSourceConfig( + null, + Sets.newHashSet("R-cookie", "Content-type") + ); + expectedException.expect(IllegalArgumentException.class); + expectedException.expectMessage( + "Got forbidden headers [g-cookie], allowed headers are only [r-cookie, 
content-type]"); + new HttpInputSource( + ImmutableList.of(URI.create("http://test.com/http-test")), + "myName", + new DefaultPasswordProvider("myPassword"), + new SystemFields(EnumSet.of(SystemField.URI, SystemField.PATH)), + ImmutableMap.of("G-Cookie", "test", "Content-Type", "application/json"), + httpInputSourceConfig + ); } @Test diff --git a/server/src/main/java/org/apache/druid/catalog/model/table/HttpInputSourceDefn.java b/server/src/main/java/org/apache/druid/catalog/model/table/HttpInputSourceDefn.java index a79c2dbc73a7..606ac0036577 100644 --- a/server/src/main/java/org/apache/druid/catalog/model/table/HttpInputSourceDefn.java +++ b/server/src/main/java/org/apache/druid/catalog/model/table/HttpInputSourceDefn.java @@ -21,7 +21,6 @@ import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.core.type.TypeReference; -import com.fasterxml.jackson.databind.JsonMappingException; import com.google.common.collect.ImmutableMap; import org.apache.druid.catalog.model.CatalogUtils; import org.apache.druid.catalog.model.ColumnSpec; diff --git a/server/src/test/java/org/apache/druid/catalog/model/table/ExternalTableTest.java b/server/src/test/java/org/apache/druid/catalog/model/table/ExternalTableTest.java index 62ff690bd5ca..1992f98e2ffe 100644 --- a/server/src/test/java/org/apache/druid/catalog/model/table/ExternalTableTest.java +++ b/server/src/test/java/org/apache/druid/catalog/model/table/ExternalTableTest.java @@ -171,7 +171,7 @@ public void httpDocExample() throws URISyntaxException new DefaultPasswordProvider("secret"), null, null, - new HttpInputSourceConfig(null) + new HttpInputSourceConfig(null, null) ); Map sourceMap = toMap(inputSource); sourceMap.remove("uris"); @@ -197,7 +197,7 @@ public void httpConnDocExample() throws URISyntaxException new DefaultPasswordProvider("secret"), null, null, - new HttpInputSourceConfig(null) + new HttpInputSourceConfig(null, null) ); TableMetadata table = TableBuilder.external("koala") 
.inputSource(toMap(inputSource)) diff --git a/server/src/test/java/org/apache/druid/catalog/model/table/HttpInputSourceDefnTest.java b/server/src/test/java/org/apache/druid/catalog/model/table/HttpInputSourceDefnTest.java index 08b043bc3514..a7ae7b2bed07 100644 --- a/server/src/test/java/org/apache/druid/catalog/model/table/HttpInputSourceDefnTest.java +++ b/server/src/test/java/org/apache/druid/catalog/model/table/HttpInputSourceDefnTest.java @@ -57,7 +57,7 @@ public void setup() { mapper.setInjectableValues(new InjectableValues.Std().addValue( HttpInputSourceConfig.class, - new HttpInputSourceConfig(HttpInputSourceConfig.DEFAULT_ALLOWED_PROTOCOLS) + new HttpInputSourceConfig(HttpInputSourceConfig.DEFAULT_ALLOWED_PROTOCOLS, null) )); } @@ -100,7 +100,7 @@ public void testNoFormatWithURI() throws URISyntaxException null, null, null, - new HttpInputSourceConfig(null) + new HttpInputSourceConfig(null, null) ); TableMetadata table = TableBuilder.external("foo") .inputSource(toMap(inputSource)) @@ -121,7 +121,7 @@ public void testNoColumnsWithUri() throws URISyntaxException null, null, null, - new HttpInputSourceConfig(null) + new HttpInputSourceConfig(null, null) ); TableMetadata table = TableBuilder.external("foo") .inputSource(toMap(inputSource)) @@ -153,7 +153,7 @@ public void testURIAndTemplate() throws URISyntaxException null, null, null, - new HttpInputSourceConfig(null) + new HttpInputSourceConfig(null, null) ); TableMetadata table = TableBuilder.external("foo") .inputSource(toMap(inputSource)) @@ -220,7 +220,7 @@ public void testFullTableSpecHappyPath() throws URISyntaxException new DefaultPasswordProvider("secret"), null, null, - new HttpInputSourceConfig(null) + new HttpInputSourceConfig(null, null) ); TableMetadata table = TableBuilder.external("foo") .inputSource(toMap(inputSource)) @@ -349,7 +349,7 @@ public void testTemplateSpecWithoutFormatHappyPath() throws URISyntaxException new DefaultPasswordProvider("secret"), null, null, - new 
HttpInputSourceConfig(null) + new HttpInputSourceConfig(null, null) ); TableMetadata table = TableBuilder.external("foo") .inputSource(httpToMap(inputSource)) @@ -388,7 +388,7 @@ public void testMultipleURIsInTableSpec() throws URISyntaxException new EnvironmentVariablePasswordProvider("SECRET"), null, null, - new HttpInputSourceConfig(null) + new HttpInputSourceConfig(null, null) ); TableMetadata table = TableBuilder.external("foo") .inputSource(toMap(inputSource)) @@ -422,7 +422,7 @@ public void testMultipleURIsWithTemplate() throws URISyntaxException new DefaultPasswordProvider("secret"), null, null, - new HttpInputSourceConfig(null) + new HttpInputSourceConfig(null, null) ); TableMetadata table = TableBuilder.external("foo") .inputSource(httpToMap(inputSource)) @@ -492,7 +492,7 @@ public void testEnvPassword() throws URISyntaxException new EnvironmentVariablePasswordProvider("SECRET"), null, null, - new HttpInputSourceConfig(null) + new HttpInputSourceConfig(null, null) ); TableMetadata table = TableBuilder.external("foo") .inputSource(toMap(inputSource)) diff --git a/server/src/test/java/org/apache/druid/metadata/input/InputSourceModuleTest.java b/server/src/test/java/org/apache/druid/metadata/input/InputSourceModuleTest.java index 074b1dbeb221..496f45596a20 100644 --- a/server/src/test/java/org/apache/druid/metadata/input/InputSourceModuleTest.java +++ b/server/src/test/java/org/apache/druid/metadata/input/InputSourceModuleTest.java @@ -79,7 +79,7 @@ public void testHttpInputSourceDefaultConfig() Properties props = new Properties(); Injector injector = makeInjectorWithProperties(props); HttpInputSourceConfig instance = injector.getInstance(HttpInputSourceConfig.class); - Assert.assertEquals(new HttpInputSourceConfig(null), instance); + Assert.assertEquals(new HttpInputSourceConfig(null, null), instance); Assert.assertEquals(HttpInputSourceConfig.DEFAULT_ALLOWED_PROTOCOLS, instance.getAllowedProtocols()); } diff --git 
a/sql/src/test/java/org/apache/druid/sql/calcite/IngestTableFunctionTest.java b/sql/src/test/java/org/apache/druid/sql/calcite/IngestTableFunctionTest.java index 3261d0e0cc13..e70770e0f456 100644 --- a/sql/src/test/java/org/apache/druid/sql/calcite/IngestTableFunctionTest.java +++ b/sql/src/test/java/org/apache/druid/sql/calcite/IngestTableFunctionTest.java @@ -88,7 +88,7 @@ protected static URI toURI(String uri) new DefaultPasswordProvider("secret"), SystemFields.none(), null, - new HttpInputSourceConfig(null) + new HttpInputSourceConfig(null, null) ), new CsvInputFormat(ImmutableList.of("x", "y", "z"), null, false, false, 0), RowSignature.builder() @@ -262,7 +262,7 @@ public void testHttpFn2() new DefaultPasswordProvider("secret"), SystemFields.none(), ImmutableMap.of("Accept", "application/ndjson", "a", "b"), - new HttpInputSourceConfig(null) + new HttpInputSourceConfig(null, null) ), new CsvInputFormat(ImmutableList.of("timestamp", "isRobot"), null, false, false, 0), RowSignature.builder() @@ -395,7 +395,7 @@ public void testHttpJson() new DefaultPasswordProvider("secret"), SystemFields.none(), null, - new HttpInputSourceConfig(null) + new HttpInputSourceConfig(null, null) ), new JsonInputFormat(null, null, null, null, null), RowSignature.builder() From 2e784970ba8d8edba374b0cce1f2bd5e6813ada4 Mon Sep 17 00:00:00 2001 From: Pranav Bhole Date: Tue, 10 Sep 2024 16:38:30 -0700 Subject: [PATCH 03/12] fix check --- .../java/org/apache/druid/data/input/impl/HttpInputSource.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/processing/src/main/java/org/apache/druid/data/input/impl/HttpInputSource.java b/processing/src/main/java/org/apache/druid/data/input/impl/HttpInputSource.java index 92814eb14d96..99279cf7a92c 100644 --- a/processing/src/main/java/org/apache/druid/data/input/impl/HttpInputSource.java +++ b/processing/src/main/java/org/apache/druid/data/input/impl/HttpInputSource.java @@ -86,7 +86,7 @@ public HttpInputSource( 
this.httpAuthenticationUsername = httpAuthenticationUsername; this.httpAuthenticationPasswordProvider = httpAuthenticationPasswordProvider; this.systemFields = systemFields == null ? SystemFields.none() : systemFields; - this.headersMap = headersMap == null ? Maps.newHashMap() : headersMap; + this.headersMap = headersMap == null ? Collections.emptyMap() : headersMap; throwIfForbiddenHeaders(config, this.headersMap); this.config = config; } From 996c087be29c2fbeba7bb5815bf3afc6bd5dc959 Mon Sep 17 00:00:00 2001 From: Pranav Bhole Date: Tue, 10 Sep 2024 16:44:25 -0700 Subject: [PATCH 04/12] fix check --- .../java/org/apache/druid/data/input/impl/HttpInputSource.java | 2 -- .../apache/druid/data/input/impl/HttpInputSourceConfigTest.java | 1 - 2 files changed, 3 deletions(-) diff --git a/processing/src/main/java/org/apache/druid/data/input/impl/HttpInputSource.java b/processing/src/main/java/org/apache/druid/data/input/impl/HttpInputSource.java index 99279cf7a92c..756f9483c5de 100644 --- a/processing/src/main/java/org/apache/druid/data/input/impl/HttpInputSource.java +++ b/processing/src/main/java/org/apache/druid/data/input/impl/HttpInputSource.java @@ -25,7 +25,6 @@ import com.fasterxml.jackson.annotation.JsonInclude; import com.fasterxml.jackson.annotation.JsonProperty; import com.google.common.base.Preconditions; -import com.google.common.collect.Maps; import org.apache.druid.data.input.AbstractInputSource; import org.apache.druid.data.input.InputEntity; import org.apache.druid.data.input.InputFormat; @@ -46,7 +45,6 @@ import javax.annotation.Nullable; import java.io.File; import java.net.URI; -import java.util.Arrays; import java.util.Collections; import java.util.List; import java.util.Map; diff --git a/processing/src/test/java/org/apache/druid/data/input/impl/HttpInputSourceConfigTest.java b/processing/src/test/java/org/apache/druid/data/input/impl/HttpInputSourceConfigTest.java index 8217bbf699f9..74cc62b4d81d 100644 --- 
a/processing/src/test/java/org/apache/druid/data/input/impl/HttpInputSourceConfigTest.java +++ b/processing/src/test/java/org/apache/druid/data/input/impl/HttpInputSourceConfigTest.java @@ -19,7 +19,6 @@ package org.apache.druid.data.input.impl; -import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableSet; import nl.jqno.equalsverifier.EqualsVerifier; import org.junit.Assert; From 877b734d392cc2d2730eee0f9dca80cf76491bb7 Mon Sep 17 00:00:00 2001 From: Pranav Bhole Date: Tue, 10 Sep 2024 18:50:22 -0700 Subject: [PATCH 05/12] Fixing tests --- .../druid/catalog/model/table/HttpInputSourceDefn.java | 2 +- .../druid/catalog/model/table/HttpInputSourceDefnTest.java | 6 ++++-- 2 files changed, 5 insertions(+), 3 deletions(-) diff --git a/server/src/main/java/org/apache/druid/catalog/model/table/HttpInputSourceDefn.java b/server/src/main/java/org/apache/druid/catalog/model/table/HttpInputSourceDefn.java index 606ac0036577..7f61bb6933b5 100644 --- a/server/src/main/java/org/apache/druid/catalog/model/table/HttpInputSourceDefn.java +++ b/server/src/main/java/org/apache/druid/catalog/model/table/HttpInputSourceDefn.java @@ -310,7 +310,7 @@ private void convertHeaderArg(Map jsonMap, Map a Map headersMap; if (additionalHeaders != null) { try { - headersMap = DefaultObjectMapper.INSTANCE.readValue(additionalHeaders, new TypeReference>(){}); + headersMap = DefaultObjectMapper.INSTANCE.readValue(additionalHeaders, new TypeReference>(){}); } catch (JsonProcessingException e) { throw new ISE("Failed read map from headers json"); diff --git a/server/src/test/java/org/apache/druid/catalog/model/table/HttpInputSourceDefnTest.java b/server/src/test/java/org/apache/druid/catalog/model/table/HttpInputSourceDefnTest.java index a7ae7b2bed07..e60139824d7d 100644 --- a/server/src/test/java/org/apache/druid/catalog/model/table/HttpInputSourceDefnTest.java +++ b/server/src/test/java/org/apache/druid/catalog/model/table/HttpInputSourceDefnTest.java @@ -276,11 
+276,12 @@ public void testTemplateSpecWithFormatHappyPath() // Get the partial table function TableFunction fn = externDefn.tableFn(resolved); - assertEquals(4, fn.parameters().size()); + assertEquals(5, fn.parameters().size()); assertTrue(hasParam(fn, HttpInputSourceDefn.URIS_PARAMETER)); assertTrue(hasParam(fn, HttpInputSourceDefn.USER_PARAMETER)); assertTrue(hasParam(fn, HttpInputSourceDefn.PASSWORD_PARAMETER)); assertTrue(hasParam(fn, HttpInputSourceDefn.PASSWORD_ENV_VAR_PARAMETER)); + assertTrue(hasParam(fn, HttpInputSourceDefn.HEADERS)); // Convert to an external table. ExternalTableSpec externSpec = fn.apply( @@ -324,8 +325,9 @@ public void testTemplateSpecWithFormatAndPassword() // Get the partial table function TableFunction fn = externDefn.tableFn(resolved); - assertEquals(1, fn.parameters().size()); + assertEquals(2, fn.parameters().size()); assertTrue(hasParam(fn, HttpInputSourceDefn.URIS_PARAMETER)); + assertTrue(hasParam(fn, HttpInputSourceDefn.HEADERS)); // Convert to an external table. 
ExternalTableSpec externSpec = fn.apply( From cb93adad52929a822e09fcc63d704732f779eac6 Mon Sep 17 00:00:00 2001 From: Pranav Bhole Date: Tue, 10 Sep 2024 20:04:25 -0700 Subject: [PATCH 06/12] More tests --- .../org/apache/druid/data/input/impl/HttpEntity.java | 12 ++++++++++-- .../apache/druid/data/input/impl/HttpEntityTest.java | 11 ++++++----- .../druid/sql/calcite/IngestTableFunctionTest.java | 2 +- .../expected/ingest/httpExtern-logicalPlan.txt | 2 +- 4 files changed, 18 insertions(+), 9 deletions(-) diff --git a/processing/src/main/java/org/apache/druid/data/input/impl/HttpEntity.java b/processing/src/main/java/org/apache/druid/data/input/impl/HttpEntity.java index 333ad108704d..41091b8e9b52 100644 --- a/processing/src/main/java/org/apache/druid/data/input/impl/HttpEntity.java +++ b/processing/src/main/java/org/apache/druid/data/input/impl/HttpEntity.java @@ -46,6 +46,8 @@ public class HttpEntity extends RetryingInputEntity @Nullable private final PasswordProvider httpAuthenticationPasswordProvider; + private final Map additionalHeaders; + HttpEntity( URI uri, @Nullable String httpAuthenticationUsername, @@ -56,6 +58,7 @@ public class HttpEntity extends RetryingInputEntity this.uri = uri; this.httpAuthenticationUsername = httpAuthenticationUsername; this.httpAuthenticationPasswordProvider = httpAuthenticationPasswordProvider; + this.additionalHeaders = additionalHeaders; } @Override @@ -67,7 +70,7 @@ public URI getUri() @Override protected InputStream readFrom(long offset) throws IOException { - return openInputStream(uri, httpAuthenticationUsername, httpAuthenticationPasswordProvider, offset); + return openInputStream(uri, httpAuthenticationUsername, httpAuthenticationPasswordProvider, offset, additionalHeaders); } @Override @@ -82,10 +85,15 @@ public Predicate getRetryCondition() return t -> t instanceof IOException; } - public static InputStream openInputStream(URI object, String userName, PasswordProvider passwordProvider, long offset) + public static 
InputStream openInputStream(URI object, String userName, PasswordProvider passwordProvider, long offset, final Map additionalHeaders) throws IOException { final URLConnection urlConnection = object.toURL().openConnection(); + if (additionalHeaders.size() > 0) { + for (Map.Entry entry : additionalHeaders.entrySet()) { + urlConnection.addRequestProperty(entry.getKey(), entry.getValue()); + } + } if (!Strings.isNullOrEmpty(userName) && passwordProvider != null) { String userPass = userName + ":" + passwordProvider.getPassword(); String basicAuthString = "Basic " + Base64.getEncoder().encodeToString(StringUtils.toUtf8(userPass)); diff --git a/processing/src/test/java/org/apache/druid/data/input/impl/HttpEntityTest.java b/processing/src/test/java/org/apache/druid/data/input/impl/HttpEntityTest.java index 59217456dbfa..e27d0351e83d 100644 --- a/processing/src/test/java/org/apache/druid/data/input/impl/HttpEntityTest.java +++ b/processing/src/test/java/org/apache/druid/data/input/impl/HttpEntityTest.java @@ -42,6 +42,7 @@ import java.net.URL; import java.net.URLConnection; import java.nio.charset.StandardCharsets; +import java.util.Collections; public class HttpEntityTest { @@ -96,8 +97,8 @@ public void testOpenInputStream() throws IOException, URISyntaxException server.start(); URI url = new URI("http://" + server.getAddress().getHostName() + ":" + server.getAddress().getPort() + "/test"); - inputStream = HttpEntity.openInputStream(url, "", null, 0); - inputStreamPartial = HttpEntity.openInputStream(url, "", null, 5); + inputStream = HttpEntity.openInputStream(url, "", null, 0, Collections.emptyMap()); + inputStreamPartial = HttpEntity.openInputStream(url, "", null, 5, Collections.emptyMap()); inputStream.skip(5); Assert.assertTrue(IOUtils.contentEquals(inputStream, inputStreamPartial)); } @@ -119,7 +120,7 @@ public void testWithServerSupportingRanges() throws IOException long offset = 15; String contentRange = StringUtils.format("bytes %d-%d/%d", offset, 1000, 1000); 
Mockito.when(urlConnection.getHeaderField(HttpHeaders.CONTENT_RANGE)).thenReturn(contentRange); - HttpEntity.openInputStream(uri, "", null, offset); + HttpEntity.openInputStream(uri, "", null, offset, Collections.emptyMap()); Mockito.verify(inputStreamMock, Mockito.times(0)).skip(offset); } @@ -128,7 +129,7 @@ public void testWithServerNotSupportingRanges() throws IOException { long offset = 15; Mockito.when(urlConnection.getHeaderField(HttpHeaders.CONTENT_RANGE)).thenReturn(null); - HttpEntity.openInputStream(uri, "", null, offset); + HttpEntity.openInputStream(uri, "", null, offset, Collections.emptyMap()); Mockito.verify(inputStreamMock, Mockito.times(1)).skip(offset); } @@ -137,7 +138,7 @@ public void testWithServerNotSupportingBytesRanges() throws IOException { long offset = 15; Mockito.when(urlConnection.getHeaderField(HttpHeaders.CONTENT_RANGE)).thenReturn("token 2-12/12"); - HttpEntity.openInputStream(uri, "", null, offset); + HttpEntity.openInputStream(uri, "", null, offset, Collections.emptyMap()); Mockito.verify(inputStreamMock, Mockito.times(1)).skip(offset); } } diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/IngestTableFunctionTest.java b/sql/src/test/java/org/apache/druid/sql/calcite/IngestTableFunctionTest.java index e70770e0f456..4bb68cc47ed5 100644 --- a/sql/src/test/java/org/apache/druid/sql/calcite/IngestTableFunctionTest.java +++ b/sql/src/test/java/org/apache/druid/sql/calcite/IngestTableFunctionTest.java @@ -317,7 +317,7 @@ public void testExplainHttpFn() " format => 'csv'))\n" + " EXTEND (x VARCHAR, y VARCHAR, z BIGINT)\n" + "PARTITIONED BY ALL TIME"; - final String explanation = 
"[{\"query\":{\"queryType\":\"scan\",\"dataSource\":{\"type\":\"external\",\"inputSource\":{\"type\":\"http\",\"uris\":[\"http://foo.com/bar.csv\"],\"httpAuthenticationUsername\":\"bob\",\"httpAuthenticationPassword\":{\"type\":\"default\",\"password\":\"secret\"}},\"inputFormat\":{\"type\":\"csv\",\"columns\":[\"x\",\"y\",\"z\"]},\"signature\":[{\"name\":\"x\",\"type\":\"STRING\"},{\"name\":\"y\",\"type\":\"STRING\"},{\"name\":\"z\",\"type\":\"LONG\"}]},\"intervals\":{\"type\":\"intervals\",\"intervals\":[\"-146136543-09-08T08:23:32.096Z/146140482-04-24T15:36:27.903Z\"]},\"resultFormat\":\"compactedList\",\"columns\":[\"x\",\"y\",\"z\"],\"context\":{\"defaultTimeout\":300000,\"maxScatterGatherBytes\":9223372036854775807,\"sqlCurrentTimestamp\":\"2000-01-01T00:00:00Z\",\"sqlInsertSegmentGranularity\":\"{\\\"type\\\":\\\"all\\\"}\",\"sqlQueryId\":\"dummy\",\"vectorize\":\"false\",\"vectorizeVirtualColumns\":\"false\"},\"columnTypes\":[\"STRING\",\"STRING\",\"LONG\"],\"granularity\":{\"type\":\"all\"},\"legacy\":false},\"signature\":[{\"name\":\"x\",\"type\":\"STRING\"},{\"name\":\"y\",\"type\":\"STRING\"},{\"name\":\"z\",\"type\":\"LONG\"}],\"columnMappings\":[{\"queryColumn\":\"x\",\"outputColumn\":\"x\"},{\"queryColumn\":\"y\",\"outputColumn\":\"y\"},{\"queryColumn\":\"z\",\"outputColumn\":\"z\"}]}]"; + final String explanation = 
"[{\"query\":{\"queryType\":\"scan\",\"dataSource\":{\"type\":\"external\",\"inputSource\":{\"type\":\"http\",\"uris\":[\"http://foo.com/bar.csv\"],\"httpAuthenticationUsername\":\"bob\",\"httpAuthenticationPassword\":{\"type\":\"default\",\"password\":\"secret\"},\"additionalHeaders\":{}},\"inputFormat\":{\"type\":\"csv\",\"columns\":[\"x\",\"y\",\"z\"]},\"signature\":[{\"name\":\"x\",\"type\":\"STRING\"},{\"name\":\"y\",\"type\":\"STRING\"},{\"name\":\"z\",\"type\":\"LONG\"}]},\"intervals\":{\"type\":\"intervals\",\"intervals\":[\"-146136543-09-08T08:23:32.096Z/146140482-04-24T15:36:27.903Z\"]},\"resultFormat\":\"compactedList\",\"columns\":[\"x\",\"y\",\"z\"],\"context\":{\"defaultTimeout\":300000,\"maxScatterGatherBytes\":9223372036854775807,\"sqlCurrentTimestamp\":\"2000-01-01T00:00:00Z\",\"sqlInsertSegmentGranularity\":\"{\\\"type\\\":\\\"all\\\"}\",\"sqlQueryId\":\"dummy\",\"vectorize\":\"false\",\"vectorizeVirtualColumns\":\"false\"},\"columnTypes\":[\"STRING\",\"STRING\",\"LONG\"],\"granularity\":{\"type\":\"all\"},\"legacy\":false},\"signature\":[{\"name\":\"x\",\"type\":\"STRING\"},{\"name\":\"y\",\"type\":\"STRING\"},{\"name\":\"z\",\"type\":\"LONG\"}],\"columnMappings\":[{\"queryColumn\":\"x\",\"outputColumn\":\"x\"},{\"queryColumn\":\"y\",\"outputColumn\":\"y\"},{\"queryColumn\":\"z\",\"outputColumn\":\"z\"}]}]"; final String resources = "[{\"name\":\"EXTERNAL\",\"type\":\"EXTERNAL\"},{\"name\":\"dst\",\"type\":\"DATASOURCE\"}]"; final String attributes = "{\"statementType\":\"INSERT\",\"targetDataSource\":\"dst\",\"partitionedBy\":{\"type\":\"all\"}}"; diff --git a/sql/src/test/resources/calcite/expected/ingest/httpExtern-logicalPlan.txt b/sql/src/test/resources/calcite/expected/ingest/httpExtern-logicalPlan.txt index 18dcef56af43..c62e5a456959 100644 --- a/sql/src/test/resources/calcite/expected/ingest/httpExtern-logicalPlan.txt +++ b/sql/src/test/resources/calcite/expected/ingest/httpExtern-logicalPlan.txt @@ -1,3 +1,3 @@ 
LogicalInsert(target=[dst], partitionedBy=[ALL TIME], clusteredBy=[]) LogicalProject(inputs=[0..2]) - ExternalTableScan(dataSource=[{"type":"external","inputSource":{"type":"http","uris":["http://foo.com/bar.csv"],"httpAuthenticationUsername":"bob","httpAuthenticationPassword":{"type":"default","password":"secret"}},"inputFormat":{"type":"csv","columns":["x","y","z"]},"signature":[{"name":"x","type":"STRING"},{"name":"y","type":"STRING"},{"name":"z","type":"LONG"}]}]) + ExternalTableScan(dataSource=[{"type":"external","inputSource":{"type":"http","uris":["http://foo.com/bar.csv"],"httpAuthenticationUsername":"bob","httpAuthenticationPassword":{"type":"default","password":"secret"},"additionalHeaders":{}},"inputFormat":{"type":"csv","columns":["x","y","z"]},"signature":[{"name":"x","type":"STRING"},{"name":"y","type":"STRING"},{"name":"z","type":"LONG"}]}]) From c0219ffd95d03f60966fc701f6c96c858b976b9f Mon Sep 17 00:00:00 2001 From: Pranav Bhole Date: Tue, 10 Sep 2024 21:59:54 -0700 Subject: [PATCH 07/12] More tests --- .../java/org/apache/druid/data/input/impl/HttpInputSource.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/processing/src/main/java/org/apache/druid/data/input/impl/HttpInputSource.java b/processing/src/main/java/org/apache/druid/data/input/impl/HttpInputSource.java index 756f9483c5de..9dc93acc0cdb 100644 --- a/processing/src/main/java/org/apache/druid/data/input/impl/HttpInputSource.java +++ b/processing/src/main/java/org/apache/druid/data/input/impl/HttpInputSource.java @@ -265,7 +265,7 @@ public String toString() ", httpAuthenticationUsername=" + httpAuthenticationUsername + ", httpAuthenticationPasswordProvider=" + httpAuthenticationPasswordProvider + (systemFields.getFields().isEmpty() ? 
"" : ", systemFields=" + systemFields) + - ", additionalHeaders = " + headersMap.toString() + + ", additionalHeaders = " + headersMap + "}"; } } From 9d4b32d483b279650e5a82295139493c812afc28 Mon Sep 17 00:00:00 2001 From: Pranav Bhole Date: Tue, 10 Sep 2024 22:04:31 -0700 Subject: [PATCH 08/12] fixing tests --- .../java/org/apache/druid/sql/calcite/CalciteInsertDmlTest.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/CalciteInsertDmlTest.java b/sql/src/test/java/org/apache/druid/sql/calcite/CalciteInsertDmlTest.java index ed4aa6d91ebe..cfc5b8161a32 100644 --- a/sql/src/test/java/org/apache/druid/sql/calcite/CalciteInsertDmlTest.java +++ b/sql/src/test/java/org/apache/druid/sql/calcite/CalciteInsertDmlTest.java @@ -860,7 +860,7 @@ public void testExplainPlanInsertJoinQuery() // Test correctness of the query when only the CLUSTERED BY clause is present - final String explanation = "[{\"query\":{\"queryType\":\"scan\",\"dataSource\":{\"type\":\"join\",\"left\":{\"type\":\"external\",\"inputSource\":{\"type\":\"http\",\"uris\":[\"https://boo.gz\"]},\"inputFormat\":{\"type\":\"json\"},\"signature\":[{\"name\":\"isRobot\",\"type\":\"STRING\"},{\"name\":\"timestamp\",\"type\":\"STRING\"},{\"name\":\"cityName\",\"type\":\"STRING\"},{\"name\":\"countryIsoCode\",\"type\":\"STRING\"},{\"name\":\"regionName\",\"type\":\"STRING\"}]},\"right\":{\"type\":\"query\",\"query\":{\"queryType\":\"scan\",\"dataSource\":{\"type\":\"external\",\"inputSource\":{\"type\":\"http\",\"uris\":[\"https://foo.tsv\"]},\"inputFormat\":{\"type\":\"tsv\",\"delimiter\":\"\\t\",\"findColumnsFromHeader\":true},\"signature\":[{\"name\":\"Country\",\"type\":\"STRING\"},{\"name\":\"Capital\",\"type\":\"STRING\"},{\"name\":\"ISO3\",\"type\":\"STRING\"},{\"name\":\"ISO2\",\"type\":\"STRING\"}]},\"intervals\":{\"type\":\"intervals\",\"intervals\":[\"-146136543-09-08T08:23:32.096Z/146140482-04-24T15:36:27.903Z\"]},\"resultFormat\":\"compactedL
ist\",\"columns\":[\"Capital\",\"ISO2\"],\"context\":{\"defaultTimeout\":300000,\"maxScatterGatherBytes\":9223372036854775807,\"sqlCurrentTimestamp\":\"2000-01-01T00:00:00Z\",\"sqlInsertSegmentGranularity\":\"\\\"HOUR\\\"\",\"sqlQueryId\":\"dummy\",\"vectorize\":\"false\",\"vectorizeVirtualColumns\":\"false\"},\"columnTypes\":[\"STRING\",\"STRING\"],\"granularity\":{\"type\":\"all\"},\"legacy\":false}},\"rightPrefix\":\"j0.\",\"condition\":\"(\\\"countryIsoCode\\\" == \\\"j0.ISO2\\\")\",\"joinType\":\"LEFT\"},\"intervals\":{\"type\":\"intervals\",\"intervals\":[\"-146136543-09-08T08:23:32.096Z/146140482-04-24T15:36:27.903Z\"]},\"virtualColumns\":[{\"type\":\"expression\",\"name\":\"v0\",\"expression\":\"timestamp_parse(\\\"timestamp\\\",null,'UTC')\",\"outputType\":\"LONG\"}],\"resultFormat\":\"compactedList\",\"orderBy\":[{\"columnName\":\"v0\",\"order\":\"ascending\"},{\"columnName\":\"isRobot\",\"order\":\"ascending\"},{\"columnName\":\"j0.Capital\",\"order\":\"ascending\"},{\"columnName\":\"regionName\",\"order\":\"ascending\"}],\"columns\":[\"isRobot\",\"j0.Capital\",\"regionName\",\"v0\"],\"context\":{\"defaultTimeout\":300000,\"maxScatterGatherBytes\":9223372036854775807,\"sqlCurrentTimestamp\":\"2000-01-01T00:00:00Z\",\"sqlInsertSegmentGranularity\":\"\\\"HOUR\\\"\",\"sqlQueryId\":\"dummy\",\"vectorize\":\"false\",\"vectorizeVirtualColumns\":\"false\"},\"columnTypes\":[\"STRING\",\"STRING\",\"STRING\",\"LONG\"],\"granularity\":{\"type\":\"all\"},\"legacy\":false},\"signature\":[{\"name\":\"v0\",\"type\":\"LONG\"},{\"name\":\"isRobot\",\"type\":\"STRING\"},{\"name\":\"j0.Capital\",\"type\":\"STRING\"},{\"name\":\"regionName\",\"type\":\"STRING\"}],\"columnMappings\":[{\"queryColumn\":\"v0\",\"outputColumn\":\"__time\"},{\"queryColumn\":\"isRobot\",\"outputColumn\":\"isRobotAlias\"},{\"queryColumn\":\"j0.Capital\",\"outputColumn\":\"countryCapital\"},{\"queryColumn\":\"regionName\",\"outputColumn\":\"regionName\"}]}]"; + final String explanation = 
"[{\"query\":{\"queryType\":\"scan\",\"dataSource\":{\"type\":\"join\",\"left\":{\"type\":\"external\",\"inputSource\":{\"type\":\"http\",\"uris\":[\"https://boo.gz\"],\"additionalHeaders\":{}},\"inputFormat\":{\"type\":\"json\"},\"signature\":[{\"name\":\"isRobot\",\"type\":\"STRING\"},{\"name\":\"timestamp\",\"type\":\"STRING\"},{\"name\":\"cityName\",\"type\":\"STRING\"},{\"name\":\"countryIsoCode\",\"type\":\"STRING\"},{\"name\":\"regionName\",\"type\":\"STRING\"}]},\"right\":{\"type\":\"query\",\"query\":{\"queryType\":\"scan\",\"dataSource\":{\"type\":\"external\",\"inputSource\":{\"type\":\"http\",\"uris\":[\"https://foo.tsv\"],\"additionalHeaders\":{}},\"inputFormat\":{\"type\":\"tsv\",\"delimiter\":\"\\t\",\"findColumnsFromHeader\":true},\"signature\":[{\"name\":\"Country\",\"type\":\"STRING\"},{\"name\":\"Capital\",\"type\":\"STRING\"},{\"name\":\"ISO3\",\"type\":\"STRING\"},{\"name\":\"ISO2\",\"type\":\"STRING\"}]},\"intervals\":{\"type\":\"intervals\",\"intervals\":[\"-146136543-09-08T08:23:32.096Z/146140482-04-24T15:36:27.903Z\"]},\"resultFormat\":\"compactedList\",\"columns\":[\"Capital\",\"ISO2\"],\"context\":{\"defaultTimeout\":300000,\"maxScatterGatherBytes\":9223372036854775807,\"sqlCurrentTimestamp\":\"2000-01-01T00:00:00Z\",\"sqlInsertSegmentGranularity\":\"\\\"HOUR\\\"\",\"sqlQueryId\":\"dummy\",\"vectorize\":\"false\",\"vectorizeVirtualColumns\":\"false\"},\"columnTypes\":[\"STRING\",\"STRING\"],\"granularity\":{\"type\":\"all\"},\"legacy\":false}},\"rightPrefix\":\"j0.\",\"condition\":\"(\\\"countryIsoCode\\\" == 
\\\"j0.ISO2\\\")\",\"joinType\":\"LEFT\"},\"intervals\":{\"type\":\"intervals\",\"intervals\":[\"-146136543-09-08T08:23:32.096Z/146140482-04-24T15:36:27.903Z\"]},\"virtualColumns\":[{\"type\":\"expression\",\"name\":\"v0\",\"expression\":\"timestamp_parse(\\\"timestamp\\\",null,'UTC')\",\"outputType\":\"LONG\"}],\"resultFormat\":\"compactedList\",\"orderBy\":[{\"columnName\":\"v0\",\"order\":\"ascending\"},{\"columnName\":\"isRobot\",\"order\":\"ascending\"},{\"columnName\":\"j0.Capital\",\"order\":\"ascending\"},{\"columnName\":\"regionName\",\"order\":\"ascending\"}],\"columns\":[\"isRobot\",\"j0.Capital\",\"regionName\",\"v0\"],\"context\":{\"defaultTimeout\":300000,\"maxScatterGatherBytes\":9223372036854775807,\"sqlCurrentTimestamp\":\"2000-01-01T00:00:00Z\",\"sqlInsertSegmentGranularity\":\"\\\"HOUR\\\"\",\"sqlQueryId\":\"dummy\",\"vectorize\":\"false\",\"vectorizeVirtualColumns\":\"false\"},\"columnTypes\":[\"STRING\",\"STRING\",\"STRING\",\"LONG\"],\"granularity\":{\"type\":\"all\"},\"legacy\":false},\"signature\":[{\"name\":\"v0\",\"type\":\"LONG\"},{\"name\":\"isRobot\",\"type\":\"STRING\"},{\"name\":\"j0.Capital\",\"type\":\"STRING\"},{\"name\":\"regionName\",\"type\":\"STRING\"}],\"columnMappings\":[{\"queryColumn\":\"v0\",\"outputColumn\":\"__time\"},{\"queryColumn\":\"isRobot\",\"outputColumn\":\"isRobotAlias\"},{\"queryColumn\":\"j0.Capital\",\"outputColumn\":\"countryCapital\"},{\"queryColumn\":\"regionName\",\"outputColumn\":\"regionName\"}]}]"; testQuery( PLANNER_CONFIG_NATIVE_QUERY_EXPLAIN, From 2613238deae97ea988e673f3cab354cd6c5def9e Mon Sep 17 00:00:00 2001 From: Pranav Bhole Date: Tue, 10 Sep 2024 22:35:32 -0700 Subject: [PATCH 09/12] rename to requestHeaders --- .../apache/druid/data/input/impl/HttpEntity.java | 14 +++++++------- .../druid/data/input/impl/HttpInputSource.java | 8 ++++---- .../druid/data/input/impl/HttpInputSourceTest.java | 4 ++-- .../catalog/model/table/HttpInputSourceDefn.java | 8 ++++---- 
.../druid/sql/calcite/CalciteInsertDmlTest.java | 2 +- .../druid/sql/calcite/IngestTableFunctionTest.java | 2 +- .../expected/ingest/httpExtern-logicalPlan.txt | 2 +- 7 files changed, 20 insertions(+), 20 deletions(-) diff --git a/processing/src/main/java/org/apache/druid/data/input/impl/HttpEntity.java b/processing/src/main/java/org/apache/druid/data/input/impl/HttpEntity.java index 41091b8e9b52..572f29f3e5b0 100644 --- a/processing/src/main/java/org/apache/druid/data/input/impl/HttpEntity.java +++ b/processing/src/main/java/org/apache/druid/data/input/impl/HttpEntity.java @@ -46,19 +46,19 @@ public class HttpEntity extends RetryingInputEntity @Nullable private final PasswordProvider httpAuthenticationPasswordProvider; - private final Map additionalHeaders; + private final Map requestHeaders; HttpEntity( URI uri, @Nullable String httpAuthenticationUsername, @Nullable PasswordProvider httpAuthenticationPasswordProvider, - @Nullable Map additionalHeaders + @Nullable Map requestHeaders ) { this.uri = uri; this.httpAuthenticationUsername = httpAuthenticationUsername; this.httpAuthenticationPasswordProvider = httpAuthenticationPasswordProvider; - this.additionalHeaders = additionalHeaders; + this.requestHeaders = requestHeaders; } @Override @@ -70,7 +70,7 @@ public URI getUri() @Override protected InputStream readFrom(long offset) throws IOException { - return openInputStream(uri, httpAuthenticationUsername, httpAuthenticationPasswordProvider, offset, additionalHeaders); + return openInputStream(uri, httpAuthenticationUsername, httpAuthenticationPasswordProvider, offset, requestHeaders); } @Override @@ -85,12 +85,12 @@ public Predicate getRetryCondition() return t -> t instanceof IOException; } - public static InputStream openInputStream(URI object, String userName, PasswordProvider passwordProvider, long offset, final Map additionalHeaders) + public static InputStream openInputStream(URI object, String userName, PasswordProvider passwordProvider, long offset, final 
Map requestHeaders) throws IOException { final URLConnection urlConnection = object.toURL().openConnection(); - if (additionalHeaders.size() > 0) { - for (Map.Entry entry : additionalHeaders.entrySet()) { + if (requestHeaders.size() > 0) { + for (Map.Entry entry : requestHeaders.entrySet()) { urlConnection.addRequestProperty(entry.getKey(), entry.getValue()); } } diff --git a/processing/src/main/java/org/apache/druid/data/input/impl/HttpInputSource.java b/processing/src/main/java/org/apache/druid/data/input/impl/HttpInputSource.java index 9dc93acc0cdb..265e994af2ea 100644 --- a/processing/src/main/java/org/apache/druid/data/input/impl/HttpInputSource.java +++ b/processing/src/main/java/org/apache/druid/data/input/impl/HttpInputSource.java @@ -74,7 +74,7 @@ public HttpInputSource( @JsonProperty("httpAuthenticationUsername") @Nullable String httpAuthenticationUsername, @JsonProperty("httpAuthenticationPassword") @Nullable PasswordProvider httpAuthenticationPasswordProvider, @JsonProperty(SYSTEM_FIELDS_PROPERTY) @Nullable SystemFields systemFields, - @JsonProperty("additionalHeaders") @Nullable Map headersMap, + @JsonProperty("requestHeaders") @Nullable Map headersMap, @JacksonInject HttpInputSourceConfig config ) { @@ -150,9 +150,9 @@ public PasswordProvider getHttpAuthenticationPasswordProvider() } @Nullable - @JsonProperty("additionalHeaders") + @JsonProperty("requestHeaders") @JsonInclude(JsonInclude.Include.NON_NULL) - public Map getAdditionalHeaders() + public Map getRequestHeaders() { return headersMap; } @@ -265,7 +265,7 @@ public String toString() ", httpAuthenticationUsername=" + httpAuthenticationUsername + ", httpAuthenticationPasswordProvider=" + httpAuthenticationPasswordProvider + (systemFields.getFields().isEmpty() ? 
"" : ", systemFields=" + systemFields) + - ", additionalHeaders = " + headersMap + + ", requestHeaders = " + headersMap + "}"; } } diff --git a/processing/src/test/java/org/apache/druid/data/input/impl/HttpInputSourceTest.java b/processing/src/test/java/org/apache/druid/data/input/impl/HttpInputSourceTest.java index cc371b57322c..609512c8b43b 100644 --- a/processing/src/test/java/org/apache/druid/data/input/impl/HttpInputSourceTest.java +++ b/processing/src/test/java/org/apache/druid/data/input/impl/HttpInputSourceTest.java @@ -147,7 +147,7 @@ public void testSystemFields() Assert.assertEquals("https://example.com/foo", inputSource.getSystemFieldValue(entity, SystemField.URI)); Assert.assertEquals("/foo", inputSource.getSystemFieldValue(entity, SystemField.PATH)); - Assert.assertEquals(inputSource.getAdditionalHeaders(), Collections.emptyMap()); + Assert.assertEquals(inputSource.getRequestHeaders(), Collections.emptyMap()); } @Test @@ -165,7 +165,7 @@ public void testAllowedHeaders() ImmutableMap.of("r-Cookie", "test", "Content-Type", "application/json"), httpInputSourceConfig ); - Set expectedSet = inputSource.getAdditionalHeaders() + Set expectedSet = inputSource.getRequestHeaders() .keySet() .stream() .map(StringUtils::toLowerCase) diff --git a/server/src/main/java/org/apache/druid/catalog/model/table/HttpInputSourceDefn.java b/server/src/main/java/org/apache/druid/catalog/model/table/HttpInputSourceDefn.java index 7f61bb6933b5..110974d18ca7 100644 --- a/server/src/main/java/org/apache/druid/catalog/model/table/HttpInputSourceDefn.java +++ b/server/src/main/java/org/apache/druid/catalog/model/table/HttpInputSourceDefn.java @@ -115,7 +115,7 @@ public class HttpInputSourceDefn extends FormattedInputSourceDefn protected static final String URIS_FIELD = "uris"; protected static final String PASSWORD_FIELD = "httpAuthenticationPassword"; protected static final String USERNAME_FIELD = "httpAuthenticationUsername"; - protected static final String HEADERS_FIELD = 
"additionalHeaders"; + protected static final String HEADERS_FIELD = "requestHeaders"; @Override public String typeValue() @@ -306,11 +306,11 @@ private void convertUriArg(Map jsonMap, Map args */ private void convertHeaderArg(Map jsonMap, Map args) { - String additionalHeaders = CatalogUtils.getString(args, HEADERS); + String requestHeaders = CatalogUtils.getString(args, HEADERS); Map headersMap; - if (additionalHeaders != null) { + if (requestHeaders != null) { try { - headersMap = DefaultObjectMapper.INSTANCE.readValue(additionalHeaders, new TypeReference>(){}); + headersMap = DefaultObjectMapper.INSTANCE.readValue(requestHeaders, new TypeReference>(){}); } catch (JsonProcessingException e) { throw new ISE("Failed read map from headers json"); diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/CalciteInsertDmlTest.java b/sql/src/test/java/org/apache/druid/sql/calcite/CalciteInsertDmlTest.java index cfc5b8161a32..fbcb0735c86b 100644 --- a/sql/src/test/java/org/apache/druid/sql/calcite/CalciteInsertDmlTest.java +++ b/sql/src/test/java/org/apache/druid/sql/calcite/CalciteInsertDmlTest.java @@ -860,7 +860,7 @@ public void testExplainPlanInsertJoinQuery() // Test correctness of the query when only the CLUSTERED BY clause is present - final String explanation = 
"[{\"query\":{\"queryType\":\"scan\",\"dataSource\":{\"type\":\"join\",\"left\":{\"type\":\"external\",\"inputSource\":{\"type\":\"http\",\"uris\":[\"https://boo.gz\"],\"additionalHeaders\":{}},\"inputFormat\":{\"type\":\"json\"},\"signature\":[{\"name\":\"isRobot\",\"type\":\"STRING\"},{\"name\":\"timestamp\",\"type\":\"STRING\"},{\"name\":\"cityName\",\"type\":\"STRING\"},{\"name\":\"countryIsoCode\",\"type\":\"STRING\"},{\"name\":\"regionName\",\"type\":\"STRING\"}]},\"right\":{\"type\":\"query\",\"query\":{\"queryType\":\"scan\",\"dataSource\":{\"type\":\"external\",\"inputSource\":{\"type\":\"http\",\"uris\":[\"https://foo.tsv\"],\"additionalHeaders\":{}},\"inputFormat\":{\"type\":\"tsv\",\"delimiter\":\"\\t\",\"findColumnsFromHeader\":true},\"signature\":[{\"name\":\"Country\",\"type\":\"STRING\"},{\"name\":\"Capital\",\"type\":\"STRING\"},{\"name\":\"ISO3\",\"type\":\"STRING\"},{\"name\":\"ISO2\",\"type\":\"STRING\"}]},\"intervals\":{\"type\":\"intervals\",\"intervals\":[\"-146136543-09-08T08:23:32.096Z/146140482-04-24T15:36:27.903Z\"]},\"resultFormat\":\"compactedList\",\"columns\":[\"Capital\",\"ISO2\"],\"context\":{\"defaultTimeout\":300000,\"maxScatterGatherBytes\":9223372036854775807,\"sqlCurrentTimestamp\":\"2000-01-01T00:00:00Z\",\"sqlInsertSegmentGranularity\":\"\\\"HOUR\\\"\",\"sqlQueryId\":\"dummy\",\"vectorize\":\"false\",\"vectorizeVirtualColumns\":\"false\"},\"columnTypes\":[\"STRING\",\"STRING\"],\"granularity\":{\"type\":\"all\"},\"legacy\":false}},\"rightPrefix\":\"j0.\",\"condition\":\"(\\\"countryIsoCode\\\" == 
\\\"j0.ISO2\\\")\",\"joinType\":\"LEFT\"},\"intervals\":{\"type\":\"intervals\",\"intervals\":[\"-146136543-09-08T08:23:32.096Z/146140482-04-24T15:36:27.903Z\"]},\"virtualColumns\":[{\"type\":\"expression\",\"name\":\"v0\",\"expression\":\"timestamp_parse(\\\"timestamp\\\",null,'UTC')\",\"outputType\":\"LONG\"}],\"resultFormat\":\"compactedList\",\"orderBy\":[{\"columnName\":\"v0\",\"order\":\"ascending\"},{\"columnName\":\"isRobot\",\"order\":\"ascending\"},{\"columnName\":\"j0.Capital\",\"order\":\"ascending\"},{\"columnName\":\"regionName\",\"order\":\"ascending\"}],\"columns\":[\"isRobot\",\"j0.Capital\",\"regionName\",\"v0\"],\"context\":{\"defaultTimeout\":300000,\"maxScatterGatherBytes\":9223372036854775807,\"sqlCurrentTimestamp\":\"2000-01-01T00:00:00Z\",\"sqlInsertSegmentGranularity\":\"\\\"HOUR\\\"\",\"sqlQueryId\":\"dummy\",\"vectorize\":\"false\",\"vectorizeVirtualColumns\":\"false\"},\"columnTypes\":[\"STRING\",\"STRING\",\"STRING\",\"LONG\"],\"granularity\":{\"type\":\"all\"},\"legacy\":false},\"signature\":[{\"name\":\"v0\",\"type\":\"LONG\"},{\"name\":\"isRobot\",\"type\":\"STRING\"},{\"name\":\"j0.Capital\",\"type\":\"STRING\"},{\"name\":\"regionName\",\"type\":\"STRING\"}],\"columnMappings\":[{\"queryColumn\":\"v0\",\"outputColumn\":\"__time\"},{\"queryColumn\":\"isRobot\",\"outputColumn\":\"isRobotAlias\"},{\"queryColumn\":\"j0.Capital\",\"outputColumn\":\"countryCapital\"},{\"queryColumn\":\"regionName\",\"outputColumn\":\"regionName\"}]}]"; + final String explanation = 
"[{\"query\":{\"queryType\":\"scan\",\"dataSource\":{\"type\":\"join\",\"left\":{\"type\":\"external\",\"inputSource\":{\"type\":\"http\",\"uris\":[\"https://boo.gz\"],\"requestHeaders\":{}},\"inputFormat\":{\"type\":\"json\"},\"signature\":[{\"name\":\"isRobot\",\"type\":\"STRING\"},{\"name\":\"timestamp\",\"type\":\"STRING\"},{\"name\":\"cityName\",\"type\":\"STRING\"},{\"name\":\"countryIsoCode\",\"type\":\"STRING\"},{\"name\":\"regionName\",\"type\":\"STRING\"}]},\"right\":{\"type\":\"query\",\"query\":{\"queryType\":\"scan\",\"dataSource\":{\"type\":\"external\",\"inputSource\":{\"type\":\"http\",\"uris\":[\"https://foo.tsv\"],\"requestHeaders\":{}},\"inputFormat\":{\"type\":\"tsv\",\"delimiter\":\"\\t\",\"findColumnsFromHeader\":true},\"signature\":[{\"name\":\"Country\",\"type\":\"STRING\"},{\"name\":\"Capital\",\"type\":\"STRING\"},{\"name\":\"ISO3\",\"type\":\"STRING\"},{\"name\":\"ISO2\",\"type\":\"STRING\"}]},\"intervals\":{\"type\":\"intervals\",\"intervals\":[\"-146136543-09-08T08:23:32.096Z/146140482-04-24T15:36:27.903Z\"]},\"resultFormat\":\"compactedList\",\"columns\":[\"Capital\",\"ISO2\"],\"context\":{\"defaultTimeout\":300000,\"maxScatterGatherBytes\":9223372036854775807,\"sqlCurrentTimestamp\":\"2000-01-01T00:00:00Z\",\"sqlInsertSegmentGranularity\":\"\\\"HOUR\\\"\",\"sqlQueryId\":\"dummy\",\"vectorize\":\"false\",\"vectorizeVirtualColumns\":\"false\"},\"columnTypes\":[\"STRING\",\"STRING\"],\"granularity\":{\"type\":\"all\"},\"legacy\":false}},\"rightPrefix\":\"j0.\",\"condition\":\"(\\\"countryIsoCode\\\" == 
\\\"j0.ISO2\\\")\",\"joinType\":\"LEFT\"},\"intervals\":{\"type\":\"intervals\",\"intervals\":[\"-146136543-09-08T08:23:32.096Z/146140482-04-24T15:36:27.903Z\"]},\"virtualColumns\":[{\"type\":\"expression\",\"name\":\"v0\",\"expression\":\"timestamp_parse(\\\"timestamp\\\",null,'UTC')\",\"outputType\":\"LONG\"}],\"resultFormat\":\"compactedList\",\"orderBy\":[{\"columnName\":\"v0\",\"order\":\"ascending\"},{\"columnName\":\"isRobot\",\"order\":\"ascending\"},{\"columnName\":\"j0.Capital\",\"order\":\"ascending\"},{\"columnName\":\"regionName\",\"order\":\"ascending\"}],\"columns\":[\"isRobot\",\"j0.Capital\",\"regionName\",\"v0\"],\"context\":{\"defaultTimeout\":300000,\"maxScatterGatherBytes\":9223372036854775807,\"sqlCurrentTimestamp\":\"2000-01-01T00:00:00Z\",\"sqlInsertSegmentGranularity\":\"\\\"HOUR\\\"\",\"sqlQueryId\":\"dummy\",\"vectorize\":\"false\",\"vectorizeVirtualColumns\":\"false\"},\"columnTypes\":[\"STRING\",\"STRING\",\"STRING\",\"LONG\"],\"granularity\":{\"type\":\"all\"},\"legacy\":false},\"signature\":[{\"name\":\"v0\",\"type\":\"LONG\"},{\"name\":\"isRobot\",\"type\":\"STRING\"},{\"name\":\"j0.Capital\",\"type\":\"STRING\"},{\"name\":\"regionName\",\"type\":\"STRING\"}],\"columnMappings\":[{\"queryColumn\":\"v0\",\"outputColumn\":\"__time\"},{\"queryColumn\":\"isRobot\",\"outputColumn\":\"isRobotAlias\"},{\"queryColumn\":\"j0.Capital\",\"outputColumn\":\"countryCapital\"},{\"queryColumn\":\"regionName\",\"outputColumn\":\"regionName\"}]}]"; testQuery( PLANNER_CONFIG_NATIVE_QUERY_EXPLAIN, diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/IngestTableFunctionTest.java b/sql/src/test/java/org/apache/druid/sql/calcite/IngestTableFunctionTest.java index 4bb68cc47ed5..7401477e6d79 100644 --- a/sql/src/test/java/org/apache/druid/sql/calcite/IngestTableFunctionTest.java +++ b/sql/src/test/java/org/apache/druid/sql/calcite/IngestTableFunctionTest.java @@ -317,7 +317,7 @@ public void testExplainHttpFn() " format => 'csv'))\n" + " EXTEND (x 
VARCHAR, y VARCHAR, z BIGINT)\n" + "PARTITIONED BY ALL TIME"; - final String explanation = "[{\"query\":{\"queryType\":\"scan\",\"dataSource\":{\"type\":\"external\",\"inputSource\":{\"type\":\"http\",\"uris\":[\"http://foo.com/bar.csv\"],\"httpAuthenticationUsername\":\"bob\",\"httpAuthenticationPassword\":{\"type\":\"default\",\"password\":\"secret\"},\"additionalHeaders\":{}},\"inputFormat\":{\"type\":\"csv\",\"columns\":[\"x\",\"y\",\"z\"]},\"signature\":[{\"name\":\"x\",\"type\":\"STRING\"},{\"name\":\"y\",\"type\":\"STRING\"},{\"name\":\"z\",\"type\":\"LONG\"}]},\"intervals\":{\"type\":\"intervals\",\"intervals\":[\"-146136543-09-08T08:23:32.096Z/146140482-04-24T15:36:27.903Z\"]},\"resultFormat\":\"compactedList\",\"columns\":[\"x\",\"y\",\"z\"],\"context\":{\"defaultTimeout\":300000,\"maxScatterGatherBytes\":9223372036854775807,\"sqlCurrentTimestamp\":\"2000-01-01T00:00:00Z\",\"sqlInsertSegmentGranularity\":\"{\\\"type\\\":\\\"all\\\"}\",\"sqlQueryId\":\"dummy\",\"vectorize\":\"false\",\"vectorizeVirtualColumns\":\"false\"},\"columnTypes\":[\"STRING\",\"STRING\",\"LONG\"],\"granularity\":{\"type\":\"all\"},\"legacy\":false},\"signature\":[{\"name\":\"x\",\"type\":\"STRING\"},{\"name\":\"y\",\"type\":\"STRING\"},{\"name\":\"z\",\"type\":\"LONG\"}],\"columnMappings\":[{\"queryColumn\":\"x\",\"outputColumn\":\"x\"},{\"queryColumn\":\"y\",\"outputColumn\":\"y\"},{\"queryColumn\":\"z\",\"outputColumn\":\"z\"}]}]"; + final String explanation = 
"[{\"query\":{\"queryType\":\"scan\",\"dataSource\":{\"type\":\"external\",\"inputSource\":{\"type\":\"http\",\"uris\":[\"http://foo.com/bar.csv\"],\"httpAuthenticationUsername\":\"bob\",\"httpAuthenticationPassword\":{\"type\":\"default\",\"password\":\"secret\"},\"requestHeaders\":{}},\"inputFormat\":{\"type\":\"csv\",\"columns\":[\"x\",\"y\",\"z\"]},\"signature\":[{\"name\":\"x\",\"type\":\"STRING\"},{\"name\":\"y\",\"type\":\"STRING\"},{\"name\":\"z\",\"type\":\"LONG\"}]},\"intervals\":{\"type\":\"intervals\",\"intervals\":[\"-146136543-09-08T08:23:32.096Z/146140482-04-24T15:36:27.903Z\"]},\"resultFormat\":\"compactedList\",\"columns\":[\"x\",\"y\",\"z\"],\"context\":{\"defaultTimeout\":300000,\"maxScatterGatherBytes\":9223372036854775807,\"sqlCurrentTimestamp\":\"2000-01-01T00:00:00Z\",\"sqlInsertSegmentGranularity\":\"{\\\"type\\\":\\\"all\\\"}\",\"sqlQueryId\":\"dummy\",\"vectorize\":\"false\",\"vectorizeVirtualColumns\":\"false\"},\"columnTypes\":[\"STRING\",\"STRING\",\"LONG\"],\"granularity\":{\"type\":\"all\"},\"legacy\":false},\"signature\":[{\"name\":\"x\",\"type\":\"STRING\"},{\"name\":\"y\",\"type\":\"STRING\"},{\"name\":\"z\",\"type\":\"LONG\"}],\"columnMappings\":[{\"queryColumn\":\"x\",\"outputColumn\":\"x\"},{\"queryColumn\":\"y\",\"outputColumn\":\"y\"},{\"queryColumn\":\"z\",\"outputColumn\":\"z\"}]}]"; final String resources = "[{\"name\":\"EXTERNAL\",\"type\":\"EXTERNAL\"},{\"name\":\"dst\",\"type\":\"DATASOURCE\"}]"; final String attributes = "{\"statementType\":\"INSERT\",\"targetDataSource\":\"dst\",\"partitionedBy\":{\"type\":\"all\"}}"; diff --git a/sql/src/test/resources/calcite/expected/ingest/httpExtern-logicalPlan.txt b/sql/src/test/resources/calcite/expected/ingest/httpExtern-logicalPlan.txt index c62e5a456959..f9e4c4a5de2b 100644 --- a/sql/src/test/resources/calcite/expected/ingest/httpExtern-logicalPlan.txt +++ b/sql/src/test/resources/calcite/expected/ingest/httpExtern-logicalPlan.txt @@ -1,3 +1,3 @@ LogicalInsert(target=[dst], 
partitionedBy=[ALL TIME], clusteredBy=[]) LogicalProject(inputs=[0..2]) - ExternalTableScan(dataSource=[{"type":"external","inputSource":{"type":"http","uris":["http://foo.com/bar.csv"],"httpAuthenticationUsername":"bob","httpAuthenticationPassword":{"type":"default","password":"secret"},"additionalHeaders":{}},"inputFormat":{"type":"csv","columns":["x","y","z"]},"signature":[{"name":"x","type":"STRING"},{"name":"y","type":"STRING"},{"name":"z","type":"LONG"}]}]) + ExternalTableScan(dataSource=[{"type":"external","inputSource":{"type":"http","uris":["http://foo.com/bar.csv"],"httpAuthenticationUsername":"bob","httpAuthenticationPassword":{"type":"default","password":"secret"},"requestHeaders":{}},"inputFormat":{"type":"csv","columns":["x","y","z"]},"signature":[{"name":"x","type":"STRING"},{"name":"y","type":"STRING"},{"name":"z","type":"LONG"}]}]) From 52a7dfb9860b9ca9bcdf91400ef2339d39fab170 Mon Sep 17 00:00:00 2001 From: Pranav Bhole Date: Wed, 11 Sep 2024 13:48:37 -0700 Subject: [PATCH 10/12] addressing comments --- .../druid/data/input/impl/HttpEntity.java | 2 +- .../data/input/impl/HttpInputSource.java | 38 ++++++------- .../druid/data/input/impl/HttpEntityTest.java | 56 +++++++++++++++++++ 3 files changed, 74 insertions(+), 22 deletions(-) diff --git a/processing/src/main/java/org/apache/druid/data/input/impl/HttpEntity.java b/processing/src/main/java/org/apache/druid/data/input/impl/HttpEntity.java index 572f29f3e5b0..b0c415d322b2 100644 --- a/processing/src/main/java/org/apache/druid/data/input/impl/HttpEntity.java +++ b/processing/src/main/java/org/apache/druid/data/input/impl/HttpEntity.java @@ -89,7 +89,7 @@ public static InputStream openInputStream(URI object, String userName, PasswordP throws IOException { final URLConnection urlConnection = object.toURL().openConnection(); - if (requestHeaders.size() > 0) { + if (requestHeaders != null && requestHeaders.size() > 0) { for (Map.Entry entry : requestHeaders.entrySet()) { 
urlConnection.addRequestProperty(entry.getKey(), entry.getValue()); } diff --git a/processing/src/main/java/org/apache/druid/data/input/impl/HttpInputSource.java b/processing/src/main/java/org/apache/druid/data/input/impl/HttpInputSource.java index 265e994af2ea..867dbd0b2014 100644 --- a/processing/src/main/java/org/apache/druid/data/input/impl/HttpInputSource.java +++ b/processing/src/main/java/org/apache/druid/data/input/impl/HttpInputSource.java @@ -36,6 +36,7 @@ import org.apache.druid.data.input.impl.systemfield.SystemFieldDecoratorFactory; import org.apache.druid.data.input.impl.systemfield.SystemFieldInputSource; import org.apache.druid.data.input.impl.systemfield.SystemFields; +import org.apache.druid.error.InvalidInput; import org.apache.druid.java.util.common.CloseableIterators; import org.apache.druid.java.util.common.IAE; import org.apache.druid.java.util.common.StringUtils; @@ -50,7 +51,6 @@ import java.util.Map; import java.util.Objects; import java.util.Set; -import java.util.stream.Collectors; import java.util.stream.Stream; public class HttpInputSource @@ -66,7 +66,7 @@ public class HttpInputSource private final PasswordProvider httpAuthenticationPasswordProvider; private final SystemFields systemFields; private final HttpInputSourceConfig config; - private final Map headersMap; + private final Map requestHeaders; @JsonCreator public HttpInputSource( @@ -74,7 +74,7 @@ public HttpInputSource( @JsonProperty("httpAuthenticationUsername") @Nullable String httpAuthenticationUsername, @JsonProperty("httpAuthenticationPassword") @Nullable PasswordProvider httpAuthenticationPasswordProvider, @JsonProperty(SYSTEM_FIELDS_PROPERTY) @Nullable SystemFields systemFields, - @JsonProperty("requestHeaders") @Nullable Map headersMap, + @JsonProperty("requestHeaders") @Nullable Map requestHeaders, @JacksonInject HttpInputSourceConfig config ) { @@ -84,8 +84,8 @@ public HttpInputSource( this.httpAuthenticationUsername = httpAuthenticationUsername; 
this.httpAuthenticationPasswordProvider = httpAuthenticationPasswordProvider; this.systemFields = systemFields == null ? SystemFields.none() : systemFields; - this.headersMap = headersMap == null ? Collections.emptyMap() : headersMap; - throwIfForbiddenHeaders(config, this.headersMap); + this.requestHeaders = requestHeaders == null ? Collections.emptyMap() : requestHeaders; + throwIfForbiddenHeaders(config, this.requestHeaders); this.config = config; } @@ -98,17 +98,13 @@ public static void throwIfInvalidProtocols(HttpInputSourceConfig config, List headersMap) + public static void throwIfForbiddenHeaders(HttpInputSourceConfig config, Map requestHeaders) { - if (!config.getAllowedHeaders().isEmpty() && headersMap.size() > 0) { - Set forbiddenHeaderSet = headersMap.keySet() - .stream() - .map(StringUtils::toLowerCase) - .filter(h -> !config.getAllowedHeaders().contains(h)) - .collect(Collectors.toSet()); - if (!forbiddenHeaderSet.isEmpty()) { - throw new IAE("Got forbidden headers %s, allowed headers are only %s ", - forbiddenHeaderSet, config.getAllowedHeaders()); + for (Map.Entry entry : requestHeaders.entrySet()) { + if (!config.getAllowedHeaders().contains(StringUtils.toLowerCase(entry.getKey()))) { + throw InvalidInput.exception("Got forbidden header %s, allowed headers are only %s ", + entry.getKey(), config.getAllowedHeaders() + ); } } } @@ -154,7 +150,7 @@ public PasswordProvider getHttpAuthenticationPasswordProvider() @JsonInclude(JsonInclude.Include.NON_NULL) public Map getRequestHeaders() { - return headersMap; + return requestHeaders; } @Override @@ -177,7 +173,7 @@ public SplittableInputSource withSplit(InputSplit split) httpAuthenticationUsername, httpAuthenticationPasswordProvider, systemFields, - headersMap, + requestHeaders, config ); } @@ -212,7 +208,7 @@ protected InputSourceReader formattableReader( split.get(), httpAuthenticationUsername, httpAuthenticationPasswordProvider, - headersMap + requestHeaders )).iterator() ), 
SystemFieldDecoratorFactory.fromInputSource(this), @@ -234,7 +230,7 @@ public boolean equals(Object o) && Objects.equals(httpAuthenticationUsername, that.httpAuthenticationUsername) && Objects.equals(httpAuthenticationPasswordProvider, that.httpAuthenticationPasswordProvider) && Objects.equals(systemFields, that.systemFields) - && Objects.equals(headersMap, that.headersMap) + && Objects.equals(requestHeaders, that.requestHeaders) && Objects.equals(config, that.config); } @@ -246,7 +242,7 @@ public int hashCode() httpAuthenticationUsername, httpAuthenticationPasswordProvider, systemFields, - headersMap, + requestHeaders, config ); } @@ -265,7 +261,7 @@ public String toString() ", httpAuthenticationUsername=" + httpAuthenticationUsername + ", httpAuthenticationPasswordProvider=" + httpAuthenticationPasswordProvider + (systemFields.getFields().isEmpty() ? "" : ", systemFields=" + systemFields) + - ", requestHeaders = " + headersMap + + ", requestHeaders = " + requestHeaders + "}"; } } diff --git a/processing/src/test/java/org/apache/druid/data/input/impl/HttpEntityTest.java b/processing/src/test/java/org/apache/druid/data/input/impl/HttpEntityTest.java index e27d0351e83d..8776cd275730 100644 --- a/processing/src/test/java/org/apache/druid/data/input/impl/HttpEntityTest.java +++ b/processing/src/test/java/org/apache/druid/data/input/impl/HttpEntityTest.java @@ -19,7 +19,9 @@ package org.apache.druid.data.input.impl; +import com.google.common.collect.ImmutableMap; import com.google.common.net.HttpHeaders; +import com.sun.net.httpserver.Headers; import com.sun.net.httpserver.HttpServer; import org.apache.commons.io.IOUtils; import org.apache.druid.java.util.common.StringUtils; @@ -43,6 +45,7 @@ import java.net.URLConnection; import java.nio.charset.StandardCharsets; import java.util.Collections; +import java.util.Map; public class HttpEntityTest { @@ -114,6 +117,59 @@ public void testOpenInputStream() throws IOException, URISyntaxException } } + @Test + public void 
testRequestHeaders() throws IOException, URISyntaxException + { + HttpServer server = null; + InputStream inputStream = null; + InputStream inputStreamPartial = null; + ServerSocket serverSocket = null; + Map requestHeaders = ImmutableMap.of("r-Cookie", "test", "Content-Type", "application/json"); + try { + serverSocket = new ServerSocket(0); + int port = serverSocket.getLocalPort(); + // closing port so that the httpserver can use. Can cause race conditions. + serverSocket.close(); + server = HttpServer.create(new InetSocketAddress("localhost", port), 0); + server.createContext( + "/test", + (httpExchange) -> { + Headers headers = httpExchange.getRequestHeaders(); + for (Map.Entry entry : requestHeaders.entrySet()) { + Assert.assertTrue(headers.containsKey(entry.getKey())); + Assert.assertEquals(headers.get(entry.getKey()).get(0), entry.getValue()); + } + String payload = "12345678910"; + byte[] outputBytes = payload.getBytes(StandardCharsets.UTF_8); + httpExchange.sendResponseHeaders(200, outputBytes.length); + OutputStream os = httpExchange.getResponseBody(); + httpExchange.getResponseHeaders().set(HttpHeaders.CONTENT_TYPE, "application/octet-stream"); + httpExchange.getResponseHeaders().set(HttpHeaders.CONTENT_LENGTH, String.valueOf(outputBytes.length)); + httpExchange.getResponseHeaders().set(HttpHeaders.CONTENT_RANGE, "bytes 0"); + os.write(outputBytes); + os.close(); + } + ); + server.start(); + + URI url = new URI("http://" + server.getAddress().getHostName() + ":" + server.getAddress().getPort() + "/test"); + inputStream = HttpEntity.openInputStream(url, "", null, 0, requestHeaders); + inputStreamPartial = HttpEntity.openInputStream(url, "", null, 5, requestHeaders); + inputStream.skip(5); + Assert.assertTrue(IOUtils.contentEquals(inputStream, inputStreamPartial)); + } + finally { + IOUtils.closeQuietly(inputStream); + IOUtils.closeQuietly(inputStreamPartial); + if (server != null) { + server.stop(0); + } + if (serverSocket != null) { + 
serverSocket.close(); + } + } + } + @Test public void testWithServerSupportingRanges() throws IOException { From afe9a33dd07851c595932207d419e38f6cee4598 Mon Sep 17 00:00:00 2001 From: Pranav Bhole Date: Wed, 11 Sep 2024 13:59:04 -0700 Subject: [PATCH 11/12] addressing comments --- .../apache/druid/data/input/impl/HttpInputSourceTest.java | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/processing/src/test/java/org/apache/druid/data/input/impl/HttpInputSourceTest.java b/processing/src/test/java/org/apache/druid/data/input/impl/HttpInputSourceTest.java index 609512c8b43b..118b56838b6c 100644 --- a/processing/src/test/java/org/apache/druid/data/input/impl/HttpInputSourceTest.java +++ b/processing/src/test/java/org/apache/druid/data/input/impl/HttpInputSourceTest.java @@ -29,6 +29,7 @@ import org.apache.druid.data.input.InputSource; import org.apache.druid.data.input.impl.systemfield.SystemField; import org.apache.druid.data.input.impl.systemfield.SystemFields; +import org.apache.druid.error.DruidException; import org.apache.druid.java.util.common.StringUtils; import org.apache.druid.metadata.DefaultPasswordProvider; import org.junit.Assert; @@ -180,9 +181,9 @@ public void shouldFailOnForbiddenHeaders() null, Sets.newHashSet("R-cookie", "Content-type") ); - expectedException.expect(IllegalArgumentException.class); + expectedException.expect(DruidException.class); expectedException.expectMessage( - "Got forbidden headers [g-cookie], allowed headers are only [r-cookie, content-type]"); + "Got forbidden header G-Cookie, allowed headers are only [r-cookie, content-type]"); new HttpInputSource( ImmutableList.of(URI.create("http://test.com/http-test")), "myName", From 831c3bcaea403f3ccf6acebf1606e63065cbb6e8 Mon Sep 17 00:00:00 2001 From: Pranav Bhole Date: Wed, 11 Sep 2024 14:46:10 -0700 Subject: [PATCH 12/12] addressing comments --- .../druid/data/input/impl/HttpInputSource.java | 12 +++++++----- 1 file changed, 7 insertions(+), 5 deletions(-) diff 
--git a/processing/src/main/java/org/apache/druid/data/input/impl/HttpInputSource.java b/processing/src/main/java/org/apache/druid/data/input/impl/HttpInputSource.java index 867dbd0b2014..12f2316fb67d 100644 --- a/processing/src/main/java/org/apache/druid/data/input/impl/HttpInputSource.java +++ b/processing/src/main/java/org/apache/druid/data/input/impl/HttpInputSource.java @@ -100,11 +100,13 @@ public static void throwIfInvalidProtocols(HttpInputSourceConfig config, List requestHeaders) { - for (Map.Entry entry : requestHeaders.entrySet()) { - if (!config.getAllowedHeaders().contains(StringUtils.toLowerCase(entry.getKey()))) { - throw InvalidInput.exception("Got forbidden header %s, allowed headers are only %s ", - entry.getKey(), config.getAllowedHeaders() - ); + if (config.getAllowedHeaders().size() > 0) { + for (Map.Entry entry : requestHeaders.entrySet()) { + if (!config.getAllowedHeaders().contains(StringUtils.toLowerCase(entry.getKey()))) { + throw InvalidInput.exception("Got forbidden header %s, allowed headers are only %s ", + entry.getKey(), config.getAllowedHeaders() + ); + } } } }