Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -345,7 +345,7 @@ public void testAdHocUri()
CatalogUtils.stringListToUriList(uris),
s3InputSource.getUris()
);
assertEquals(S3InputSourceDefn.TYPE_KEY, externSpec.inputSourceType);
assertEquals(Collections.singleton(S3InputSourceDefn.TYPE_KEY), externSpec.inputSourceTypesSupplier.get());

// But, it fails if there are no columns.
assertThrows(IAE.class, () -> fn.apply("x", args, Collections.emptyList(), mapper));
Expand All @@ -366,7 +366,7 @@ public void testMultipleAdHocUris()
CatalogUtils.stringListToUriList(uris),
s3InputSource.getUris()
);
assertEquals(S3InputSourceDefn.TYPE_KEY, externSpec.inputSourceType);
assertEquals(Collections.singleton(S3InputSourceDefn.TYPE_KEY), externSpec.inputSourceTypesSupplier.get());
}

@Test
Expand Down Expand Up @@ -399,7 +399,7 @@ public void testAdHocUriWithGlob()
s3InputSource.getUris()
);
assertEquals("*.csv", s3InputSource.getObjectGlob());
assertEquals(S3InputSourceDefn.TYPE_KEY, externSpec.inputSourceType);
assertEquals(Collections.singleton(S3InputSourceDefn.TYPE_KEY), externSpec.inputSourceTypesSupplier.get());
}

@Test
Expand All @@ -419,7 +419,7 @@ public void testAdHocPrefix()
CatalogUtils.stringListToUriList(prefixes),
s3InputSource.getPrefixes()
);
assertEquals(S3InputSourceDefn.TYPE_KEY, externSpec.inputSourceType);
assertEquals(Collections.singleton(S3InputSourceDefn.TYPE_KEY), externSpec.inputSourceTypesSupplier.get());

// But, it fails if there are no columns.
assertThrows(IAE.class, () -> fn.apply("x", args, Collections.emptyList(), mapper));
Expand All @@ -442,7 +442,7 @@ public void testMultipleAdHocPrefixes()
CatalogUtils.stringListToUriList(prefixes),
s3InputSource.getPrefixes()
);
assertEquals(S3InputSourceDefn.TYPE_KEY, externSpec.inputSourceType);
assertEquals(Collections.singleton(S3InputSourceDefn.TYPE_KEY), externSpec.inputSourceTypesSupplier.get());
}

@Test
Expand All @@ -462,7 +462,7 @@ public void testAdHocBucketAndPaths()
CloudObjectLocation obj = s3InputSource.getObjects().get(0);
assertEquals("foo.com", obj.getBucket());
assertEquals("bar/file.csv", obj.getPath());
assertEquals(S3InputSourceDefn.TYPE_KEY, externSpec.inputSourceType);
assertEquals(Collections.singleton(S3InputSourceDefn.TYPE_KEY), externSpec.inputSourceTypesSupplier.get());

// But, it fails if there are no columns.
assertThrows(IAE.class, () -> fn.apply("x", args, Collections.emptyList(), mapper));
Expand Down Expand Up @@ -510,7 +510,7 @@ public void testMultipleAdHocObjects()
obj = s3InputSource.getObjects().get(1);
assertEquals("foo.com", obj.getBucket());
assertEquals("mumble/file2.csv", obj.getPath());
assertEquals(S3InputSourceDefn.TYPE_KEY, externSpec.inputSourceType);
assertEquals(Collections.singleton(S3InputSourceDefn.TYPE_KEY), externSpec.inputSourceTypesSupplier.get());
}

@Test
Expand Down Expand Up @@ -578,7 +578,7 @@ public void testFullTableSpecHappyPath()
ExternalTableDefn externDefn = (ExternalTableDefn) resolved.defn();
ExternalTableSpec externSpec = externDefn.convert(resolved);
assertEquals(s3InputSource, externSpec.inputSource);
assertEquals(S3InputSourceDefn.TYPE_KEY, externSpec.inputSourceType);
assertEquals(Collections.singleton(S3InputSourceDefn.TYPE_KEY), externSpec.inputSourceTypesSupplier.get());

// Get the partial table function
TableFunction fn = externDefn.tableFn(resolved);
Expand Down Expand Up @@ -613,7 +613,7 @@ public void testTableSpecWithoutConfig()
ExternalTableDefn externDefn = (ExternalTableDefn) resolved.defn();
ExternalTableSpec externSpec = externDefn.convert(resolved);
assertEquals(s3InputSource, externSpec.inputSource);
assertEquals(S3InputSourceDefn.TYPE_KEY, externSpec.inputSourceType);
assertEquals(Collections.singleton(S3InputSourceDefn.TYPE_KEY), externSpec.inputSourceTypesSupplier.get());

