diff --git a/extensions-contrib/druid-iceberg-extensions/src/main/java/org/apache/druid/iceberg/input/IcebergInputSource.java b/extensions-contrib/druid-iceberg-extensions/src/main/java/org/apache/druid/iceberg/input/IcebergInputSource.java
index 81899703dcf2..1d7fc689b238 100644
--- a/extensions-contrib/druid-iceberg-extensions/src/main/java/org/apache/druid/iceberg/input/IcebergInputSource.java
+++ b/extensions-contrib/druid-iceberg-extensions/src/main/java/org/apache/druid/iceberg/input/IcebergInputSource.java
@@ -22,19 +22,25 @@
 import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonProperty;
 import com.google.common.base.Preconditions;
-import org.apache.druid.data.input.AbstractInputSourceBuilder;
 import org.apache.druid.data.input.InputFormat;
+import org.apache.druid.data.input.InputRow;
+import org.apache.druid.data.input.InputRowListPlusRawValues;
 import org.apache.druid.data.input.InputRowSchema;
 import org.apache.druid.data.input.InputSource;
+import org.apache.druid.data.input.InputSourceFactory;
 import org.apache.druid.data.input.InputSourceReader;
 import org.apache.druid.data.input.InputSplit;
+import org.apache.druid.data.input.InputStats;
 import org.apache.druid.data.input.SplitHintSpec;
 import org.apache.druid.data.input.impl.SplittableInputSource;
 import org.apache.druid.iceberg.filter.IcebergFilter;
+import org.apache.druid.java.util.common.CloseableIterators;
+import org.apache.druid.java.util.common.parsers.CloseableIterator;
 
 import javax.annotation.Nullable;
 import java.io.File;
 import java.io.IOException;
+import java.util.Collections;
 import java.util.List;
 import java.util.stream.Stream;
 
@@ -60,7 +66,7 @@ public class IcebergInputSource implements SplittableInputSource<List<String>>
   private IcebergFilter icebergFilter;
 
   @JsonProperty
-  private AbstractInputSourceBuilder warehouseSource;
+  private InputSourceFactory warehouseSource;
 
   private boolean isLoaded = false;
 
@@ -72,7 +78,7 @@ public IcebergInputSource(
       @JsonProperty("namespace") String namespace,
       @JsonProperty("icebergFilter") @Nullable IcebergFilter icebergFilter,
       @JsonProperty("icebergCatalog") IcebergCatalog icebergCatalog,
-      @JsonProperty("warehouseSource") AbstractInputSourceBuilder warehouseSource
+      @JsonProperty("warehouseSource") InputSourceFactory warehouseSource
   )
   {
     this.tableName = Preconditions.checkNotNull(tableName, "tableName cannot be null");
@@ -170,7 +176,77 @@ protected void retrieveIcebergDatafiles()
         getTableName(),
         getIcebergFilter()
     );
-    delegateInputSource = warehouseSource.setupInputSource(snapshotDataFiles);
+    if (snapshotDataFiles.isEmpty()) {
+      delegateInputSource = new EmptyInputSource();
+    } else {
+      delegateInputSource = warehouseSource.create(snapshotDataFiles);
+    }
     isLoaded = true;
   }
+
+  /**
+   * This input source is used in place of a delegate input source if there are no input file paths.
+   * Certain input sources cannot be instantiated with an empty input file list and so composing input sources such as IcebergInputSource
+   * may use this input source as delegate in such cases.
+   */
+  private static class EmptyInputSource implements SplittableInputSource
+  {
+    @Override
+    public boolean needsFormat()
+    {
+      return false;
+    }
+
+    @Override
+    public boolean isSplittable()
+    {
+      return false;
+    }
+
+    @Override
+    public InputSourceReader reader(
+        InputRowSchema inputRowSchema,
+        @Nullable InputFormat inputFormat,
+        File temporaryDirectory
+    )
+    {
+      return new InputSourceReader()
+      {
+        @Override
+        public CloseableIterator<InputRow> read(InputStats inputStats)
+        {
+          return CloseableIterators.wrap(Collections.emptyIterator(), () -> {
+          });
+        }
+
+        @Override
+        public CloseableIterator<InputRowListPlusRawValues> sample()
+        {
+          return CloseableIterators.wrap(Collections.emptyIterator(), () -> {
+          });
+        }
+      };
+    }
+
+    @Override
+    public Stream<InputSplit> createSplits(
+        InputFormat inputFormat,
+        @Nullable SplitHintSpec splitHintSpec
+    )
+    {
+      return Stream.empty();
+    }
+
+    @Override
+    public int estimateNumSplits(InputFormat inputFormat, @Nullable SplitHintSpec splitHintSpec)
+    {
+      return 0;
+    }
+
+    @Override
+    public InputSource withSplit(InputSplit split)
+    {
+      return null;
+    }
+  }
 }
diff --git a/extensions-contrib/druid-iceberg-extensions/src/test/java/org/apache/druid/iceberg/input/IcebergInputSourceTest.java b/extensions-contrib/druid-iceberg-extensions/src/test/java/org/apache/druid/iceberg/input/IcebergInputSourceTest.java
index 9c3de4922b1c..ec6d936d14bd 100644
--- a/extensions-contrib/druid-iceberg-extensions/src/test/java/org/apache/druid/iceberg/input/IcebergInputSourceTest.java
+++ b/extensions-contrib/druid-iceberg-extensions/src/test/java/org/apache/druid/iceberg/input/IcebergInputSourceTest.java
@@ -24,7 +24,7 @@
 import org.apache.druid.data.input.InputSplit;
 import org.apache.druid.data.input.MaxSizeSplitHintSpec;
 import org.apache.druid.data.input.impl.LocalInputSource;
-import org.apache.druid.data.input.impl.LocalInputSourceBuilder;
+import org.apache.druid.data.input.impl.LocalInputSourceFactory;
 import org.apache.druid.iceberg.filter.IcebergEqualsFilter;
 import org.apache.druid.java.util.common.FileUtils;
 import org.apache.iceberg.DataFile;
@@ -87,7 +87,7 @@ public void testInputSource() throws IOException
         NAMESPACE,
         null,
         testCatalog,
-        new LocalInputSourceBuilder()
+        new LocalInputSourceFactory()
     );
     Stream<InputSplit<List<String>>> splits = inputSource.createSplits(null, new MaxSizeSplitHintSpec(null, null));
     List<File> localInputSourceList = splits.map(inputSource::withSplit)
@@ -115,7 +115,7 @@
   }
 
   @Test
-  public void testInputSourceWithFilter() throws IOException
+  public void testInputSourceWithEmptySource() throws IOException
   {
     final File warehouseDir = FileUtils.createTempDir();
     testCatalog = new LocalCatalog(warehouseDir.getPath(), new HashMap<>());
@@ -128,7 +128,28 @@ public void testInputSourceWithFilter() throws IOException
         NAMESPACE,
         new IcebergEqualsFilter("id", "0000"),
         testCatalog,
-        new LocalInputSourceBuilder()
+        new LocalInputSourceFactory()
+    );
+    Stream<InputSplit<List<String>>> splits = inputSource.createSplits(null, new MaxSizeSplitHintSpec(null, null));
+    Assert.assertEquals(0, splits.count());
+    dropTableFromCatalog(tableIdentifier);
+  }
+
+  @Test
+  public void testInputSourceWithFilter() throws IOException
+  {
+    final File warehouseDir = FileUtils.createTempDir();
+    testCatalog = new LocalCatalog(warehouseDir.getPath(), new HashMap<>());
+    TableIdentifier tableIdentifier = TableIdentifier.of(Namespace.of(NAMESPACE), TABLENAME);
+
+    createAndLoadTable(tableIdentifier);
+
+    IcebergInputSource inputSource = new IcebergInputSource(
+        TABLENAME,
+        NAMESPACE,
+        new IcebergEqualsFilter("id", "123988"),
+        testCatalog,
+        new LocalInputSourceFactory()
     );
     Stream<InputSplit<List<String>>> splits = inputSource.createSplits(null, new MaxSizeSplitHintSpec(null, null));
     List<File> localInputSourceList = splits.map(inputSource::withSplit)
@@ -137,7 +158,21 @@ public void testInputSourceWithFilter() throws IOException
                                             .flatMap(List::stream)
                                             .collect(Collectors.toList());
 
-    Assert.assertEquals(0, localInputSourceList.size());
+    Assert.assertEquals(1, inputSource.estimateNumSplits(null, new MaxSizeSplitHintSpec(1L, null)));
+    Assert.assertEquals(1, localInputSourceList.size());
+    CloseableIterable<Record> datafileReader = Parquet.read(Files.localInput(localInputSourceList.get(0)))
+                                                      .project(tableSchema)
+                                                      .createReaderFunc(fileSchema -> GenericParquetReaders.buildReader(
+                                                          tableSchema,
+                                                          fileSchema
+                                                      ))
+                                                      .build();
+
+
+    for (Record record : datafileReader) {
+      Assert.assertEquals(tableData.get("id"), record.get(0));
+      Assert.assertEquals(tableData.get("name"), record.get(1));
+    }
     dropTableFromCatalog(tableIdentifier);
   }
 
diff --git a/extensions-core/hdfs-storage/src/main/java/org/apache/druid/inputsource/hdfs/HdfsInputSourceBuilder.java b/extensions-core/hdfs-storage/src/main/java/org/apache/druid/inputsource/hdfs/HdfsInputSourceFactory.java
similarity index 86%
rename from extensions-core/hdfs-storage/src/main/java/org/apache/druid/inputsource/hdfs/HdfsInputSourceBuilder.java
rename to extensions-core/hdfs-storage/src/main/java/org/apache/druid/inputsource/hdfs/HdfsInputSourceFactory.java
index 94b67ca93dce..d2b9da3e3440 100644
--- a/extensions-core/hdfs-storage/src/main/java/org/apache/druid/inputsource/hdfs/HdfsInputSourceBuilder.java
+++ b/extensions-core/hdfs-storage/src/main/java/org/apache/druid/inputsource/hdfs/HdfsInputSourceFactory.java
@@ -21,20 +21,20 @@
 
 import com.fasterxml.jackson.annotation.JacksonInject;
 import com.fasterxml.jackson.annotation.JsonCreator;
-import org.apache.druid.data.input.AbstractInputSourceBuilder;
+import org.apache.druid.data.input.InputSourceFactory;
 import org.apache.druid.data.input.impl.SplittableInputSource;
 import org.apache.druid.guice.Hdfs;
 import org.apache.hadoop.conf.Configuration;
 
 import java.util.List;
 
-public class HdfsInputSourceBuilder extends AbstractInputSourceBuilder
+public class HdfsInputSourceFactory implements InputSourceFactory
 {
   private final Configuration configuration;
   private final HdfsInputSourceConfig inputSourceConfig;
 
   @JsonCreator
-  public HdfsInputSourceBuilder(
+  public HdfsInputSourceFactory(
       @JacksonInject @Hdfs Configuration configuration,
       @JacksonInject HdfsInputSourceConfig inputSourceConfig
   )
@@ -44,7 +44,7 @@ public HdfsInputSourceBuilder(
   }
 
   @Override
-  public SplittableInputSource generateInputSource(List<String> inputFilePaths)
+  public SplittableInputSource create(List<String> inputFilePaths)
   {
     return new HdfsInputSource(inputFilePaths, configuration, inputSourceConfig);
   }
diff --git a/extensions-core/hdfs-storage/src/main/java/org/apache/druid/storage/hdfs/HdfsStorageDruidModule.java b/extensions-core/hdfs-storage/src/main/java/org/apache/druid/storage/hdfs/HdfsStorageDruidModule.java
index 0923450c5fb0..e2c79785fe4f 100644
--- a/extensions-core/hdfs-storage/src/main/java/org/apache/druid/storage/hdfs/HdfsStorageDruidModule.java
+++ b/extensions-core/hdfs-storage/src/main/java/org/apache/druid/storage/hdfs/HdfsStorageDruidModule.java
@@ -34,8 +34,8 @@
 import org.apache.druid.guice.ManageLifecycle;
 import org.apache.druid.initialization.DruidModule;
 import org.apache.druid.inputsource.hdfs.HdfsInputSource;
-import org.apache.druid.inputsource.hdfs.HdfsInputSourceBuilder;
 import org.apache.druid.inputsource.hdfs.HdfsInputSourceConfig;
+import org.apache.druid.inputsource.hdfs.HdfsInputSourceFactory;
 import org.apache.druid.storage.hdfs.tasklog.HdfsTaskLogs;
 import org.apache.druid.storage.hdfs.tasklog.HdfsTaskLogsConfig;
 import org.apache.hadoop.conf.Configuration;
@@ -67,7 +67,7 @@ public List<? extends Module> getJacksonModules()
       new SimpleModule().registerSubtypes(
           new NamedType(HdfsLoadSpec.class, HdfsStorageDruidModule.SCHEME),
           new NamedType(HdfsInputSource.class, HdfsStorageDruidModule.SCHEME),
-          new NamedType(HdfsInputSourceBuilder.class, HdfsStorageDruidModule.SCHEME)
+          new NamedType(HdfsInputSourceFactory.class, HdfsStorageDruidModule.SCHEME)
       )
     );
   }
diff --git a/extensions-core/hdfs-storage/src/test/java/org/apache/druid/inputsource/hdfs/HdfsInputSourceAdapterTest.java b/extensions-core/hdfs-storage/src/test/java/org/apache/druid/inputsource/hdfs/HdfsInputSourceAdapterTest.java
index 855a67ac7220..863f083c713a 100644
--- a/extensions-core/hdfs-storage/src/test/java/org/apache/druid/inputsource/hdfs/HdfsInputSourceAdapterTest.java
+++ b/extensions-core/hdfs-storage/src/test/java/org/apache/druid/inputsource/hdfs/HdfsInputSourceAdapterTest.java
@@ -32,7 +32,7 @@ public void testAdapterGet()
   {
     Configuration conf = new Configuration();
     HdfsInputSourceConfig inputSourceConfig = new HdfsInputSourceConfig(null);
-    HdfsInputSourceBuilder hdfsInputSourceAdapter = new HdfsInputSourceBuilder(conf, inputSourceConfig);
-    Assert.assertTrue(hdfsInputSourceAdapter.generateInputSource(Arrays.asList("hdfs://localhost:7020/bar/def.parquet", "hdfs://localhost:7020/bar/abc.parquet")) instanceof HdfsInputSource);
+    HdfsInputSourceFactory hdfsInputSourceAdapter = new HdfsInputSourceFactory(conf, inputSourceConfig);
+    Assert.assertTrue(hdfsInputSourceAdapter.create(Arrays.asList("hdfs://localhost:7020/bar/def.parquet", "hdfs://localhost:7020/bar/abc.parquet")) instanceof HdfsInputSource);
   }
 }
diff --git a/extensions-core/s3-extensions/src/main/java/org/apache/druid/data/input/s3/S3InputSourceDruidModule.java b/extensions-core/s3-extensions/src/main/java/org/apache/druid/data/input/s3/S3InputSourceDruidModule.java
index b8227d19456d..7727c3511558 100644
--- a/extensions-core/s3-extensions/src/main/java/org/apache/druid/data/input/s3/S3InputSourceDruidModule.java
+++ b/extensions-core/s3-extensions/src/main/java/org/apache/druid/data/input/s3/S3InputSourceDruidModule.java
@@ -40,7 +40,7 @@ public List<? extends Module> getJacksonModules()
     return ImmutableList.of(
         new SimpleModule().registerSubtypes(
             new NamedType(S3InputSource.class, S3StorageDruidModule.SCHEME),
-            new NamedType(S3InputSourceBuilder.class, S3StorageDruidModule.SCHEME)
+            new NamedType(S3InputSourceFactory.class, S3StorageDruidModule.SCHEME)
         )
     );
   }
diff --git a/extensions-core/s3-extensions/src/main/java/org/apache/druid/data/input/s3/S3InputSourceBuilder.java b/extensions-core/s3-extensions/src/main/java/org/apache/druid/data/input/s3/S3InputSourceFactory.java
similarity index 94%
rename from extensions-core/s3-extensions/src/main/java/org/apache/druid/data/input/s3/S3InputSourceBuilder.java
rename to extensions-core/s3-extensions/src/main/java/org/apache/druid/data/input/s3/S3InputSourceFactory.java
index 1755cb041c54..d8df28ca6b69 100644
--- a/extensions-core/s3-extensions/src/main/java/org/apache/druid/data/input/s3/S3InputSourceBuilder.java
+++ b/extensions-core/s3-extensions/src/main/java/org/apache/druid/data/input/s3/S3InputSourceFactory.java
@@ -27,7 +27,7 @@
 import org.apache.druid.common.aws.AWSClientConfig;
 import org.apache.druid.common.aws.AWSEndpointConfig;
 import org.apache.druid.common.aws.AWSProxyConfig;
-import org.apache.druid.data.input.AbstractInputSourceBuilder;
+import org.apache.druid.data.input.InputSourceFactory;
 import org.apache.druid.data.input.impl.SplittableInputSource;
 import org.apache.druid.java.util.common.StringUtils;
 import org.apache.druid.storage.s3.S3InputDataConfig;
@@ -39,7 +39,7 @@
 import java.util.List;
 import java.util.stream.Collectors;
 
-public class S3InputSourceBuilder extends AbstractInputSourceBuilder
+public class S3InputSourceFactory implements InputSourceFactory
 {
   private final ServerSideEncryptingAmazonS3 s3Client;
   private final ServerSideEncryptingAmazonS3.Builder s3ClientBuilder;
@@ -51,7 +51,7 @@ public class S3InputSourceBuilder extends AbstractInputSourceBuilder
   private final AWSEndpointConfig awsEndpointConfig;
 
   @JsonCreator
-  public S3InputSourceBuilder(
+  public S3InputSourceFactory(
       @JacksonInject ServerSideEncryptingAmazonS3 s3Client,
       @JacksonInject ServerSideEncryptingAmazonS3.Builder s3ClientBuilder,
       @JacksonInject S3InputDataConfig inputDataConfig,
@@ -73,7 +73,7 @@ public S3InputSourceBuilder(
   }
 
   @Override
-  public SplittableInputSource generateInputSource(List<String> inputFilePaths)
+  public SplittableInputSource create(List<String> inputFilePaths)
   {
     return new S3InputSource(
         s3Client,
diff --git a/extensions-core/s3-extensions/src/test/java/org/apache/druid/data/input/s3/S3InputSourceBuilderTest.java b/extensions-core/s3-extensions/src/test/java/org/apache/druid/data/input/s3/S3InputSourceFactoryTest.java
similarity index 90%
rename from extensions-core/s3-extensions/src/test/java/org/apache/druid/data/input/s3/S3InputSourceBuilderTest.java
rename to extensions-core/s3-extensions/src/test/java/org/apache/druid/data/input/s3/S3InputSourceFactoryTest.java
index bb42975cd61e..2304b0f8bf8b 100644
--- a/extensions-core/s3-extensions/src/test/java/org/apache/druid/data/input/s3/S3InputSourceBuilderTest.java
+++ b/extensions-core/s3-extensions/src/test/java/org/apache/druid/data/input/s3/S3InputSourceFactoryTest.java
@@ -28,7 +28,7 @@
 import java.util.Arrays;
 import java.util.List;
 
-public class S3InputSourceBuilderTest
+public class S3InputSourceFactoryTest
 {
   @Test
   public void testAdapterGet()
@@ -43,7 +43,7 @@ public void testAdapterGet()
         "s3://bar/foo/file3.txt"
     );
 
-    S3InputSourceBuilder s3Builder = new S3InputSourceBuilder(
+    S3InputSourceFactory s3Builder = new S3InputSourceFactory(
         service,
         serverSides3Builder,
         dataConfig,
@@ -53,6 +53,6 @@ public void testAdapterGet()
         null,
         null
     );
-    Assert.assertTrue(s3Builder.generateInputSource(fileUris) instanceof S3InputSource);
+    Assert.assertTrue(s3Builder.create(fileUris) instanceof S3InputSource);
   }
 }
diff --git a/processing/src/main/java/org/apache/druid/data/input/AbstractInputSourceBuilder.java b/processing/src/main/java/org/apache/druid/data/input/AbstractInputSourceBuilder.java
deleted file mode 100644
index a0af825c1a99..000000000000
--- a/processing/src/main/java/org/apache/druid/data/input/AbstractInputSourceBuilder.java
+++ /dev/null
@@ -1,123 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.druid.data.input;
-
-import com.fasterxml.jackson.annotation.JsonSubTypes;
-import com.fasterxml.jackson.annotation.JsonTypeInfo;
-import org.apache.druid.data.input.impl.LocalInputSourceBuilder;
-import org.apache.druid.data.input.impl.SplittableInputSource;
-import org.apache.druid.java.util.common.CloseableIterators;
-import org.apache.druid.java.util.common.parsers.CloseableIterator;
-
-import javax.annotation.Nullable;
-import java.io.File;
-import java.util.Collections;
-import java.util.List;
-import java.util.stream.Stream;
-
-/**
- * A wrapper on top of {@link SplittableInputSource} that handles input source creation.
- * For composing input sources such as IcebergInputSource, the delegate input source instantiation might fail upon deserialization since the input file paths
- * are not available yet and this might fail the input source precondition checks.
- * This adapter helps create the delegate input source once the input file paths are fully determined.
- */
-@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type")
-@JsonSubTypes(value = {
-    @JsonSubTypes.Type(name = LocalInputSourceBuilder.TYPE_KEY, value = LocalInputSourceBuilder.class)
-})
-public abstract class AbstractInputSourceBuilder
-{
-  public abstract SplittableInputSource generateInputSource(List<String> inputFilePaths);
-
-  public SplittableInputSource setupInputSource(List<String> inputFilePaths)
-  {
-    if (inputFilePaths.isEmpty()) {
-      return new EmptyInputSource();
-    } else {
-      return generateInputSource(inputFilePaths);
-    }
-  }
-
-  /**
-   * This input source is used in place of a delegate input source if there are no input file paths.
-   * Certain input sources cannot be instantiated with an empty input file list and so composing input sources such as IcebergInputSource
-   * may use this input source as delegate in such cases.
-   */
-  private static class EmptyInputSource implements SplittableInputSource
-  {
-    @Override
-    public boolean needsFormat()
-    {
-      return false;
-    }
-
-    @Override
-    public boolean isSplittable()
-    {
-      return false;
-    }
-
-    @Override
-    public InputSourceReader reader(
-        InputRowSchema inputRowSchema,
-        @Nullable InputFormat inputFormat,
-        File temporaryDirectory
-    )
-    {
-      return new InputSourceReader()
-      {
-        @Override
-        public CloseableIterator<InputRow> read(InputStats inputStats)
-        {
-          return CloseableIterators.wrap(Collections.emptyIterator(), () -> {
-          });
-        }
-
-        @Override
-        public CloseableIterator<InputRowListPlusRawValues> sample()
-        {
-          return CloseableIterators.wrap(Collections.emptyIterator(), () -> {
-          });
-        }
-      };
-    }
-
-    @Override
-    public Stream<InputSplit> createSplits(
-        InputFormat inputFormat,
-        @Nullable SplitHintSpec splitHintSpec
-    )
-    {
-      return Stream.empty();
-    }
-
-    @Override
-    public int estimateNumSplits(InputFormat inputFormat, @Nullable SplitHintSpec splitHintSpec)
-    {
-      return 0;
-    }
-
-    @Override
-    public InputSource withSplit(InputSplit split)
-    {
-      return null;
-    }
-  }
-}
diff --git a/processing/src/main/java/org/apache/druid/data/input/InputSourceFactory.java b/processing/src/main/java/org/apache/druid/data/input/InputSourceFactory.java
new file mode 100644
index 000000000000..4b298695cdac
--- /dev/null
+++ b/processing/src/main/java/org/apache/druid/data/input/InputSourceFactory.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.data.input;
+
+import com.fasterxml.jackson.annotation.JsonSubTypes;
+import com.fasterxml.jackson.annotation.JsonTypeInfo;
+import org.apache.druid.data.input.impl.LocalInputSourceFactory;
+import org.apache.druid.data.input.impl.SplittableInputSource;
+import org.apache.druid.guice.annotations.UnstableApi;
+
+import java.util.List;
+
+/**
+ * An interface for generating {@link SplittableInputSource} objects on the fly.
+ * For composing input sources such as IcebergInputSource, instantiating the delegate input source at deserialization time can fail because the input file paths
+ * are not yet available, which would trip the delegate's precondition checks.
+ * This factory helps create the delegate input source once the input file paths are fully determined.
+ */
+@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type")
+@JsonSubTypes(value = {
+    @JsonSubTypes.Type(name = "local", value = LocalInputSourceFactory.class)
+})
+@UnstableApi
+public interface InputSourceFactory
+{
+  SplittableInputSource create(List<String> inputFilePaths);
+}
diff --git a/processing/src/main/java/org/apache/druid/data/input/impl/LocalInputSourceBuilder.java b/processing/src/main/java/org/apache/druid/data/input/impl/LocalInputSourceFactory.java
similarity index 81%
rename from processing/src/main/java/org/apache/druid/data/input/impl/LocalInputSourceBuilder.java
rename to processing/src/main/java/org/apache/druid/data/input/impl/LocalInputSourceFactory.java
index 6e2f44b56741..b2fa6a13dcb7 100644
--- a/processing/src/main/java/org/apache/druid/data/input/impl/LocalInputSourceBuilder.java
+++ b/processing/src/main/java/org/apache/druid/data/input/impl/LocalInputSourceFactory.java
@@ -19,18 +19,17 @@
 
 package org.apache.druid.data.input.impl;
 
-import org.apache.druid.data.input.AbstractInputSourceBuilder;
+import org.apache.druid.data.input.InputSourceFactory;
 
 import java.io.File;
 import java.util.List;
 import java.util.stream.Collectors;
 
-public class LocalInputSourceBuilder extends AbstractInputSourceBuilder
+public class LocalInputSourceFactory implements InputSourceFactory
 {
-  public static final String TYPE_KEY = "local";
 
   @Override
-  public LocalInputSource generateInputSource(List<String> inputFilePaths)
+  public LocalInputSource create(List<String> inputFilePaths)
   {
     return new LocalInputSource(
         null,
diff --git a/processing/src/test/java/org/apache/druid/data/input/impl/LocalInputSourceAdapterTest.java b/processing/src/test/java/org/apache/druid/data/input/impl/LocalInputSourceAdapterTest.java
index 78a33c05b463..38ba640e49ec 100644
--- a/processing/src/test/java/org/apache/druid/data/input/impl/LocalInputSourceAdapterTest.java
+++ b/processing/src/test/java/org/apache/druid/data/input/impl/LocalInputSourceAdapterTest.java
@@ -19,22 +19,12 @@
 
 package org.apache.druid.data.input.impl;
 
-import org.apache.druid.data.input.InputFormat;
-import org.apache.druid.data.input.InputRowSchema;
-import org.apache.druid.data.input.InputSource;
-import org.apache.druid.data.input.InputSplit;
-import org.apache.druid.data.input.SplitHintSpec;
-import org.easymock.EasyMock;
 import org.junit.Assert;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.TemporaryFolder;
 
-import java.io.IOException;
 import java.util.Arrays;
-import java.util.Collections;
-import java.util.List;
-import java.util.stream.Collectors;
 
 public class LocalInputSourceAdapterTest
@@ -44,44 +34,10 @@ public class LocalInputSourceAdapterTest
   @Test
   public void testAdapterGet()
   {
-    LocalInputSourceBuilder localInputSourceAdapter = new LocalInputSourceBuilder();
-    Assert.assertTrue(localInputSourceAdapter.generateInputSource(Arrays.asList(
+    LocalInputSourceFactory localInputSourceAdapter = new LocalInputSourceFactory();
+    Assert.assertTrue(localInputSourceAdapter.create(Arrays.asList(
         "foo.parquet",
         "bar.parquet"
     )) instanceof LocalInputSource);
   }
-
-  @Test
-  public void testAdapterSetup()
-  {
-    LocalInputSourceBuilder localInputSourceAdapter = new LocalInputSourceBuilder();
-    InputSource delegateInputSource = localInputSourceAdapter.setupInputSource(Arrays.asList(
-        "foo.parquet",
-        "bar.parquet"
-    ));
-    Assert.assertTrue(delegateInputSource instanceof LocalInputSource);
-  }
-
-  @Test
-  public void testEmptyInputSource() throws IOException
-  {
-    InputFormat mockFormat = EasyMock.createMock(InputFormat.class);
-    SplitHintSpec mockSplitHint = EasyMock.createMock(SplitHintSpec.class);
-    LocalInputSourceBuilder localInputSourceAdapter = new LocalInputSourceBuilder();
-    SplittableInputSource emptyInputSource =
-        (SplittableInputSource) localInputSourceAdapter.setupInputSource(Collections.emptyList());
-    List<InputSplit<?>> splitList = emptyInputSource
-        .createSplits(mockFormat, mockSplitHint)
-        .collect(Collectors.toList());
-    Assert.assertTrue(splitList.isEmpty());
-    Assert.assertFalse(emptyInputSource.isSplittable());
-    Assert.assertFalse(emptyInputSource.needsFormat());
-    Assert.assertNull(emptyInputSource.withSplit(EasyMock.createMock(InputSplit.class)));
-    Assert.assertEquals(0, emptyInputSource.estimateNumSplits(mockFormat, mockSplitHint));
-    Assert.assertFalse(emptyInputSource.reader(
-        EasyMock.createMock(InputRowSchema.class),
-        mockFormat,
-        temporaryFolder.newFolder()
-    ).read().hasNext());
-  }
 }
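
For reviewers, here is what the refactored contract looks like from the implementer's side. This is a minimal sketch, not part of the patch: MyInputSourceFactory is a hypothetical class invented for illustration, while InputSourceFactory, SplittableInputSource, and LocalInputSourceFactory are the real types from the diff above. Note that the empty-list fallback that AbstractInputSourceBuilder.setupInputSource() used to provide now lives with the caller (see the snapshotDataFiles.isEmpty() branch in IcebergInputSource), so create() may assume a non-empty path list.

// Hypothetical implementation of the new InputSourceFactory interface (sketch only).
package org.apache.druid.example;

import org.apache.druid.data.input.InputSourceFactory;
import org.apache.druid.data.input.impl.LocalInputSourceFactory;
import org.apache.druid.data.input.impl.SplittableInputSource;

import java.util.List;

public class MyInputSourceFactory implements InputSourceFactory
{
  @Override
  public SplittableInputSource create(List<String> inputFilePaths)
  {
    // Map the fully resolved file paths to a concrete input source. A real
    // implementation would build its own source (cf. HdfsInputSourceFactory and
    // S3InputSourceFactory above); delegating to the local factory here keeps
    // the sketch self-contained.
    return new LocalInputSourceFactory().create(inputFilePaths);
  }
}

As with the HDFS and S3 factories in this diff, such a class would also be registered as a Jackson subtype in a DruidModule (for example, new NamedType(MyInputSourceFactory.class, "example")) so that a warehouseSource property can select it by type name during deserialization.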