From 10b7e093b85e56c0bedf136e4d659219a5e58419 Mon Sep 17 00:00:00 2001 From: Jihoon Son Date: Mon, 2 Mar 2020 20:15:31 -0800 Subject: [PATCH 01/10] Skip empty files for local, hdfs, and cloud input sources --- .../apache/druid/data/input/InputEntity.java | 2 + .../data/input/MaxSizeSplitHintSpec.java | 13 +- .../data/input/impl/InlineInputSource.java | 2 + .../data/input/impl/LocalInputSource.java | 12 +- .../data/input/MaxSizeSplitHintSpecTest.java | 23 ++ .../data/input/impl/LocalInputSourceTest.java | 11 + .../storage/azure/AzureCloudBlobIterator.java | 2 +- .../azure/AzureCloudBlobIteratorTest.java | 61 +++- .../google/GoogleCloudStorageInputSource.java | 4 +- .../druid/storage/google/GoogleUtils.java | 105 +------ .../storage/google/ObjectStorageIterator.java | 129 ++++++++ .../google/ObjectStorageIteratorTest.java | 286 ++++++++++++++++++ .../inputsource/hdfs/HdfsInputSource.java | 1 + .../storage/s3/ObjectSummaryIterator.java | 4 +- .../data/input/s3/S3InputSourceTest.java | 54 +++- .../storage/s3/ObjectSummaryIteratorTest.java | 3 +- 16 files changed, 592 insertions(+), 120 deletions(-) create mode 100644 extensions-core/google-extensions/src/main/java/org/apache/druid/storage/google/ObjectStorageIterator.java create mode 100644 extensions-core/google-extensions/src/test/java/org/apache/druid/storage/google/ObjectStorageIteratorTest.java diff --git a/core/src/main/java/org/apache/druid/data/input/InputEntity.java b/core/src/main/java/org/apache/druid/data/input/InputEntity.java index 470e04060aaf..d900110b3bb1 100644 --- a/core/src/main/java/org/apache/druid/data/input/InputEntity.java +++ b/core/src/main/java/org/apache/druid/data/input/InputEntity.java @@ -35,6 +35,8 @@ /** * InputEntity abstracts an input entity and knows how to read bytes from the given entity. + * Implementation assume that the given input entity is not empty, the InputSources creating InputEntitires should + * filter out empty entities properly. */ @UnstableApi public interface InputEntity diff --git a/core/src/main/java/org/apache/druid/data/input/MaxSizeSplitHintSpec.java b/core/src/main/java/org/apache/druid/data/input/MaxSizeSplitHintSpec.java index 4f810e2c629e..e5e553112068 100644 --- a/core/src/main/java/org/apache/druid/data/input/MaxSizeSplitHintSpec.java +++ b/core/src/main/java/org/apache/druid/data/input/MaxSizeSplitHintSpec.java @@ -22,6 +22,8 @@ import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; import com.google.common.annotations.VisibleForTesting; +import com.google.common.collect.FluentIterable; +import org.apache.druid.java.util.common.logger.Logger; import javax.annotation.Nullable; import java.util.ArrayList; @@ -40,6 +42,7 @@ public class MaxSizeSplitHintSpec implements SplitHintSpec { public static final String TYPE = "maxSize"; + private static final Logger LOG = new Logger(MaxSizeSplitHintSpec.class); @VisibleForTesting static final long DEFAULT_MAX_SPLIT_SIZE = 512 * 1024 * 1024; @@ -61,6 +64,10 @@ public long getMaxSplitSize() @Override public Iterator> split(Iterator inputIterator, Function inputAttributeExtractor) { + final Iterator nonEmptyFileOnlyIterator = FluentIterable + .from(() -> inputIterator) + .filter(input -> inputAttributeExtractor.apply(input).getSize() > 0) + .iterator(); return new Iterator>() { private T peeking; @@ -68,7 +75,7 @@ public Iterator> split(Iterator inputIterator, Function next() } final List current = new ArrayList<>(); long splitSize = 0; - while (splitSize < maxSplitSize && (peeking != null || inputIterator.hasNext())) { + while (splitSize < maxSplitSize && (peeking != null || nonEmptyFileOnlyIterator.hasNext())) { if (peeking == null) { - peeking = inputIterator.next(); + peeking = nonEmptyFileOnlyIterator.next(); } final long size = inputAttributeExtractor.apply(peeking).getSize(); if (current.isEmpty() || splitSize + size < maxSplitSize) { diff --git a/core/src/main/java/org/apache/druid/data/input/impl/InlineInputSource.java b/core/src/main/java/org/apache/druid/data/input/impl/InlineInputSource.java index a99f7cdc4447..1e7b59fcf4a9 100644 --- a/core/src/main/java/org/apache/druid/data/input/impl/InlineInputSource.java +++ b/core/src/main/java/org/apache/druid/data/input/impl/InlineInputSource.java @@ -21,6 +21,7 @@ import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; +import com.google.common.base.Preconditions; import org.apache.druid.data.input.AbstractInputSource; import org.apache.druid.data.input.InputFormat; import org.apache.druid.data.input.InputRowSchema; @@ -38,6 +39,7 @@ public class InlineInputSource extends AbstractInputSource @JsonCreator public InlineInputSource(@JsonProperty("data") String data) { + Preconditions.checkArgument(data != null && !data.isEmpty(), "empty data"); this.data = data; } diff --git a/core/src/main/java/org/apache/druid/data/input/impl/LocalInputSource.java b/core/src/main/java/org/apache/druid/data/input/impl/LocalInputSource.java index 65b0f6195b14..04b7dc7b2cd9 100644 --- a/core/src/main/java/org/apache/druid/data/input/impl/LocalInputSource.java +++ b/core/src/main/java/org/apache/druid/data/input/impl/LocalInputSource.java @@ -125,10 +125,14 @@ private Iterator> getSplitFileIterator(SplitHintSpec splitHintSpec) @VisibleForTesting Iterator getFileIterator() { - return Iterators.concat( - getDirectoryListingIterator(), - getFilesListIterator() - ); + return + Iterators.filter( + Iterators.concat( + getDirectoryListingIterator(), + getFilesListIterator() + ), + file -> file.length() > 0 + ); } private Iterator getDirectoryListingIterator() diff --git a/core/src/test/java/org/apache/druid/data/input/MaxSizeSplitHintSpecTest.java b/core/src/test/java/org/apache/druid/data/input/MaxSizeSplitHintSpecTest.java index 6e7db2992db1..2ad40afa579f 100644 --- a/core/src/test/java/org/apache/druid/data/input/MaxSizeSplitHintSpecTest.java +++ b/core/src/test/java/org/apache/druid/data/input/MaxSizeSplitHintSpecTest.java @@ -79,6 +79,29 @@ public void testSplitLargeInputsReturningSplitsOfSingleInput() } } + @Test + public void testSplitSkippingEmptyInputs() + { + final int nonEmptyInputSize = 3; + final MaxSizeSplitHintSpec splitHintSpec = new MaxSizeSplitHintSpec(10L); + final Function inputAttributeExtractor = InputFileAttribute::new; + final IntStream dataStream = IntStream.concat( + IntStream.concat( + IntStream.generate(() -> 0).limit(10), + IntStream.generate(() -> nonEmptyInputSize).limit(10) + ), + IntStream.generate(() -> 0).limit(10) + ); + final List> splits = Lists.newArrayList( + splitHintSpec.split(dataStream.iterator(), inputAttributeExtractor) + ); + Assert.assertEquals(4, splits.size()); + Assert.assertEquals(3, splits.get(0).size()); + Assert.assertEquals(3, splits.get(1).size()); + Assert.assertEquals(3, splits.get(2).size()); + Assert.assertEquals(1, splits.get(3).size()); + } + @Test public void testEquals() { diff --git a/core/src/test/java/org/apache/druid/data/input/impl/LocalInputSourceTest.java b/core/src/test/java/org/apache/druid/data/input/impl/LocalInputSourceTest.java index 6d7342c4180f..d818ff6ded1c 100644 --- a/core/src/test/java/org/apache/druid/data/input/impl/LocalInputSourceTest.java +++ b/core/src/test/java/org/apache/druid/data/input/impl/LocalInputSourceTest.java @@ -21,6 +21,7 @@ import com.fasterxml.jackson.databind.ObjectMapper; import nl.jqno.equalsverifier.EqualsVerifier; +import org.apache.commons.compress.utils.Lists; import org.apache.druid.data.input.InputSource; import org.apache.druid.data.input.InputSplit; import org.apache.druid.data.input.MaxSizeSplitHintSpec; @@ -137,6 +138,16 @@ public void testGetFileIteratorWithOnlyFilesIteratingAllFiles() throws IOExcepti Assert.assertEquals(filesInBaseDir, actualFiles); } + @Test + public void testFileIteratorWithEmptyFilesIteratingNonEmptyFilesOnly() + { + final Set files = new HashSet<>(prepareFiles(10, 5)); + files.addAll(prepareFiles(10, 0)); + final LocalInputSource inputSource = new LocalInputSource(null, null, files); + List iteratedFiles = Lists.newArrayList(inputSource.getFileIterator()); + Assert.assertTrue(iteratedFiles.stream().allMatch(file -> file.length() > 0)); + } + private static Set prepareFiles(int numFiles, long fileSize) { final Set files = new HashSet<>(); diff --git a/extensions-core/azure-extensions/src/main/java/org/apache/druid/storage/azure/AzureCloudBlobIterator.java b/extensions-core/azure-extensions/src/main/java/org/apache/druid/storage/azure/AzureCloudBlobIterator.java index 72d6509f5ec1..c2a696c3d757 100644 --- a/extensions-core/azure-extensions/src/main/java/org/apache/druid/storage/azure/AzureCloudBlobIterator.java +++ b/extensions-core/azure-extensions/src/main/java/org/apache/druid/storage/azure/AzureCloudBlobIterator.java @@ -150,7 +150,7 @@ private void advanceBlobItem() while (blobItemIterator.hasNext()) { ListBlobItemHolder blobItem = blobItemDruidFactory.create(blobItemIterator.next()); /* skip directory objects */ - if (blobItem.isCloudBlob()) { + if (blobItem.isCloudBlob() && blobItem.getCloudBlob().getBlobLength() > 0) { currentBlobItem = blobItem.getCloudBlob(); return; } diff --git a/extensions-core/azure-extensions/src/test/java/org/apache/druid/storage/azure/AzureCloudBlobIteratorTest.java b/extensions-core/azure-extensions/src/test/java/org/apache/druid/storage/azure/AzureCloudBlobIteratorTest.java index e0fd4b455459..dbccf0446eb3 100644 --- a/extensions-core/azure-extensions/src/test/java/org/apache/druid/storage/azure/AzureCloudBlobIteratorTest.java +++ b/extensions-core/azure-extensions/src/test/java/org/apache/druid/storage/azure/AzureCloudBlobIteratorTest.java @@ -19,6 +19,7 @@ package org.apache.druid.storage.azure; +import com.google.api.client.util.Lists; import com.google.common.collect.ImmutableList; import com.microsoft.azure.storage.ResultContinuation; import com.microsoft.azure.storage.ResultSegment; @@ -127,10 +128,12 @@ public void setup() blobItemPrefixWithOnlyCloudBlobs1 = createMock(ListBlobItem.class); cloudBlobItemPrefixWithOnlyCloudBlobs1 = createMock(ListBlobItemHolder.class); cloudBlobDruidPrefixWithOnlyCloudBlobs1 = createMock(CloudBlobHolder.class); + EasyMock.expect(cloudBlobDruidPrefixWithOnlyCloudBlobs1.getBlobLength()).andReturn(10L).anyTimes(); blobItemPrefixWithOnlyCloudBlobs2 = createMock(ListBlobItem.class); cloudBlobItemPrefixWithOnlyCloudBlobs2 = createMock(ListBlobItemHolder.class); cloudBlobDruidPrefixWithOnlyCloudBlobs2 = createMock(CloudBlobHolder.class); + EasyMock.expect(cloudBlobDruidPrefixWithOnlyCloudBlobs2.getBlobLength()).andReturn(10L).anyTimes(); blobItemPrefixWithCloudBlobsAndDirectories1 = createMock(ListBlobItem.class); directoryItemPrefixWithCloudBlobsAndDirectories = createMock(ListBlobItemHolder.class); @@ -138,6 +141,7 @@ public void setup() blobItemPrefixWithCloudBlobsAndDirectories2 = createMock(ListBlobItem.class); cloudBlobItemPrefixWithCloudBlobsAndDirectories = createMock(ListBlobItemHolder.class); cloudBlobDruidPrefixWithCloudBlobsAndDirectories = createMock(CloudBlobHolder.class); + EasyMock.expect(cloudBlobDruidPrefixWithCloudBlobsAndDirectories.getBlobLength()).andReturn(10L).anyTimes(); blobItemPrefixWithCloudBlobsAndDirectories3 = createMock(ListBlobItem.class); directoryItemPrefixWithCloudBlobsAndDirectories3 = createMock(ListBlobItemHolder.class); @@ -163,13 +167,13 @@ public void test_next_prefixesWithMultipleBlobsAndSomeDirectories_returnsExpecte EasyMock.expect(config.getMaxTries()).andReturn(MAX_TRIES).atLeastOnce(); EasyMock.expect(cloudBlobItemPrefixWithOnlyCloudBlobs1.isCloudBlob()).andReturn(true); EasyMock.expect(cloudBlobItemPrefixWithOnlyCloudBlobs1.getCloudBlob()).andReturn( - cloudBlobDruidPrefixWithOnlyCloudBlobs1); + cloudBlobDruidPrefixWithOnlyCloudBlobs1).anyTimes(); EasyMock.expect(blobItemDruidFactory.create(blobItemPrefixWithOnlyCloudBlobs1)).andReturn( cloudBlobItemPrefixWithOnlyCloudBlobs1); EasyMock.expect(cloudBlobItemPrefixWithOnlyCloudBlobs2.isCloudBlob()).andReturn(true); EasyMock.expect(cloudBlobItemPrefixWithOnlyCloudBlobs2.getCloudBlob()).andReturn( - cloudBlobDruidPrefixWithOnlyCloudBlobs2); + cloudBlobDruidPrefixWithOnlyCloudBlobs2).anyTimes(); EasyMock.expect(blobItemDruidFactory.create(blobItemPrefixWithOnlyCloudBlobs2)).andReturn( cloudBlobItemPrefixWithOnlyCloudBlobs2); @@ -179,7 +183,7 @@ public void test_next_prefixesWithMultipleBlobsAndSomeDirectories_returnsExpecte EasyMock.expect(cloudBlobItemPrefixWithCloudBlobsAndDirectories.isCloudBlob()).andReturn(true); EasyMock.expect(cloudBlobItemPrefixWithCloudBlobsAndDirectories.getCloudBlob()).andReturn( - cloudBlobDruidPrefixWithCloudBlobsAndDirectories); + cloudBlobDruidPrefixWithCloudBlobsAndDirectories).anyTimes(); EasyMock.expect(blobItemDruidFactory.create(blobItemPrefixWithCloudBlobsAndDirectories2)).andReturn( cloudBlobItemPrefixWithCloudBlobsAndDirectories); @@ -273,6 +277,57 @@ public void test_next_prefixesWithMultipleBlobsAndSomeDirectories_returnsExpecte verifyAll(); } + @Test + public void test_next_emptyObjects_skipEmptyObjects() throws URISyntaxException, StorageException + { + EasyMock.expect(config.getMaxTries()).andReturn(MAX_TRIES).atLeastOnce(); + EasyMock.expect(cloudBlobItemPrefixWithOnlyCloudBlobs1.isCloudBlob()).andReturn(true); + EasyMock.expect(cloudBlobItemPrefixWithOnlyCloudBlobs1.getCloudBlob()).andReturn( + cloudBlobDruidPrefixWithOnlyCloudBlobs1).anyTimes(); + EasyMock.expect(blobItemDruidFactory.create(blobItemPrefixWithOnlyCloudBlobs1)).andReturn( + cloudBlobItemPrefixWithOnlyCloudBlobs1); + + ListBlobItem emptyBlobItem = createMock(ListBlobItem.class); + ListBlobItemHolder emptyBlobItemHolder = createMock(ListBlobItemHolder.class); + CloudBlobHolder emptyBlobHolder = createMock(CloudBlobHolder.class); + EasyMock.expect(emptyBlobHolder.getBlobLength()).andReturn(0L).anyTimes(); + EasyMock.expect(emptyBlobItemHolder.isCloudBlob()).andReturn(true); + EasyMock.expect(emptyBlobItemHolder.getCloudBlob()).andReturn(emptyBlobHolder).anyTimes(); + + EasyMock.expect(blobItemDruidFactory.create(emptyBlobItem)).andReturn(emptyBlobItemHolder); + + EasyMock.expect(storage.listBlobsWithPrefixInContainerSegmented( + CONTAINER1, + PREFIX_ONLY_CLOUD_BLOBS, + nullResultContinuationToken, + MAX_LISTING_LENGTH + )).andReturn(resultSegmentPrefixOnlyAndFailLessThanMaxTriesCloudBlobs1); + + EasyMock.expect(resultSegmentPrefixOnlyAndFailLessThanMaxTriesCloudBlobs1.getContinuationToken()) + .andReturn(nullResultContinuationToken); + ArrayList resultBlobItemsPrefixWithOnlyCloudBlobs1 = new ArrayList<>(); + resultBlobItemsPrefixWithOnlyCloudBlobs1.add(blobItemPrefixWithOnlyCloudBlobs1); + resultBlobItemsPrefixWithOnlyCloudBlobs1.add(emptyBlobItem); + EasyMock.expect(resultSegmentPrefixOnlyAndFailLessThanMaxTriesCloudBlobs1.getResults()) + .andReturn(resultBlobItemsPrefixWithOnlyCloudBlobs1); + + replayAll(); + + azureCloudBlobIterator = new AzureCloudBlobIterator( + storage, + blobItemDruidFactory, + config, + ImmutableList.of(PREFIX_ONLY_CLOUD_BLOBS_URI), + MAX_LISTING_LENGTH + ); + + List expectedBlobItems = ImmutableList.of(cloudBlobDruidPrefixWithOnlyCloudBlobs1); + List actualBlobItems = Lists.newArrayList(azureCloudBlobIterator); + Assert.assertEquals(expectedBlobItems.size(), actualBlobItems.size()); + Assert.assertTrue(expectedBlobItems.containsAll(actualBlobItems)); + verifyAll(); + } + @Test(expected = NoSuchElementException.class) public void test_next_emptyPrefixes_throwsNoSuchElementException() { diff --git a/extensions-core/google-extensions/src/main/java/org/apache/druid/data/input/google/GoogleCloudStorageInputSource.java b/extensions-core/google-extensions/src/main/java/org/apache/druid/data/input/google/GoogleCloudStorageInputSource.java index b1ae3a4ff9dd..9d37b3f7f503 100644 --- a/extensions-core/google-extensions/src/main/java/org/apache/druid/data/input/google/GoogleCloudStorageInputSource.java +++ b/extensions-core/google-extensions/src/main/java/org/apache/druid/data/input/google/GoogleCloudStorageInputSource.java @@ -47,7 +47,7 @@ public class GoogleCloudStorageInputSource extends CloudObjectInputSource { - static final String SCHEME = "gs"; + public static final String SCHEME = "gs"; private static final Logger LOG = new Logger(GoogleCloudStorageInputSource.class); @@ -117,7 +117,7 @@ public SplittableInputSource> withSplit(InputSplit storageObjectIterable() diff --git a/extensions-core/google-extensions/src/main/java/org/apache/druid/storage/google/GoogleUtils.java b/extensions-core/google-extensions/src/main/java/org/apache/druid/storage/google/GoogleUtils.java index 9fbb23f040c0..2c1817850655 100644 --- a/extensions-core/google-extensions/src/main/java/org/apache/druid/storage/google/GoogleUtils.java +++ b/extensions-core/google-extensions/src/main/java/org/apache/druid/storage/google/GoogleUtils.java @@ -20,18 +20,15 @@ package org.apache.druid.storage.google; import com.google.api.client.http.HttpResponseException; -import com.google.api.services.storage.Storage; -import com.google.api.services.storage.model.Objects; import com.google.api.services.storage.model.StorageObject; import com.google.common.base.Predicate; -import org.apache.druid.java.util.common.ISE; +import org.apache.druid.data.input.google.GoogleCloudStorageInputSource; +import org.apache.druid.data.input.impl.CloudObjectLocation; import org.apache.druid.java.util.common.RetryUtils; -import org.apache.druid.java.util.common.StringUtils; import java.io.IOException; import java.net.URI; import java.util.Iterator; -import java.util.NoSuchElementException; public class GoogleUtils { @@ -46,11 +43,20 @@ public static boolean isRetryable(Throwable t) return t instanceof IOException; } - private static T retryGoogleCloudStorageOperation(RetryUtils.Task f) throws Exception + static T retryGoogleCloudStorageOperation(RetryUtils.Task f) throws Exception { return RetryUtils.retry(f, GOOGLE_RETRY, RetryUtils.DEFAULT_MAX_TRIES); } + public static URI objectToUri(StorageObject object) + { + return objectToCloudObjectLocation(object).toUri(GoogleCloudStorageInputSource.SCHEME); + } + + public static CloudObjectLocation objectToCloudObjectLocation(StorageObject object) + { + return new CloudObjectLocation(object.getBucket(), object.getName()); + } public static Iterator lazyFetchingStorageObjectsIterator( final GoogleStorage storage, @@ -58,91 +64,6 @@ public static Iterator lazyFetchingStorageObjectsIterator( final long maxListingLength ) { - return new Iterator() - { - private Storage.Objects.List listRequest; - private Objects results; - private URI currentUri; - private String currentBucket; - private String currentPrefix; - private String nextPageToken; - private Iterator storageObjectsIterator; - - { - nextPageToken = null; - prepareNextRequest(); - fetchNextBatch(); - } - - private void prepareNextRequest() - { - try { - currentUri = uris.next(); - currentBucket = currentUri.getAuthority(); - currentPrefix = StringUtils.maybeRemoveLeadingSlash(currentUri.getPath()); - nextPageToken = null; - listRequest = storage.list(currentBucket) - .setPrefix(currentPrefix) - .setMaxResults(maxListingLength); - - } - catch (IOException io) { - throw new RuntimeException(io); - } - } - - private void fetchNextBatch() - { - try { - listRequest.setPageToken(nextPageToken); - results = GoogleUtils.retryGoogleCloudStorageOperation(() -> listRequest.execute()); - storageObjectsIterator = results.getItems().iterator(); - nextPageToken = results.getNextPageToken(); - } - catch (Exception ex) { - throw new RuntimeException(ex); - } - } - - @Override - public boolean hasNext() - { - return storageObjectsIterator.hasNext() || nextPageToken != null || uris.hasNext(); - } - - @Override - public StorageObject next() - { - if (!hasNext()) { - throw new NoSuchElementException(); - } - - while (storageObjectsIterator.hasNext()) { - final StorageObject next = storageObjectsIterator.next(); - // list with prefix can return directories, but they should always end with `/`, ignore them - if (!next.getName().endsWith("/")) { - return next; - } - } - - if (nextPageToken != null) { - fetchNextBatch(); - } else if (uris.hasNext()) { - prepareNextRequest(); - fetchNextBatch(); - } - - if (!storageObjectsIterator.hasNext()) { - throw new ISE( - "Failed to further iterate on bucket[%s] and prefix[%s]. The last page token was [%s]", - currentBucket, - currentPrefix, - nextPageToken - ); - } - - return next(); - } - }; + return new ObjectStorageIterator(storage, uris, maxListingLength); } } diff --git a/extensions-core/google-extensions/src/main/java/org/apache/druid/storage/google/ObjectStorageIterator.java b/extensions-core/google-extensions/src/main/java/org/apache/druid/storage/google/ObjectStorageIterator.java new file mode 100644 index 000000000000..10275112f6f4 --- /dev/null +++ b/extensions-core/google-extensions/src/main/java/org/apache/druid/storage/google/ObjectStorageIterator.java @@ -0,0 +1,129 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.storage.google; + +import com.google.api.services.storage.Storage; +import com.google.api.services.storage.model.Objects; +import com.google.api.services.storage.model.StorageObject; +import org.apache.druid.java.util.common.StringUtils; + +import java.io.IOException; +import java.net.URI; +import java.util.Iterator; +import java.util.NoSuchElementException; + +public class ObjectStorageIterator implements Iterator +{ + private final GoogleStorage storage; + private final Iterator uris; + private final long maxListingLength; + + private Storage.Objects.List listRequest; + private Objects results; + private URI currentUri; + private String nextPageToken; + private Iterator storageObjectsIterator; + private StorageObject currentObject; + + public ObjectStorageIterator(GoogleStorage storage, Iterator uris, long maxListingLength) + { + this.storage = storage; + this.uris = uris; + this.maxListingLength = maxListingLength; + this.nextPageToken = null; + + prepareNextRequest(); + fetchNextBatch(); + advanceStorageObject(); + } + + private void prepareNextRequest() + { + try { + currentUri = uris.next(); + String currentBucket = currentUri.getAuthority(); + String currentPrefix = StringUtils.maybeRemoveLeadingSlash(currentUri.getPath()); + nextPageToken = null; + listRequest = storage.list(currentBucket) + .setPrefix(currentPrefix) + .setMaxResults(maxListingLength); + + } + catch (IOException io) { + throw new RuntimeException(io); + } + } + + private void fetchNextBatch() + { + try { + listRequest.setPageToken(nextPageToken); + results = GoogleUtils.retryGoogleCloudStorageOperation(() -> listRequest.execute()); + storageObjectsIterator = results.getItems().iterator(); + nextPageToken = results.getNextPageToken(); + } + catch (Exception ex) { + throw new RuntimeException(ex); + } + } + + @Override + public boolean hasNext() + { + return currentObject != null; + } + + @Override + public StorageObject next() + { + if (!hasNext()) { + throw new NoSuchElementException(); + } + + final StorageObject retVal = currentObject; + advanceStorageObject(); + return retVal; + } + + private void advanceStorageObject() + { + while (storageObjectsIterator.hasNext() || nextPageToken != null || uris.hasNext()) { + while (storageObjectsIterator.hasNext()) { + final StorageObject next = storageObjectsIterator.next(); + // list with prefix can return directories, but they should always end with `/`, ignore them. + // also skips empty objects. + if (!next.getName().endsWith("/") && next.getSize().signum() > 0) { + currentObject = next; + return; + } + } + + if (nextPageToken != null) { + fetchNextBatch(); + } else if (uris.hasNext()) { + prepareNextRequest(); + fetchNextBatch(); + } + } + + // Truly nothing left to read. + currentObject = null; + } +} diff --git a/extensions-core/google-extensions/src/test/java/org/apache/druid/storage/google/ObjectStorageIteratorTest.java b/extensions-core/google-extensions/src/test/java/org/apache/druid/storage/google/ObjectStorageIteratorTest.java new file mode 100644 index 000000000000..a1459e2e6e57 --- /dev/null +++ b/extensions-core/google-extensions/src/test/java/org/apache/druid/storage/google/ObjectStorageIteratorTest.java @@ -0,0 +1,286 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.storage.google; + +import com.google.api.client.http.HttpRequestInitializer; +import com.google.api.client.http.HttpTransport; +import com.google.api.client.json.JsonFactory; +import com.google.api.services.storage.Storage; +import com.google.api.services.storage.model.StorageObject; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.Iterables; +import org.apache.druid.storage.google.ObjectStorageIteratorTest.MockStorage.MockObjects.MockList; +import org.easymock.EasyMock; +import org.junit.Assert; +import org.junit.Test; + +import java.math.BigInteger; +import java.net.URI; +import java.util.ArrayList; +import java.util.List; +import java.util.stream.Collectors; + +public class ObjectStorageIteratorTest +{ + private static final ImmutableList TEST_OBJECTS = + ImmutableList.of( + makeStorageObject("b", "foo", 10L), + makeStorageObject("b", "foo/", 0L), // directory + makeStorageObject("b", "foo/bar1", 10L), + makeStorageObject("b", "foo/bar2", 10L), + makeStorageObject("b", "foo/bar3", 10L), + makeStorageObject("b", "foo/bar4", 10L), + makeStorageObject("b", "foo/bar5", 0L), // empty object + makeStorageObject("b", "foo/baz", 10L), + makeStorageObject("bucketnotmine", "a/different/bucket", 10L) + ); + + @Test + public void testSingleObject() + { + test( + ImmutableList.of("gs://b/foo/baz"), + ImmutableList.of("gs://b/foo/baz"), + 5 + ); + } + + @Test + public void testMultiObjectOneKeyAtATime() + { + test( + ImmutableList.of("gs://b/foo/bar1", "gs://b/foo/bar2", "gs://b/foo/bar3", "gs://b/foo/bar4", "gs://b/foo/baz"), + ImmutableList.of("gs://b/foo/"), + 1 + ); + } + + @Test + public void testMultiObjectTwoKeysAtATime() + { + test( + ImmutableList.of("gs://b/foo/bar1", "gs://b/foo/bar2", "gs://b/foo/bar3", "gs://b/foo/bar4", "gs://b/foo/baz"), + ImmutableList.of("gs://b/foo/"), + 2 + ); + } + + @Test + public void testMultiObjectTenKeysAtATime() + { + test( + ImmutableList.of("gs://b/foo/bar1", "gs://b/foo/bar2", "gs://b/foo/bar3", "gs://b/foo/bar4", "gs://b/foo/baz"), + ImmutableList.of("gs://b/foo/"), + 10 + ); + } + + @Test + public void testPrefixInMiddleOfKey() + { + test( + ImmutableList.of("gs://b/foo/bar1", "gs://b/foo/bar2", "gs://b/foo/bar3", "gs://b/foo/bar4"), + ImmutableList.of("gs://b/foo/bar"), + 10 + ); + } + + @Test + public void testNoPath() + { + test( + ImmutableList.of( + "gs://b/foo", + "gs://b/foo/bar1", + "gs://b/foo/bar2", + "gs://b/foo/bar3", + "gs://b/foo/bar4", + "gs://b/foo/baz" + ), + ImmutableList.of("gs://b"), + 10 + ); + } + + @Test + public void testSlashPath() + { + test( + ImmutableList.of( + "gs://b/foo", + "gs://b/foo/bar1", + "gs://b/foo/bar2", + "gs://b/foo/bar3", + "gs://b/foo/bar4", + "gs://b/foo/baz" + ), + ImmutableList.of("gs://b/"), + 10 + ); + } + + @Test + public void testDifferentBucket() + { + test( + ImmutableList.of(), + ImmutableList.of("gs://bx/foo/"), + 10 + ); + } + + private static void test( + final List expectedUris, + final List prefixes, + final int maxListingLength + ) + { + final List expectedObjects = new ArrayList<>(); + + // O(N^2) but who cares -- the list is short. + for (final String uri : expectedUris) { + final List matches = TEST_OBJECTS + .stream() + .filter(storageObject -> GoogleUtils.objectToUri(storageObject).toString().equals(uri)) + .collect(Collectors.toList()); + + expectedObjects.add(Iterables.getOnlyElement(matches)); + } + + final List actualObjects = ImmutableList.copyOf( + GoogleUtils.lazyFetchingStorageObjectsIterator( + makeMockClient(TEST_OBJECTS), + prefixes.stream().map(URI::create).iterator(), + maxListingLength + ) + ); + + Assert.assertEquals( + prefixes.toString(), + expectedObjects.stream().map(GoogleUtils::objectToUri).collect(Collectors.toList()), + actualObjects.stream().map(GoogleUtils::objectToUri).collect(Collectors.toList()) + ); + } + + /** + * Makes a mock Google Storage client that handles enough of "List" to test the functionality of the + * {@link ObjectStorageIterator} class. + */ + private static GoogleStorage makeMockClient(final List storageObjects) + { + return new GoogleStorage(null) + { + @Override + public Storage.Objects.List list(final String bucket) + { + return mockList(bucket, storageObjects); + } + }; + } + + @SuppressWarnings("UnnecessaryFullyQualifiedName") + static class MockStorage extends Storage + { + private MockStorage() + { + super( + EasyMock.niceMock(HttpTransport.class), + EasyMock.niceMock(JsonFactory.class), + EasyMock.niceMock(HttpRequestInitializer.class) + ); + } + + private MockList mockList(String bucket, java.util.List storageObjects) + { + return new MockObjects().mockList(bucket, storageObjects); + } + + class MockObjects extends Storage.Objects + { + private MockList mockList(String bucket, java.util.List storageObjects) + { + return new MockList(bucket, storageObjects); + } + + class MockList extends Objects.List + { + private final java.util.List storageObjects; + + private MockList(String bucket, java.util.List storageObjects) + { + super(bucket); + this.storageObjects = storageObjects; + } + + @Override + public com.google.api.services.storage.model.Objects execute() + { + // Continuation token is an index in the "objects" list. + final String continuationToken = getPageToken(); + final int startIndex = continuationToken == null ? 0 : Integer.parseInt(continuationToken); + + // Find matching objects. + java.util.List objects = new ArrayList<>(); + int nextIndex = -1; + + for (int i = startIndex; i < storageObjects.size(); i++) { + final StorageObject storageObject = storageObjects.get(i); + + if (storageObject.getBucket().equals(getBucket()) + && storageObject.getName().startsWith(getPrefix())) { + + if (objects.size() == getMaxResults()) { + // We reached our max key limit; set nextIndex (which will lead to a result with truncated = true). + nextIndex = i; + break; + } + + // Generate a summary. + objects.add(storageObject); + } + } + + com.google.api.services.storage.model.Objects retVal = new com.google.api.services.storage.model.Objects(); + retVal.setItems(objects); + if (nextIndex >= 0) { + retVal.setNextPageToken(String.valueOf(nextIndex)); + } else { + retVal.setNextPageToken(null); + } + return retVal; + } + } + } + } + + private static MockList mockList(String bucket, List storageObjects) + { + return new MockStorage().mockList(bucket, storageObjects); + } + + private static StorageObject makeStorageObject(final String bucket, final String key, final long size) + { + final StorageObject summary = new StorageObject(); + summary.setBucket(bucket); + summary.setName(key); + summary.setSize(BigInteger.valueOf(size)); + return summary; + } +} diff --git a/extensions-core/hdfs-storage/src/main/java/org/apache/druid/inputsource/hdfs/HdfsInputSource.java b/extensions-core/hdfs-storage/src/main/java/org/apache/druid/inputsource/hdfs/HdfsInputSource.java index 661a07894e21..be7f3c883401 100644 --- a/extensions-core/hdfs-storage/src/main/java/org/apache/druid/inputsource/hdfs/HdfsInputSource.java +++ b/extensions-core/hdfs-storage/src/main/java/org/apache/druid/inputsource/hdfs/HdfsInputSource.java @@ -116,6 +116,7 @@ public static Collection getPaths(List inputPaths, Configuration c return new HdfsFileInputFormat().getSplits(job) .stream() + .filter(split -> ((FileSplit) split).getLength() > 0) .map(split -> ((FileSplit) split).getPath()) .collect(Collectors.toSet()); } diff --git a/extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/ObjectSummaryIterator.java b/extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/ObjectSummaryIterator.java index 0e791cd40e57..a515c153b0e6 100644 --- a/extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/ObjectSummaryIterator.java +++ b/extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/ObjectSummaryIterator.java @@ -125,8 +125,8 @@ private void advanceObjectSummary() while (objectSummaryIterator.hasNext() || result.isTruncated() || prefixesIterator.hasNext()) { while (objectSummaryIterator.hasNext()) { currentObjectSummary = objectSummaryIterator.next(); - - if (!isDirectoryPlaceholder(currentObjectSummary)) { + // skips directories and empty objects + if (!isDirectoryPlaceholder(currentObjectSummary) && currentObjectSummary.getSize() > 0) { return; } } diff --git a/extensions-core/s3-extensions/src/test/java/org/apache/druid/data/input/s3/S3InputSourceTest.java b/extensions-core/s3-extensions/src/test/java/org/apache/druid/data/input/s3/S3InputSourceTest.java index 59b6303eb28f..122f4fd6a56b 100644 --- a/extensions-core/s3-extensions/src/test/java/org/apache/druid/data/input/s3/S3InputSourceTest.java +++ b/extensions-core/s3-extensions/src/test/java/org/apache/druid/data/input/s3/S3InputSourceTest.java @@ -374,8 +374,8 @@ public void testWithUrisSplit() public void testWithPrefixesSplit() { EasyMock.reset(S3_CLIENT); - expectListObjects(PREFIXES.get(0), ImmutableList.of(EXPECTED_URIS.get(0))); - expectListObjects(PREFIXES.get(1), ImmutableList.of(EXPECTED_URIS.get(1))); + expectListObjects(PREFIXES.get(0), ImmutableList.of(EXPECTED_URIS.get(0)), CONTENT); + expectListObjects(PREFIXES.get(1), ImmutableList.of(EXPECTED_URIS.get(1)), CONTENT); EasyMock.replay(S3_CLIENT); S3InputSource inputSource = new S3InputSource( @@ -401,8 +401,8 @@ public void testWithPrefixesSplit() public void testCreateSplitsWithSplitHintSpecRespectingHint() { EasyMock.reset(S3_CLIENT); - expectListObjects(PREFIXES.get(0), ImmutableList.of(EXPECTED_URIS.get(0))); - expectListObjects(PREFIXES.get(1), ImmutableList.of(EXPECTED_URIS.get(1))); + expectListObjects(PREFIXES.get(0), ImmutableList.of(EXPECTED_URIS.get(0)), CONTENT); + expectListObjects(PREFIXES.get(1), ImmutableList.of(EXPECTED_URIS.get(1)), CONTENT); EasyMock.replay(S3_CLIENT); S3InputSource inputSource = new S3InputSource( @@ -412,7 +412,8 @@ public void testCreateSplitsWithSplitHintSpecRespectingHint() null, PREFIXES, null, - null); + null + ); Stream>> splits = inputSource.createSplits( new JsonInputFormat(JSONPathSpec.DEFAULT, null), @@ -426,11 +427,40 @@ public void testCreateSplitsWithSplitHintSpecRespectingHint() EasyMock.verify(S3_CLIENT); } + @Test + public void testCreateSplitsWithEmptyObjectsIteratingOnlyNonEmptyObjects() + { + EasyMock.reset(S3_CLIENT); + expectListObjects(PREFIXES.get(0), ImmutableList.of(EXPECTED_URIS.get(0)), CONTENT); + expectListObjects(PREFIXES.get(1), ImmutableList.of(EXPECTED_URIS.get(1)), new byte[0]); + EasyMock.replay(S3_CLIENT); + + S3InputSource inputSource = new S3InputSource( + SERVICE, + SERVER_SIDE_ENCRYPTING_AMAZON_S3_BUILDER, + INPUT_DATA_CONFIG, + null, + PREFIXES, + null, + null + ); + + Stream>> splits = inputSource.createSplits( + new JsonInputFormat(JSONPathSpec.DEFAULT, null), + null + ); + Assert.assertEquals( + ImmutableList.of(ImmutableList.of(new CloudObjectLocation(EXPECTED_URIS.get(0)))), + splits.map(InputSplit::get).collect(Collectors.toList()) + ); + EasyMock.verify(S3_CLIENT); + } + @Test public void testAccessDeniedWhileListingPrefix() { EasyMock.reset(S3_CLIENT); - expectListObjects(PREFIXES.get(0), ImmutableList.of(EXPECTED_URIS.get(0))); + expectListObjects(PREFIXES.get(0), ImmutableList.of(EXPECTED_URIS.get(0)), CONTENT); expectListObjectsAndThrowAccessDenied(EXPECTED_URIS.get(1)); EasyMock.replay(S3_CLIENT); @@ -459,8 +489,8 @@ public void testAccessDeniedWhileListingPrefix() public void testReader() throws IOException { EasyMock.reset(S3_CLIENT); - expectListObjects(PREFIXES.get(0), ImmutableList.of(EXPECTED_URIS.get(0))); - expectListObjects(EXPECTED_URIS.get(1), ImmutableList.of(EXPECTED_URIS.get(1))); + expectListObjects(PREFIXES.get(0), ImmutableList.of(EXPECTED_URIS.get(0)), CONTENT); + expectListObjects(EXPECTED_URIS.get(1), ImmutableList.of(EXPECTED_URIS.get(1)), CONTENT); expectGetObject(EXPECTED_URIS.get(0)); expectGetObject(EXPECTED_URIS.get(1)); EasyMock.replay(S3_CLIENT); @@ -503,8 +533,8 @@ public void testReader() throws IOException public void testCompressedReader() throws IOException { EasyMock.reset(S3_CLIENT); - expectListObjects(PREFIXES.get(0), ImmutableList.of(EXPECTED_COMPRESSED_URIS.get(0))); - expectListObjects(EXPECTED_COMPRESSED_URIS.get(1), ImmutableList.of(EXPECTED_COMPRESSED_URIS.get(1))); + expectListObjects(PREFIXES.get(0), ImmutableList.of(EXPECTED_COMPRESSED_URIS.get(0)), CONTENT); + expectListObjects(EXPECTED_COMPRESSED_URIS.get(1), ImmutableList.of(EXPECTED_COMPRESSED_URIS.get(1)), CONTENT); expectGetObjectCompressed(EXPECTED_COMPRESSED_URIS.get(0)); expectGetObjectCompressed(EXPECTED_COMPRESSED_URIS.get(1)); EasyMock.replay(S3_CLIENT); @@ -543,7 +573,7 @@ public void testCompressedReader() throws IOException EasyMock.verify(S3_CLIENT); } - private static void expectListObjects(URI prefix, List uris) + private static void expectListObjects(URI prefix, List uris, byte[] content) { final ListObjectsV2Result result = new ListObjectsV2Result(); result.setBucketName(prefix.getAuthority()); @@ -554,7 +584,7 @@ private static void expectListObjects(URI prefix, List uris) final S3ObjectSummary objectSummary = new S3ObjectSummary(); objectSummary.setBucketName(bucket); objectSummary.setKey(key); - objectSummary.setSize(CONTENT.length); + objectSummary.setSize(content.length); result.getObjectSummaries().add(objectSummary); } diff --git a/extensions-core/s3-extensions/src/test/java/org/apache/druid/storage/s3/ObjectSummaryIteratorTest.java b/extensions-core/s3-extensions/src/test/java/org/apache/druid/storage/s3/ObjectSummaryIteratorTest.java index d63409cee194..f1be5c7f3bde 100644 --- a/extensions-core/s3-extensions/src/test/java/org/apache/druid/storage/s3/ObjectSummaryIteratorTest.java +++ b/extensions-core/s3-extensions/src/test/java/org/apache/druid/storage/s3/ObjectSummaryIteratorTest.java @@ -37,11 +37,12 @@ public class ObjectSummaryIteratorTest private static final ImmutableList TEST_OBJECTS = ImmutableList.of( makeObjectSummary("b", "foo", 10L), - makeObjectSummary("b", "foo/", 0L), + makeObjectSummary("b", "foo/", 0L), // directory makeObjectSummary("b", "foo/bar1", 10L), makeObjectSummary("b", "foo/bar2", 10L), makeObjectSummary("b", "foo/bar3", 10L), makeObjectSummary("b", "foo/bar4", 10L), + makeObjectSummary("b", "foo/bar5", 0L), // empty object makeObjectSummary("b", "foo/baz", 10L), makeObjectSummary("bucketnotmine", "a/different/bucket", 10L) ); From 22915080f6b7399531ca63f9dd41f6a64d35dc4a Mon Sep 17 00:00:00 2001 From: Jihoon Son Date: Mon, 2 Mar 2020 20:22:21 -0800 Subject: [PATCH 02/10] split hint spec doc --- docs/ingestion/native-batch.md | 22 +++++++++++----------- 1 file changed, 11 insertions(+), 11 deletions(-) diff --git a/docs/ingestion/native-batch.md b/docs/ingestion/native-batch.md index 39e838a95605..65667be66775 100644 --- a/docs/ingestion/native-batch.md +++ b/docs/ingestion/native-batch.md @@ -74,7 +74,7 @@ You may want to consider the below things: - You may want to control the amount of input data each worker task processes. This can be controlled using different configurations depending on the phase in parallel ingestion (see [`partitionsSpec`](#partitionsspec) for more details). - For the tasks that read data from the `inputSource`, you can set the [SplitHintSpec](#splithintspec) in the `tuningConfig`. + For the tasks that read data from the `inputSource`, you can set the [Split hint spec](#split-hint-spec) in the `tuningConfig`. For the tasks that merge shuffled segments, you can set the `totalNumMergeTasks` in the `tuningConfig`. - The number of concurrent worker tasks in parallel ingestion is determined by `maxNumConcurrentSubTasks` in the `tuningConfig`. The supervisor task checks the number of current running worker tasks and creates more if it's smaller than `maxNumConcurrentSubTasks` @@ -202,7 +202,7 @@ The tuningConfig is optional and default parameters will be used if no tuningCon |maxBytesInMemory|Used in determining when intermediate persists to disk should occur. Normally this is computed internally and user does not need to set it. This value represents number of bytes to aggregate in heap memory before persisting. This is based on a rough estimate of memory usage and not actual usage. The maximum heap memory usage for indexing is maxBytesInMemory * (2 + maxPendingPersists)|1/6 of max JVM memory|no| |maxTotalRows|Deprecated. Use `partitionsSpec` instead. Total number of rows in segments waiting for being pushed. Used in determining when intermediate pushing should occur.|20000000|no| |numShards|Deprecated. Use `partitionsSpec` instead. Directly specify the number of shards to create when using a `hashed` `partitionsSpec`. If this is specified and `intervals` is specified in the `granularitySpec`, the index task can skip the determine intervals/partitions pass through the data. `numShards` cannot be specified if `maxRowsPerSegment` is set.|null|no| -|splitHintSpec|Used to give a hint to control the amount of data that each first phase task reads. This hint could be ignored depending on the implementation of the input source. See [SplitHintSpec](#splithintspec) for more details.|null|`maxSize`| +|splitHintSpec|Used to give a hint to control the amount of data that each first phase task reads. This hint could be ignored depending on the implementation of the input source. See [Split hint spec](#split-hint-spec) for more details.|size-based split hint spec|no| |partitionsSpec|Defines how to partition data in each timeChunk, see [PartitionsSpec](#partitionsspec)|`dynamic` if `forceGuaranteedRollup` = false, `hashed` or `single_dim` if `forceGuaranteedRollup` = true|no| |indexSpec|Defines segment storage format options to be used at indexing time, see [IndexSpec](index.md#indexspec)|null|no| |indexSpecForIntermediatePersists|Defines segment storage format options to be used at indexing time for intermediate persisted temporary segments. this can be used to disable dimension/metric compression on intermediate segments to reduce memory required for final merging. however, disabling compression on intermediate segments might increase page cache use while they are used before getting merged into final segment published, see [IndexSpec](index.md#indexspec) for possible values.|same as indexSpec|no| @@ -219,23 +219,23 @@ The tuningConfig is optional and default parameters will be used if no tuningCon |chatHandlerTimeout|Timeout for reporting the pushed segments in worker tasks.|PT10S|no| |chatHandlerNumRetries|Retries for reporting the pushed segments in worker tasks.|5|no| -### `splitHintSpec` +### Split Hint Spec -`SplitHintSpec` is used to give a hint when the supervisor task creates input splits. +The split hint spec is used to give a hint when the supervisor task creates input splits. Note that each worker task processes a single input split. You can control the amount of data each worker task will read during the first phase. -#### `MaxSizeSplitHintSpec` +#### Size-based Split Hint Spec -`MaxSizeSplitHintSpec` is respected by all splittable input sources except for the HTTP input source. +The size-based split hint spec is respected by all splittable input sources except for the HTTP input source. |property|description|default|required?| |--------|-----------|-------|---------| |type|This should always be `maxSize`.|none|yes| |maxSplitSize|Maximum number of bytes of input files to process in a single task. If a single file is larger than this number, it will be processed by itself in a single task (Files are never split across tasks yet).|500MB|no| -#### `SegmentsSplitHintSpec` +#### Segments Split Hint Spec -`SegmentsSplitHintSpec` is used only for [`DruidInputSource`](#druid-input-source) (and legacy [`IngestSegmentFirehose`](#ingestsegmentfirehose)). +The segments split hint spec is used only for [`DruidInputSource`](#druid-input-source) (and legacy [`IngestSegmentFirehose`](#ingestsegmentfirehose)). |property|description|default|required?| |--------|-----------|-------|---------| @@ -294,7 +294,7 @@ How the worker task creates segments is: The Parallel task with hash-based partitioning is similar to [MapReduce](https://en.wikipedia.org/wiki/MapReduce). The task runs in 2 phases, i.e., `partial segment generation` and `partial segment merge`. - In the `partial segment generation` phase, just like the Map phase in MapReduce, -the Parallel task splits the input data based on `splitHintSpec` +the Parallel task splits the input data based on the split hint spec and assigns each split to a worker task. Each worker task (type `partial_index_generate`) reads the assigned split, and partitions rows by the time chunk from `segmentGranularity` (primary partition key) in the `granularitySpec` and then by the hash value of `partitionDimensions` (secondary partition key) in the `partitionsSpec`. @@ -326,7 +326,7 @@ The first phase is to collect some statistics to find the best partitioning and the other 2 phases are to create partial segments and to merge them, respectively, as in hash-based partitioning. - In the `partial dimension distribution` phase, the Parallel task splits the input data and -assigns them to worker tasks based on `splitHintSpec`. Each worker task (type `partial_dimension_distribution`) reads +assigns them to worker tasks based on the split hint spec. Each worker task (type `partial_dimension_distribution`) reads the assigned split and builds a histogram for `partitionDimension`. The Parallel task collects those histograms from worker tasks and finds the best range partitioning based on `partitionDimension` to evenly @@ -1529,7 +1529,7 @@ This firehose will accept any type of parser, but will only utilize the list of |dimensions|The list of dimensions to select. If left empty, no dimensions are returned. If left null or not defined, all dimensions are returned. |no| |metrics|The list of metrics to select. If left empty, no metrics are returned. If left null or not defined, all metrics are selected.|no| |filter| See [Filters](../querying/filters.md)|no| -|maxInputSegmentBytesPerTask|Deprecated. Use [SegmentsSplitHintSpec](#segmentssplithintspec) instead. When used with the native parallel index task, the maximum number of bytes of input segments to process in a single task. If a single segment is larger than this number, it will be processed by itself in a single task (input segments are never split across tasks). Defaults to 150MB.|no| +|maxInputSegmentBytesPerTask|Deprecated. Use [Segments Split Hint Spec](#segments-split-hint-spec) instead. When used with the native parallel index task, the maximum number of bytes of input segments to process in a single task. If a single segment is larger than this number, it will be processed by itself in a single task (input segments are never split across tasks). Defaults to 150MB.|no| From 77524e22f907c6554c0464fe0eb0a8114753ddca Mon Sep 17 00:00:00 2001 From: Jihoon Son Date: Mon, 2 Mar 2020 20:27:41 -0800 Subject: [PATCH 03/10] doc for skipping empty files --- docs/ingestion/native-batch.md | 18 ++++++++++++------ 1 file changed, 12 insertions(+), 6 deletions(-) diff --git a/docs/ingestion/native-batch.md b/docs/ingestion/native-batch.md index 65667be66775..a3e7d1c80ae5 100644 --- a/docs/ingestion/native-batch.md +++ b/docs/ingestion/native-batch.md @@ -839,10 +839,12 @@ Sample specs: |--------|-----------|-------|---------| |type|This should be `s3`.|None|yes| |uris|JSON array of URIs where S3 objects to be ingested are located.|None|`uris` or `prefixes` or `objects` must be set| -|prefixes|JSON array of URI prefixes for the locations of S3 objects to be ingested.|None|`uris` or `prefixes` or `objects` must be set| +|prefixes|JSON array of URI prefixes for the locations of S3 objects to be ingested. Empty objects starting with one of the given prefixes will be skipped.|None|`uris` or `prefixes` or `objects` must be set| |objects|JSON array of S3 Objects to be ingested.|None|`uris` or `prefixes` or `objects` must be set| |properties|Properties Object for overriding the default S3 configuration. See below for more information.|None|No (defaults will be used if not given) +Note that the S3 input source will skip all empty objects only when `prefixes` is specified. + S3 Object: |property|description|default|required?| @@ -927,9 +929,11 @@ Sample specs: |--------|-----------|-------|---------| |type|This should be `google`.|None|yes| |uris|JSON array of URIs where Google Cloud Storage objects to be ingested are located.|None|`uris` or `prefixes` or `objects` must be set| -|prefixes|JSON array of URI prefixes for the locations of Google Cloud Storage objects to be ingested.|None|`uris` or `prefixes` or `objects` must be set| +|prefixes|JSON array of URI prefixes for the locations of Google Cloud Storage objects to be ingested. Empty objects starting with one of the given prefixes will be skipped.|None|`uris` or `prefixes` or `objects` must be set| |objects|JSON array of Google Cloud Storage objects to be ingested.|None|`uris` or `prefixes` or `objects` must be set| +Note that the Google Cloud Storage input source will skip all empty objects only when `prefixes` is specified. + Google Cloud Storage object: |property|description|default|required?| @@ -1004,9 +1008,11 @@ Sample specs: |--------|-----------|-------|---------| |type|This should be `google`.|None|yes| |uris|JSON array of URIs where Azure Blob objects to be ingested are located. Should be in form "azure://\/\"|None|`uris` or `prefixes` or `objects` must be set| -|prefixes|JSON array of URI prefixes for the locations of Azure Blob objects to be ingested. Should be in the form "azure://\/\"|None|`uris` or `prefixes` or `objects` must be set| +|prefixes|JSON array of URI prefixes for the locations of Azure Blob objects to be ingested. Should be in the form "azure://\/\". Empty objects starting with one of the given prefixes will be skipped.|None|`uris` or `prefixes` or `objects` must be set| |objects|JSON array of Azure Blob objects to be ingested.|None|`uris` or `prefixes` or `objects` must be set| +Note that the Azure input source will skip all empty objects only when `prefixes` is specified. + Azure Blob object: |property|description|default|required?| @@ -1092,7 +1098,7 @@ Sample specs: |property|description|default|required?| |--------|-----------|-------|---------| |type|This should be `hdfs`.|None|yes| -|paths|HDFS paths. Can be either a JSON array or comma-separated string of paths. Wildcards like `*` are supported in these paths.|None|yes| +|paths|HDFS paths. Can be either a JSON array or comma-separated string of paths. Wildcards like `*` are supported in these paths. Empty files located under one of the given paths will be skipped.|None|yes| You can also ingest from cloud storage using the HDFS input source. However, if you want to read from AWS S3 or Google Cloud Storage, consider using @@ -1233,8 +1239,8 @@ Sample spec: |--------|-----------|---------| |type|This should be "local".|yes| |filter|A wildcard filter for files. See [here](http://commons.apache.org/proper/commons-io/apidocs/org/apache/commons/io/filefilter/WildcardFileFilter.html) for more information.|yes if `baseDir` is specified| -|baseDir|Directory to search recursively for files to be ingested. |At least one of `baseDir` or `files` should be specified| -|files|File paths to ingest. Some files can be ignored to avoid ingesting duplicate files if they are located under the specified `baseDir`. |At least one of `baseDir` or `files` should be specified| +|baseDir|Directory to search recursively for files to be ingested. Empty files under the `baseDir` will be skipped.|At least one of `baseDir` or `files` should be specified| +|files|File paths to ingest. Some files can be ignored to avoid ingesting duplicate files if they are located under the specified `baseDir`. Empty files will be skipped.|At least one of `baseDir` or `files` should be specified| ### Druid Input Source From 902c8c0c1c378b3100bc637f56b342c55db289a5 Mon Sep 17 00:00:00 2001 From: Jihoon Son Date: Mon, 2 Mar 2020 20:33:14 -0800 Subject: [PATCH 04/10] fix typo; adjust tests --- .../main/java/org/apache/druid/data/input/InputEntity.java | 4 ++-- .../druid/storage/google/ObjectStorageIteratorTest.java | 3 ++- .../apache/druid/storage/s3/ObjectSummaryIteratorTest.java | 3 ++- 3 files changed, 6 insertions(+), 4 deletions(-) diff --git a/core/src/main/java/org/apache/druid/data/input/InputEntity.java b/core/src/main/java/org/apache/druid/data/input/InputEntity.java index d900110b3bb1..71f14d95a3c3 100644 --- a/core/src/main/java/org/apache/druid/data/input/InputEntity.java +++ b/core/src/main/java/org/apache/druid/data/input/InputEntity.java @@ -35,8 +35,8 @@ /** * InputEntity abstracts an input entity and knows how to read bytes from the given entity. - * Implementation assume that the given input entity is not empty, the InputSources creating InputEntitires should - * filter out empty entities properly. + * Since the implementations of this interface assume that the given entity is not empty, the InputSources + * should not create InputEntities for empty entities. */ @UnstableApi public interface InputEntity diff --git a/extensions-core/google-extensions/src/test/java/org/apache/druid/storage/google/ObjectStorageIteratorTest.java b/extensions-core/google-extensions/src/test/java/org/apache/druid/storage/google/ObjectStorageIteratorTest.java index a1459e2e6e57..b14770380ce3 100644 --- a/extensions-core/google-extensions/src/test/java/org/apache/druid/storage/google/ObjectStorageIteratorTest.java +++ b/extensions-core/google-extensions/src/test/java/org/apache/druid/storage/google/ObjectStorageIteratorTest.java @@ -49,7 +49,8 @@ public class ObjectStorageIteratorTest makeStorageObject("b", "foo/bar4", 10L), makeStorageObject("b", "foo/bar5", 0L), // empty object makeStorageObject("b", "foo/baz", 10L), - makeStorageObject("bucketnotmine", "a/different/bucket", 10L) + makeStorageObject("bucketnotmine", "a/different/bucket", 10L), + makeStorageObject("b", "foo/bar/", 0L) // another directory at the end of list ); @Test diff --git a/extensions-core/s3-extensions/src/test/java/org/apache/druid/storage/s3/ObjectSummaryIteratorTest.java b/extensions-core/s3-extensions/src/test/java/org/apache/druid/storage/s3/ObjectSummaryIteratorTest.java index f1be5c7f3bde..19722e2056d9 100644 --- a/extensions-core/s3-extensions/src/test/java/org/apache/druid/storage/s3/ObjectSummaryIteratorTest.java +++ b/extensions-core/s3-extensions/src/test/java/org/apache/druid/storage/s3/ObjectSummaryIteratorTest.java @@ -44,7 +44,8 @@ public class ObjectSummaryIteratorTest makeObjectSummary("b", "foo/bar4", 10L), makeObjectSummary("b", "foo/bar5", 0L), // empty object makeObjectSummary("b", "foo/baz", 10L), - makeObjectSummary("bucketnotmine", "a/different/bucket", 10L) + makeObjectSummary("bucketnotmine", "a/different/bucket", 10L), + makeObjectSummary("b", "foo/bar/", 0L) // another directory at the end of list ); @Test From 02ffb9ad072b6b0732d5c7b52e00834560400673 Mon Sep 17 00:00:00 2001 From: Jihoon Son Date: Mon, 2 Mar 2020 20:36:14 -0800 Subject: [PATCH 05/10] unnecessary fluent iterable --- .../apache/druid/data/input/MaxSizeSplitHintSpec.java | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/core/src/main/java/org/apache/druid/data/input/MaxSizeSplitHintSpec.java b/core/src/main/java/org/apache/druid/data/input/MaxSizeSplitHintSpec.java index e5e553112068..b023d5c3fd02 100644 --- a/core/src/main/java/org/apache/druid/data/input/MaxSizeSplitHintSpec.java +++ b/core/src/main/java/org/apache/druid/data/input/MaxSizeSplitHintSpec.java @@ -22,7 +22,7 @@ import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; import com.google.common.annotations.VisibleForTesting; -import com.google.common.collect.FluentIterable; +import com.google.common.collect.Iterators; import org.apache.druid.java.util.common.logger.Logger; import javax.annotation.Nullable; @@ -64,10 +64,10 @@ public long getMaxSplitSize() @Override public Iterator> split(Iterator inputIterator, Function inputAttributeExtractor) { - final Iterator nonEmptyFileOnlyIterator = FluentIterable - .from(() -> inputIterator) - .filter(input -> inputAttributeExtractor.apply(input).getSize() > 0) - .iterator(); + final Iterator nonEmptyFileOnlyIterator = Iterators.filter( + inputIterator, + input -> inputAttributeExtractor.apply(input).getSize() > 0 + ); return new Iterator>() { private T peeking; From b59e457ee8f67789c16b7e43ad34d8acfd51f4f9 Mon Sep 17 00:00:00 2001 From: Jihoon Son Date: Tue, 3 Mar 2020 11:06:46 -0800 Subject: [PATCH 06/10] address comments --- .../apache/druid/data/input/MaxSizeSplitHintSpec.java | 2 -- .../druid/data/input/impl/LocalInputSourceTest.java | 2 +- .../storage/google/ObjectStorageIteratorTest.java | 10 ++++++++++ .../druid/storage/s3/ObjectSummaryIteratorTest.java | 10 ++++++++++ 4 files changed, 21 insertions(+), 3 deletions(-) diff --git a/core/src/main/java/org/apache/druid/data/input/MaxSizeSplitHintSpec.java b/core/src/main/java/org/apache/druid/data/input/MaxSizeSplitHintSpec.java index b023d5c3fd02..4b834c14c6c5 100644 --- a/core/src/main/java/org/apache/druid/data/input/MaxSizeSplitHintSpec.java +++ b/core/src/main/java/org/apache/druid/data/input/MaxSizeSplitHintSpec.java @@ -23,7 +23,6 @@ import com.fasterxml.jackson.annotation.JsonProperty; import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.Iterators; -import org.apache.druid.java.util.common.logger.Logger; import javax.annotation.Nullable; import java.util.ArrayList; @@ -42,7 +41,6 @@ public class MaxSizeSplitHintSpec implements SplitHintSpec { public static final String TYPE = "maxSize"; - private static final Logger LOG = new Logger(MaxSizeSplitHintSpec.class); @VisibleForTesting static final long DEFAULT_MAX_SPLIT_SIZE = 512 * 1024 * 1024; diff --git a/core/src/test/java/org/apache/druid/data/input/impl/LocalInputSourceTest.java b/core/src/test/java/org/apache/druid/data/input/impl/LocalInputSourceTest.java index d818ff6ded1c..3c357789e742 100644 --- a/core/src/test/java/org/apache/druid/data/input/impl/LocalInputSourceTest.java +++ b/core/src/test/java/org/apache/druid/data/input/impl/LocalInputSourceTest.java @@ -20,8 +20,8 @@ package org.apache.druid.data.input.impl; import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.collect.Lists; import nl.jqno.equalsverifier.EqualsVerifier; -import org.apache.commons.compress.utils.Lists; import org.apache.druid.data.input.InputSource; import org.apache.druid.data.input.InputSplit; import org.apache.druid.data.input.MaxSizeSplitHintSpec; diff --git a/extensions-core/google-extensions/src/test/java/org/apache/druid/storage/google/ObjectStorageIteratorTest.java b/extensions-core/google-extensions/src/test/java/org/apache/druid/storage/google/ObjectStorageIteratorTest.java index b14770380ce3..4d1504f842c3 100644 --- a/extensions-core/google-extensions/src/test/java/org/apache/druid/storage/google/ObjectStorageIteratorTest.java +++ b/extensions-core/google-extensions/src/test/java/org/apache/druid/storage/google/ObjectStorageIteratorTest.java @@ -147,6 +147,16 @@ public void testDifferentBucket() ); } + @Test + public void testWithMultiplePrefixesReturningAllNonEmptyObjectsStartingWithOneOfPrefixes() + { + test( + ImmutableList.of("gs://b/foo/bar1", "gs://b/foo/bar2", "gs://b/foo/bar3", "gs://b/foo/bar4", "gs://b/foo/baz"), + ImmutableList.of("gs://b/foo/bar", "gs://b/foo/baz"), + 10 + ); + } + private static void test( final List expectedUris, final List prefixes, diff --git a/extensions-core/s3-extensions/src/test/java/org/apache/druid/storage/s3/ObjectSummaryIteratorTest.java b/extensions-core/s3-extensions/src/test/java/org/apache/druid/storage/s3/ObjectSummaryIteratorTest.java index 19722e2056d9..ea2ca4af26c1 100644 --- a/extensions-core/s3-extensions/src/test/java/org/apache/druid/storage/s3/ObjectSummaryIteratorTest.java +++ b/extensions-core/s3-extensions/src/test/java/org/apache/druid/storage/s3/ObjectSummaryIteratorTest.java @@ -142,6 +142,16 @@ public void testDifferentBucket() ); } + @Test + public void testWithMultiplePrefixesReturningAllNonEmptyObjectsStartingWithOneOfPrefixes() + { + test( + ImmutableList.of("s3://b/foo/bar1", "s3://b/foo/bar2", "s3://b/foo/bar3", "s3://b/foo/bar4", "s3://b/foo/baz"), + ImmutableList.of("s3://b/foo/bar", "s3://b/foo/baz"), + 10 + ); + } + private static void test( final List expectedUris, final List prefixes, From c6c1a7f24b25ae80f10fbff44b43bb033027b478 Mon Sep 17 00:00:00 2001 From: Jihoon Son Date: Tue, 3 Mar 2020 11:11:05 -0800 Subject: [PATCH 07/10] fix test --- .../data/input/impl/LocalInputSourceTest.java | 35 ++++++++++++++----- 1 file changed, 26 insertions(+), 9 deletions(-) diff --git a/core/src/test/java/org/apache/druid/data/input/impl/LocalInputSourceTest.java b/core/src/test/java/org/apache/druid/data/input/impl/LocalInputSourceTest.java index 3c357789e742..e5786aff0f35 100644 --- a/core/src/test/java/org/apache/druid/data/input/impl/LocalInputSourceTest.java +++ b/core/src/test/java/org/apache/druid/data/input/impl/LocalInputSourceTest.java @@ -33,6 +33,7 @@ import org.junit.rules.TemporaryFolder; import java.io.File; +import java.io.FileWriter; import java.io.IOException; import java.util.ArrayList; import java.util.HashSet; @@ -67,7 +68,7 @@ public void testCreateSplitsRespectingSplitHintSpec() { final long fileSize = 15; final long maxSplitSize = 50; - final Set files = prepareFiles(10, fileSize); + final Set files = mockFiles(10, fileSize); final LocalInputSource inputSource = new LocalInputSource(null, null, files); final List>> splits = inputSource .createSplits(new NoopInputFormat(), new MaxSizeSplitHintSpec(maxSplitSize)) @@ -84,7 +85,7 @@ public void testEstimateNumSplitsRespectingSplitHintSpec() { final long fileSize = 13; final long maxSplitSize = 40; - final Set files = prepareFiles(10, fileSize); + final Set files = mockFiles(10, fileSize); final LocalInputSource inputSource = new LocalInputSource(null, null, files); Assert.assertEquals( 4, @@ -98,11 +99,19 @@ public void testGetFileIteratorWithBothBaseDirAndDuplicateFilesIteratingFilesOnl File baseDir = temporaryFolder.newFolder(); List filesInBaseDir = new ArrayList<>(); for (int i = 0; i < 10; i++) { - filesInBaseDir.add(File.createTempFile("local-input-source", ".data", baseDir)); + final File file = File.createTempFile("local-input-source", ".data", baseDir); + try (FileWriter writer = new FileWriter(file)) { + writer.write("test"); + } + filesInBaseDir.add(file); } Set files = new HashSet<>(filesInBaseDir.subList(0, 5)); for (int i = 0; i < 3; i++) { - files.add(File.createTempFile("local-input-source", ".data", baseDir)); + final File file = File.createTempFile("local-input-source", ".data", baseDir); + try (FileWriter writer = new FileWriter(file)) { + writer.write("test"); + } + files.add(file); } Set expectedFiles = new HashSet<>(filesInBaseDir); expectedFiles.addAll(files); @@ -118,7 +127,11 @@ public void testGetFileIteratorWithOnlyBaseDirIteratingAllFiles() throws IOExcep File baseDir = temporaryFolder.newFolder(); Set filesInBaseDir = new HashSet<>(); for (int i = 0; i < 10; i++) { - filesInBaseDir.add(File.createTempFile("local-input-source", ".data", baseDir)); + final File file = File.createTempFile("local-input-source", ".data", baseDir); + try (FileWriter writer = new FileWriter(file)) { + writer.write("test"); + } + filesInBaseDir.add(file); } Iterator fileIterator = new LocalInputSource(baseDir, "*", null).getFileIterator(); Set actualFiles = Streams.sequentialStreamFrom(fileIterator).collect(Collectors.toSet()); @@ -131,7 +144,11 @@ public void testGetFileIteratorWithOnlyFilesIteratingAllFiles() throws IOExcepti File baseDir = temporaryFolder.newFolder(); Set filesInBaseDir = new HashSet<>(); for (int i = 0; i < 10; i++) { - filesInBaseDir.add(File.createTempFile("local-input-source", ".data", baseDir)); + final File file = File.createTempFile("local-input-source", ".data", baseDir); + try (FileWriter writer = new FileWriter(file)) { + writer.write("test"); + } + filesInBaseDir.add(file); } Iterator fileIterator = new LocalInputSource(null, null, filesInBaseDir).getFileIterator(); Set actualFiles = Streams.sequentialStreamFrom(fileIterator).collect(Collectors.toSet()); @@ -141,14 +158,14 @@ public void testGetFileIteratorWithOnlyFilesIteratingAllFiles() throws IOExcepti @Test public void testFileIteratorWithEmptyFilesIteratingNonEmptyFilesOnly() { - final Set files = new HashSet<>(prepareFiles(10, 5)); - files.addAll(prepareFiles(10, 0)); + final Set files = new HashSet<>(mockFiles(10, 5)); + files.addAll(mockFiles(10, 0)); final LocalInputSource inputSource = new LocalInputSource(null, null, files); List iteratedFiles = Lists.newArrayList(inputSource.getFileIterator()); Assert.assertTrue(iteratedFiles.stream().allMatch(file -> file.length() > 0)); } - private static Set prepareFiles(int numFiles, long fileSize) + private static Set mockFiles(int numFiles, long fileSize) { final Set files = new HashSet<>(); for (int i = 0; i < numFiles; i++) { From 0c31efc039db8336834a390ef8f41a153635e0eb Mon Sep 17 00:00:00 2001 From: Jihoon Son Date: Tue, 3 Mar 2020 11:11:50 -0800 Subject: [PATCH 08/10] use the right lists --- .../org/apache/druid/data/input/MaxSizeSplitHintSpecTest.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/test/java/org/apache/druid/data/input/MaxSizeSplitHintSpecTest.java b/core/src/test/java/org/apache/druid/data/input/MaxSizeSplitHintSpecTest.java index 2ad40afa579f..5ec57c0b83e1 100644 --- a/core/src/test/java/org/apache/druid/data/input/MaxSizeSplitHintSpecTest.java +++ b/core/src/test/java/org/apache/druid/data/input/MaxSizeSplitHintSpecTest.java @@ -20,8 +20,8 @@ package org.apache.druid.data.input; import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.collect.Lists; import nl.jqno.equalsverifier.EqualsVerifier; -import org.apache.commons.compress.utils.Lists; import org.junit.Assert; import org.junit.Test; From 0e575b33775a0cc02e76d901e247ff6746c36dfc Mon Sep 17 00:00:00 2001 From: Jihoon Son Date: Tue, 3 Mar 2020 13:40:57 -0800 Subject: [PATCH 09/10] fix test --- .../druid/data/input/impl/LocalInputSourceTest.java | 12 +++++++----- .../storage/azure/AzureCloudBlobIteratorTest.java | 2 +- .../s3/S3TimestampVersionedDataFinderTest.java | 4 ++++ 3 files changed, 12 insertions(+), 6 deletions(-) diff --git a/core/src/test/java/org/apache/druid/data/input/impl/LocalInputSourceTest.java b/core/src/test/java/org/apache/druid/data/input/impl/LocalInputSourceTest.java index e5786aff0f35..97814355425b 100644 --- a/core/src/test/java/org/apache/druid/data/input/impl/LocalInputSourceTest.java +++ b/core/src/test/java/org/apache/druid/data/input/impl/LocalInputSourceTest.java @@ -33,8 +33,10 @@ import org.junit.rules.TemporaryFolder; import java.io.File; -import java.io.FileWriter; import java.io.IOException; +import java.io.Writer; +import java.nio.charset.StandardCharsets; +import java.nio.file.Files; import java.util.ArrayList; import java.util.HashSet; import java.util.Iterator; @@ -100,7 +102,7 @@ public void testGetFileIteratorWithBothBaseDirAndDuplicateFilesIteratingFilesOnl List filesInBaseDir = new ArrayList<>(); for (int i = 0; i < 10; i++) { final File file = File.createTempFile("local-input-source", ".data", baseDir); - try (FileWriter writer = new FileWriter(file)) { + try (Writer writer = Files.newBufferedWriter(file.toPath(), StandardCharsets.UTF_8)) { writer.write("test"); } filesInBaseDir.add(file); @@ -108,7 +110,7 @@ public void testGetFileIteratorWithBothBaseDirAndDuplicateFilesIteratingFilesOnl Set files = new HashSet<>(filesInBaseDir.subList(0, 5)); for (int i = 0; i < 3; i++) { final File file = File.createTempFile("local-input-source", ".data", baseDir); - try (FileWriter writer = new FileWriter(file)) { + try (Writer writer = Files.newBufferedWriter(file.toPath(), StandardCharsets.UTF_8)) { writer.write("test"); } files.add(file); @@ -128,7 +130,7 @@ public void testGetFileIteratorWithOnlyBaseDirIteratingAllFiles() throws IOExcep Set filesInBaseDir = new HashSet<>(); for (int i = 0; i < 10; i++) { final File file = File.createTempFile("local-input-source", ".data", baseDir); - try (FileWriter writer = new FileWriter(file)) { + try (Writer writer = Files.newBufferedWriter(file.toPath(), StandardCharsets.UTF_8)) { writer.write("test"); } filesInBaseDir.add(file); @@ -145,7 +147,7 @@ public void testGetFileIteratorWithOnlyFilesIteratingAllFiles() throws IOExcepti Set filesInBaseDir = new HashSet<>(); for (int i = 0; i < 10; i++) { final File file = File.createTempFile("local-input-source", ".data", baseDir); - try (FileWriter writer = new FileWriter(file)) { + try (Writer writer = Files.newBufferedWriter(file.toPath(), StandardCharsets.UTF_8)) { writer.write("test"); } filesInBaseDir.add(file); diff --git a/extensions-core/azure-extensions/src/test/java/org/apache/druid/storage/azure/AzureCloudBlobIteratorTest.java b/extensions-core/azure-extensions/src/test/java/org/apache/druid/storage/azure/AzureCloudBlobIteratorTest.java index dbccf0446eb3..22bbdba158b1 100644 --- a/extensions-core/azure-extensions/src/test/java/org/apache/druid/storage/azure/AzureCloudBlobIteratorTest.java +++ b/extensions-core/azure-extensions/src/test/java/org/apache/druid/storage/azure/AzureCloudBlobIteratorTest.java @@ -19,8 +19,8 @@ package org.apache.druid.storage.azure; -import com.google.api.client.util.Lists; import com.google.common.collect.ImmutableList; +import com.google.common.collect.Lists; import com.microsoft.azure.storage.ResultContinuation; import com.microsoft.azure.storage.ResultSegment; import com.microsoft.azure.storage.StorageException; diff --git a/extensions-core/s3-extensions/src/test/java/org/apache/druid/storage/s3/S3TimestampVersionedDataFinderTest.java b/extensions-core/s3-extensions/src/test/java/org/apache/druid/storage/s3/S3TimestampVersionedDataFinderTest.java index 8e144c2340f2..f839d3df4c7f 100644 --- a/extensions-core/s3-extensions/src/test/java/org/apache/druid/storage/s3/S3TimestampVersionedDataFinderTest.java +++ b/extensions-core/s3-extensions/src/test/java/org/apache/druid/storage/s3/S3TimestampVersionedDataFinderTest.java @@ -46,10 +46,12 @@ public void testSimpleLatestVersion() object0.setBucketName(bucket); object0.setKey(keyPrefix + "/renames-0.gz"); object0.setLastModified(new Date(0)); + object0.setSize(10); object1.setBucketName(bucket); object1.setKey(keyPrefix + "/renames-1.gz"); object1.setLastModified(new Date(1)); + object1.setSize(10); final ListObjectsV2Result result = new ListObjectsV2Result(); result.getObjectSummaries().add(object0); @@ -116,6 +118,7 @@ public void testFindSelf() object0.setBucketName(bucket); object0.setKey(keyPrefix + "/renames-0.gz"); object0.setLastModified(new Date(0)); + object0.setSize(10); final ListObjectsV2Result result = new ListObjectsV2Result(); result.getObjectSummaries().add(object0); @@ -153,6 +156,7 @@ public void testFindExact() object0.setBucketName(bucket); object0.setKey(keyPrefix + "/renames-0.gz"); object0.setLastModified(new Date(0)); + object0.setSize(10); final ListObjectsV2Result result = new ListObjectsV2Result(); result.getObjectSummaries().add(object0); From 6155d9c36d1b3aa125c61bb22721676fa49e5de2 Mon Sep 17 00:00:00 2001 From: Jihoon Son Date: Tue, 3 Mar 2020 16:57:25 -0800 Subject: [PATCH 10/10] fix test --- .../org/apache/druid/data/input/azure/AzureInputSourceTest.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/extensions-core/azure-extensions/src/test/java/org/apache/druid/data/input/azure/AzureInputSourceTest.java b/extensions-core/azure-extensions/src/test/java/org/apache/druid/data/input/azure/AzureInputSourceTest.java index 35d1e5ff58f9..8a7f16cf5be7 100644 --- a/extensions-core/azure-extensions/src/test/java/org/apache/druid/data/input/azure/AzureInputSourceTest.java +++ b/extensions-core/azure-extensions/src/test/java/org/apache/druid/data/input/azure/AzureInputSourceTest.java @@ -148,7 +148,7 @@ public void test_getPrefixesSplitStream_successfullyCreatesCloudLocation_returns EasyMock.expect(azureCloudBlobIterable.iterator()).andReturn(expectedCloudBlobsIterator); EasyMock.expect(azureCloudBlobToLocationConverter.createCloudObjectLocation(cloudBlobDruid1)) .andReturn(CLOUD_OBJECT_LOCATION_1); - EasyMock.expect(cloudBlobDruid1.getBlobLength()).andReturn(100L); + EasyMock.expect(cloudBlobDruid1.getBlobLength()).andReturn(100L).anyTimes(); replayAll(); azureInputSource = new AzureInputSource(