// Get the partial table function
TableFunction fn = externDefn.tableFn(resolved);
Expand Down Expand Up @@ -672,7 +672,7 @@ public void testTableSpecWithBucketAndFormat()
CloudObjectLocation obj = s3InputSource.getObjects().get(0);
assertEquals("foo.com", obj.getBucket());
assertEquals("bar/file.csv", obj.getPath());
assertEquals(S3InputSourceDefn.TYPE_KEY, externSpec.inputSourceType);
assertEquals(Collections.singleton(S3InputSourceDefn.TYPE_KEY), externSpec.inputSourceTypesSupplier.get());

// But, it fails columns are provided since the table already has them.
assertThrows(IAE.class, () -> fn.apply("x", args, COLUMNS, mapper));
Expand Down Expand Up @@ -714,7 +714,7 @@ public void testTableSpecAsConnection()
assertEquals("foo.com", obj.getBucket());
assertEquals("bar/file.csv", obj.getPath());
assertTrue(externSpec.inputFormat instanceof CsvInputFormat);
assertEquals(S3InputSourceDefn.TYPE_KEY, externSpec.inputSourceType);
assertEquals(Collections.singleton(S3InputSourceDefn.TYPE_KEY), externSpec.inputSourceTypesSupplier.get());

// But, it fails columns are not provided since the table does not have them.
assertThrows(IAE.class, () -> fn.apply("x", args, Collections.emptyList(), mapper));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.utils.CollectionUtils;

import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -152,7 +153,7 @@ protected ExternalTableSpec convertArgsToTable(
convertArgsToSource(args, jsonMapper),
convertArgsToFormat(args, columns, jsonMapper),
Columns.convertSignature(columns),
typeValue()
() -> Collections.singleton(typeValue())
);
}

Expand Down Expand Up @@ -209,7 +210,7 @@ public ExternalTableSpec convertTable(ResolvedExternalTable table)
convertTableToSource(table),
convertTableToFormat(table),
Columns.convertSignature(table.resolvedTable().spec().columns()),
typeValue()
() -> Collections.singleton(typeValue())
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,13 @@

package org.apache.druid.catalog.model.table;

import com.google.common.base.Supplier;
import org.apache.druid.data.input.InputFormat;
import org.apache.druid.data.input.InputSource;
import org.apache.druid.segment.column.RowSignature;

import javax.annotation.Nullable;
import java.util.Set;

/**
* Catalog form of an external table specification used to pass along the three
Expand All @@ -36,17 +38,17 @@ public class ExternalTableSpec
public final InputSource inputSource;
public final InputFormat inputFormat;
@Nullable public final RowSignature signature;
public final String inputSourceType;
public final Supplier<Set<String>> inputSourceTypesSupplier;

public ExternalTableSpec(
final InputSource inputSource,
final InputFormat inputFormat,
final RowSignature signature,
final String inputSourceType)
final Supplier<Set<String>> inputSourceTypesSupplier)
{
this.inputSource = inputSource;
this.inputFormat = inputFormat;
this.signature = signature;
this.inputSourceType = inputSourceType;
this.inputSourceTypesSupplier = inputSourceTypesSupplier;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import org.apache.druid.utils.CollectionUtils;

import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -194,7 +195,7 @@ protected ExternalTableSpec convertPartialFormattedTable(
convertSource(sourceMap, jsonMapper),
inputFormat,
Columns.convertSignature(completedCols),
typeValue()
() -> Collections.singleton(typeValue())
);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -398,7 +398,7 @@ public void testMultipleURIsInTableSpec() throws URISyntaxException
CatalogUtils.stringListToUriList(Arrays.asList("http://foo.com/foo.csv", "http://foo.com/bar.csv")),
sourceSpec.getUris()
);
assertEquals(HttpInputSourceDefn.TYPE_KEY, externSpec.inputSourceType);
assertEquals(Collections.singleton(HttpInputSourceDefn.TYPE_KEY), externSpec.inputSourceTypesSupplier.get());
}

@Test
Expand Down Expand Up @@ -442,7 +442,7 @@ public void testMultipleURIsWithTemplate() throws URISyntaxException
CatalogUtils.stringListToUriList(Arrays.asList("http://foo.com/my.csv", "http://foo.com/bar.csv")),
sourceSpec.getUris()
);
assertEquals(HttpInputSourceDefn.TYPE_KEY, externSpec.inputSourceType);
assertEquals(Collections.singleton(HttpInputSourceDefn.TYPE_KEY), externSpec.inputSourceTypesSupplier.get());
}

@Test
Expand All @@ -466,7 +466,7 @@ public void testMultipleURIsAdHoc()
CatalogUtils.stringListToUriList(Arrays.asList("http://foo.com/foo.csv", "http://foo.com/bar.csv")),
sourceSpec.getUris()
);
assertEquals(HttpInputSourceDefn.TYPE_KEY, externSpec.inputSourceType);
assertEquals(Collections.singleton(HttpInputSourceDefn.TYPE_KEY), externSpec.inputSourceTypesSupplier.get());
}

@Test
Expand Down Expand Up @@ -499,7 +499,7 @@ public void testEnvPassword() throws URISyntaxException
"SECRET",
((EnvironmentVariablePasswordProvider) sourceSpec.getHttpAuthenticationPasswordProvider()).getVariable()
);
assertEquals(HttpInputSourceDefn.TYPE_KEY, externSpec.inputSourceType);
assertEquals(Collections.singleton(HttpInputSourceDefn.TYPE_KEY), externSpec.inputSourceTypesSupplier.get());
}

private void validateHappyPath(ExternalTableSpec externSpec, boolean withUser)
Expand All @@ -519,7 +519,7 @@ private void validateHappyPath(ExternalTableSpec externSpec, boolean withUser)
assertEquals(Arrays.asList("x", "y"), sig.getColumnNames());
assertEquals(ColumnType.STRING, sig.getColumnType(0).get());
assertEquals(ColumnType.LONG, sig.getColumnType(1).get());
assertEquals(HttpInputSourceDefn.TYPE_KEY, externSpec.inputSourceType);
assertEquals(Collections.singleton(HttpInputSourceDefn.TYPE_KEY), externSpec.inputSourceTypesSupplier.get());
}

private Map<String, Object> httpToMap(HttpInputSource source)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,7 @@ public void testValidAdHocFn()
CsvInputFormat format = (CsvInputFormat) extern.inputFormat;
assertEquals(Arrays.asList("a", "b"), format.getColumns());
assertEquals(2, extern.signature.size());
assertEquals(InlineInputSourceDefn.TYPE_KEY, extern.inputSourceType);
assertEquals(Collections.singleton(InlineInputSourceDefn.TYPE_KEY), extern.inputSourceTypesSupplier.get());

// Fails if no columns are provided.
assertThrows(IAE.class, () -> fn.apply("x", new HashMap<>(), Collections.emptyList(), mapper));
Expand Down Expand Up @@ -179,7 +179,7 @@ public void testPartialTable()
CsvInputFormat actualFormat = (CsvInputFormat) extern.inputFormat;
assertEquals(Arrays.asList("a", "b"), actualFormat.getColumns());
assertEquals(2, extern.signature.size());
assertEquals(InlineInputSourceDefn.TYPE_KEY, extern.inputSourceType);
assertEquals(Collections.singleton(InlineInputSourceDefn.TYPE_KEY), extern.inputSourceTypesSupplier.get());

// Cannot supply columns with the function
List<ColumnSpec> columns = Arrays.asList(
Expand Down Expand Up @@ -215,6 +215,6 @@ public void testDefinedTable()
CsvInputFormat actualFormat = (CsvInputFormat) extern.inputFormat;
assertEquals(Arrays.asList("a", "b"), actualFormat.getColumns());
assertEquals(2, extern.signature.size());
assertEquals(InlineInputSourceDefn.TYPE_KEY, extern.inputSourceType);
assertEquals(Collections.singleton(InlineInputSourceDefn.TYPE_KEY), extern.inputSourceTypesSupplier.get());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -536,6 +536,6 @@ private void validateFormat(ExternalTableSpec externSpec)
assertEquals(Arrays.asList("x", "y"), sig.getColumnNames());
assertEquals(ColumnType.STRING, sig.getColumnType(0).get());
assertEquals(ColumnType.LONG, sig.getColumnType(1).get());
assertEquals(LocalInputSourceDefn.TYPE_KEY, externSpec.inputSourceType);
assertEquals(Collections.singleton(LocalInputSourceDefn.TYPE_KEY), externSpec.inputSourceTypesSupplier.get());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,11 @@
package org.apache.druid.sql.calcite.external;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.JsonNode;
import org.apache.calcite.sql.SqlCall;
import org.apache.calcite.sql.SqlCharStringLiteral;
import org.apache.calcite.sql.SqlNode;
import org.apache.calcite.util.NlsString;
import org.apache.druid.data.input.InputSource;
import org.apache.druid.server.security.Action;
import org.apache.druid.server.security.Resource;
import org.apache.druid.server.security.ResourceAction;
Expand All @@ -34,6 +34,7 @@
import javax.validation.constraints.NotNull;
import java.util.Collections;
import java.util.Set;
import java.util.stream.Collectors;

/**
* Used by {@link ExternalOperatorConversion} to generate a {@link DruidTable}
Expand All @@ -55,11 +56,10 @@ public Set<ResourceAction> computeResources(final SqlCall call, boolean inputSou
String inputSourceStr = getInputSourceArgument(call);

try {
JsonNode jsonNode = ((DruidTableMacro) macro).getJsonMapper().readTree(inputSourceStr);
return Collections.singleton(new ResourceAction(new Resource(
ResourceType.EXTERNAL,
jsonNode.get("type").asText()
), Action.READ));
InputSource inputSource = ((DruidTableMacro) macro).getJsonMapper().readValue(inputSourceStr, InputSource.class);
return inputSource.getTypes().stream()
.map(inputSourceType -> new ResourceAction(new Resource(ResourceType.EXTERNAL, inputSourceType), Action.READ))
.collect(Collectors.toSet());
}
catch (JsonProcessingException e) {
// this shouldn't happen, the input source paraemeter should have been validated before this
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -111,12 +111,12 @@ public ExternalTableSpec apply(
}

String inputSrcStr = CatalogUtils.getString(args, INPUT_SOURCE_PARAM);
String inputSrcType = jsonMapper.readTree(inputSrcStr).get("type").asText();
InputSource inputSource = jsonMapper.readValue(inputSrcStr, InputSource.class);
return new ExternalTableSpec(
jsonMapper.readValue(inputSrcStr, InputSource.class),
inputSource,
jsonMapper.readValue(CatalogUtils.getString(args, INPUT_FORMAT_PARAM), InputFormat.class),
rowSignature,
inputSrcType
inputSource::getTypes
);
}
catch (JsonProcessingException e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
package org.apache.druid.sql.calcite.external;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Supplier;
import com.google.common.collect.ImmutableList;
import org.apache.calcite.avatica.SqlType;
import org.apache.calcite.rel.type.RelDataType;
Expand Down Expand Up @@ -55,6 +56,7 @@
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;

/**
Expand Down Expand Up @@ -297,7 +299,7 @@ public static ExternalTable buildExternalTable(
+ "Please change the column name to something other than __time");
}

return toExternalTable(spec, jsonMapper, spec.inputSourceType);
return toExternalTable(spec, jsonMapper, spec.inputSourceTypesSupplier);
}

public static ResourceAction externalRead(String name)
Expand All @@ -308,7 +310,7 @@ public static ResourceAction externalRead(String name)
public static ExternalTable toExternalTable(
ExternalTableSpec spec,
ObjectMapper jsonMapper,
String inputSourceType
Supplier<Set<String>> inputSourceTypesSupplier
)
{
return new ExternalTable(
Expand All @@ -319,7 +321,7 @@ public static ExternalTable toExternalTable(
),
spec.signature,
jsonMapper,
inputSourceType
inputSourceTypesSupplier
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;

/**
* Table macro designed for use with the Druid EXTEND operator. Example:
Expand Down Expand Up @@ -172,12 +173,14 @@ public Set<ResourceAction> computeResources(final SqlCall call, final boolean in
{
Set<ResourceAction> resourceActions = new HashSet<>();
if (table instanceof ExternalTable && inputSourceTypeSecurityEnabled) {
resourceActions.add(new ResourceAction(new Resource(
ResourceType.EXTERNAL,
((ExternalTable) table).getInputSourceType()
), Action.READ));
resourceActions.addAll(((ExternalTable) table)
.getInputSourceTypeSupplier().get().stream()
.map(inputSourceType ->
new ResourceAction(new Resource(ResourceType.EXTERNAL, inputSourceType), Action.READ))
.collect(Collectors.toSet()));
} else {
resourceActions.addAll(base.computeResources(call, inputSourceTypeSecurityEnabled));
}
resourceActions.addAll(base.computeResources(call, inputSourceTypeSecurityEnabled));
return resourceActions;
}
}
Expand Down
Loading