From 0b0e0d9984e860457126c5e2db7fd86db7249634 Mon Sep 17 00:00:00 2001 From: Clint Wylie Date: Fri, 22 Nov 2019 02:53:47 -0800 Subject: [PATCH 01/10] add prefixes support to google input source, making it symmetrical-ish with s3 --- .../data/input/impl/CloudObjectLocation.java | 93 ++++++++++++ .../google/GoogleCloudStorageEntity.java | 17 +-- .../google/GoogleCloudStorageInputSource.java | 134 ++++++++++++++++-- .../storage/google/GoogleByteSource.java | 31 ++++ .../druid/storage/google/GoogleUtils.java | 108 +++++++++++++- .../GoogleCloudStorageInputSourceTest.java | 63 ++++++-- 6 files changed, 404 insertions(+), 42 deletions(-) create mode 100644 core/src/main/java/org/apache/druid/data/input/impl/CloudObjectLocation.java diff --git a/core/src/main/java/org/apache/druid/data/input/impl/CloudObjectLocation.java b/core/src/main/java/org/apache/druid/data/input/impl/CloudObjectLocation.java new file mode 100644 index 000000000000..42f44abaedb1 --- /dev/null +++ b/core/src/main/java/org/apache/druid/data/input/impl/CloudObjectLocation.java @@ -0,0 +1,93 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.druid.data.input.impl; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.google.common.base.Preconditions; + +import java.net.URI; +import java.util.Objects; + +public class CloudObjectLocation +{ + private final String bucket; + private final String path; + + @JsonCreator + public CloudObjectLocation(@JsonProperty("bucket") String bucket, @JsonProperty("path") String path) + { + this.bucket = Preconditions.checkNotNull(bucket); + this.path = Preconditions.checkNotNull(path); + } + + public CloudObjectLocation(URI uri) + { + bucket = uri.getHost(); + String path = uri.getPath(); + if (path.startsWith("/")) { + path = path.substring(1); + } + this.path = path; + } + + @JsonProperty + public String getBucket() + { + return bucket; + } + + @JsonProperty + public String getPath() + { + return path; + } + + @Override + public String toString() + { + return "CloudObjectLocation {" + + "bucket=" + bucket + + ",path=" + path + + "}"; + } + + @Override + public boolean equals(Object o) + { + if (this == o) { + return true; + } + + if (o == null || getClass() != o.getClass()) { + return false; + } + + final CloudObjectLocation that = (CloudObjectLocation) o; + return Objects.equals(bucket, that.bucket) && + Objects.equals(path, that.path); + } + + @Override + public int hashCode() + { + return Objects.hash(bucket, path); + } +} diff --git a/extensions-core/google-extensions/src/main/java/org/apache/druid/data/input/google/GoogleCloudStorageEntity.java b/extensions-core/google-extensions/src/main/java/org/apache/druid/data/input/google/GoogleCloudStorageEntity.java index 6144857d3021..4994ff3e4c7e 100644 --- a/extensions-core/google-extensions/src/main/java/org/apache/druid/data/input/google/GoogleCloudStorageEntity.java +++ b/extensions-core/google-extensions/src/main/java/org/apache/druid/data/input/google/GoogleCloudStorageEntity.java @@ -22,7 +22,6 @@ import 
com.google.common.base.Predicate; import org.apache.druid.data.input.RetryingInputEntity; import org.apache.druid.storage.google.GoogleByteSource; -import org.apache.druid.storage.google.GoogleStorage; import org.apache.druid.storage.google.GoogleUtils; import org.apache.druid.utils.CompressionUtils; @@ -33,30 +32,24 @@ public class GoogleCloudStorageEntity implements RetryingInputEntity { - private final GoogleStorage storage; - private final URI uri; + private final GoogleByteSource byteSource; - GoogleCloudStorageEntity(GoogleStorage storage, URI uri) + GoogleCloudStorageEntity(GoogleByteSource byteSource) { - this.storage = storage; - this.uri = uri; + this.byteSource = byteSource; } @Nullable @Override public URI getUri() { - return uri; + return null; } @Override public InputStream readFrom(long offset) throws IOException { - // Get data of the given object and open an input stream - final String bucket = uri.getAuthority(); - final String key = GoogleUtils.extractGoogleCloudStorageObjectKey(uri); - final GoogleByteSource byteSource = new GoogleByteSource(storage, bucket, key); - return CompressionUtils.decompress(byteSource.openStream(offset), uri.getPath()); + return CompressionUtils.decompress(byteSource.openStream(offset), byteSource.getPath()); } @Override diff --git a/extensions-core/google-extensions/src/main/java/org/apache/druid/data/input/google/GoogleCloudStorageInputSource.java b/extensions-core/google-extensions/src/main/java/org/apache/druid/data/input/google/GoogleCloudStorageInputSource.java index 24cc882a868c..18793051a2ff 100644 --- a/extensions-core/google-extensions/src/main/java/org/apache/druid/data/input/google/GoogleCloudStorageInputSource.java +++ b/extensions-core/google-extensions/src/main/java/org/apache/druid/data/input/google/GoogleCloudStorageInputSource.java @@ -22,6 +22,8 @@ import com.fasterxml.jackson.annotation.JacksonInject; import com.fasterxml.jackson.annotation.JsonCreator; import 
com.fasterxml.jackson.annotation.JsonProperty; +import com.google.api.services.storage.model.StorageObject; +import com.google.common.base.Preconditions; import com.google.common.collect.ImmutableList; import org.apache.druid.data.input.AbstractInputSource; import org.apache.druid.data.input.InputFormat; @@ -29,30 +31,83 @@ import org.apache.druid.data.input.InputSourceReader; import org.apache.druid.data.input.InputSplit; import org.apache.druid.data.input.SplitHintSpec; +import org.apache.druid.data.input.impl.CloudObjectLocation; import org.apache.druid.data.input.impl.InputEntityIteratingReader; import org.apache.druid.data.input.impl.SplittableInputSource; +import org.apache.druid.java.util.common.IAE; +import org.apache.druid.java.util.common.RetryUtils; +import org.apache.druid.storage.google.GoogleByteSource; import org.apache.druid.storage.google.GoogleStorage; +import org.apache.druid.storage.google.GoogleUtils; import javax.annotation.Nullable; import java.io.File; import java.net.URI; +import java.util.ArrayList; import java.util.List; import java.util.Objects; +import java.util.stream.Collectors; import java.util.stream.Stream; +import java.util.stream.StreamSupport; -public class GoogleCloudStorageInputSource extends AbstractInputSource implements SplittableInputSource +public class GoogleCloudStorageInputSource extends AbstractInputSource implements SplittableInputSource { private final GoogleStorage storage; private final List uris; + private final List prefixes; + private final List byteSources; @JsonCreator public GoogleCloudStorageInputSource( @JacksonInject GoogleStorage storage, - @JsonProperty("uris") List uris + @JsonProperty("uris") @Nullable List uris, + @JsonProperty("prefixes") @Nullable List prefixes, + @JsonProperty("objects") @Nullable List objects ) { this.storage = storage; - this.uris = uris; + this.uris = uris == null ? new ArrayList<>() : uris; + this.prefixes = prefixes == null ? 
new ArrayList<>() : prefixes; + + if (objects != null) { + this.byteSources = objects.stream() + .map(x -> new GoogleByteSource(storage, x.getBucket(), x.getPath())) + .collect(Collectors.toList()); + if (!this.uris.isEmpty()) { + throw new IAE("uris cannot be used with object"); + } + if (!this.prefixes.isEmpty()) { + throw new IAE("prefixes cannot be used with object"); + } + } else { + this.byteSources = null; + if (!this.uris.isEmpty() && !this.prefixes.isEmpty()) { + throw new IAE("uris and prefixes cannot be used together"); + } + + if (this.uris.isEmpty() && this.prefixes.isEmpty()) { + throw new IAE("uris or prefixes must be specified"); + } + + for (final URI inputURI : this.uris) { + Preconditions.checkArgument("gs".equals(inputURI.getScheme()), "input uri scheme == gs (%s)", inputURI); + } + + for (final URI inputURI : this.prefixes) { + Preconditions.checkArgument("gs".equals(inputURI.getScheme()), "input uri scheme == gs (%s)", inputURI); + } + } + } + + private GoogleCloudStorageInputSource( + GoogleStorage storage, + GoogleByteSource byteSource + ) + { + this.storage = storage; + this.byteSources = ImmutableList.of(byteSource); + this.uris = new ArrayList<>(); + this.prefixes = new ArrayList<>(); } @JsonProperty("uris") @@ -61,23 +116,57 @@ public List getUris() return uris; } + @JsonProperty("prefixes") + public List getPrefixes() + { + return prefixes; + } + + @Nullable + @JsonProperty("objects") + public List getObjects() + { + if (byteSources != null) { + return byteSources.stream() + .map(x -> new CloudObjectLocation(x.getBucket(), x.getPath())) + .collect(Collectors.toList()); + } + return null; + } @Override - public Stream> createSplits(InputFormat inputFormat, @Nullable SplitHintSpec splitHintSpec) + public Stream> createSplits(InputFormat inputFormat, @Nullable SplitHintSpec splitHintSpec) { - return uris.stream().map(InputSplit::new); + if (byteSources != null) { + return byteSources.stream().map(InputSplit::new); + } + if 
(!uris.isEmpty()) { + return uris.stream().map(this::byteSourceFromUri).map(InputSplit::new); + } + + return StreamSupport.stream(storageObjectIterable().spliterator(), false) + .map(this::byteSourceFromStorageObject) + .map(InputSplit::new); } @Override public int getNumSplits(InputFormat inputFormat, @Nullable SplitHintSpec splitHintSpec) { - return uris.size(); + if (byteSources != null) { + return 1; + } + + if (!uris.isEmpty()) { + return uris.size(); + } + + return (int) StreamSupport.stream(storageObjectIterable().spliterator(), false).count(); } @Override - public SplittableInputSource withSplit(InputSplit split) + public SplittableInputSource withSplit(InputSplit split) { - return new GoogleCloudStorageInputSource(storage, ImmutableList.of(split.get())); + return new GoogleCloudStorageInputSource(storage, split.get()); } @Override @@ -96,14 +185,29 @@ protected InputSourceReader formattableReader( return new InputEntityIteratingReader( inputRowSchema, inputFormat, - createSplits(inputFormat, null).map(split -> new GoogleCloudStorageEntity( - storage, - split.get() - )), + createSplits(inputFormat, null).map(split -> new GoogleCloudStorageEntity(split.get())), temporaryDirectory ); } + private GoogleByteSource byteSourceFromUri(final URI uri) + { + final String bucket = uri.getAuthority(); + final String path = GoogleUtils.extractGoogleCloudStorageObjectKey(uri); + return new GoogleByteSource(storage, bucket, path); + } + + private GoogleByteSource byteSourceFromStorageObject(final StorageObject storageObject) + { + return new GoogleByteSource(storage, storageObject.getBucket(), storageObject.getName()); + } + + private Iterable storageObjectIterable() + { + return () -> + GoogleUtils.lazyFetchingStorageObjectsIterator(storage, prefixes.iterator(), RetryUtils.DEFAULT_MAX_TRIES); + } + @Override public boolean equals(Object o) { @@ -114,12 +218,14 @@ public boolean equals(Object o) return false; } GoogleCloudStorageInputSource that = 
(GoogleCloudStorageInputSource) o; - return Objects.equals(uris, that.uris); + return Objects.equals(uris, that.uris) && + Objects.equals(prefixes, that.prefixes) && + Objects.equals(byteSources, that.byteSources); } @Override public int hashCode() { - return Objects.hash(uris); + return Objects.hash(uris, prefixes); } } diff --git a/extensions-core/google-extensions/src/main/java/org/apache/druid/storage/google/GoogleByteSource.java b/extensions-core/google-extensions/src/main/java/org/apache/druid/storage/google/GoogleByteSource.java index 6066eb2c13e7..977353f9f204 100644 --- a/extensions-core/google-extensions/src/main/java/org/apache/druid/storage/google/GoogleByteSource.java +++ b/extensions-core/google-extensions/src/main/java/org/apache/druid/storage/google/GoogleByteSource.java @@ -23,6 +23,7 @@ import java.io.IOException; import java.io.InputStream; +import java.util.Objects; public class GoogleByteSource extends ByteSource { @@ -37,6 +38,16 @@ public GoogleByteSource(final GoogleStorage storage, final String bucket, final this.path = path; } + public String getBucket() + { + return bucket; + } + + public String getPath() + { + return path; + } + @Override public InputStream openStream() throws IOException { @@ -47,4 +58,24 @@ public InputStream openStream(long start) throws IOException { return storage.get(bucket, path, start); } + + @Override + public boolean equals(Object o) + { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + GoogleByteSource that = (GoogleByteSource) o; + return Objects.equals(bucket, that.bucket) && + Objects.equals(path, that.path); + } + + @Override + public int hashCode() + { + return Objects.hash(bucket, path); + } } diff --git a/extensions-core/google-extensions/src/main/java/org/apache/druid/storage/google/GoogleUtils.java b/extensions-core/google-extensions/src/main/java/org/apache/druid/storage/google/GoogleUtils.java index 1dd12ef2388e..ac591179c0d4 100644 --- 
a/extensions-core/google-extensions/src/main/java/org/apache/druid/storage/google/GoogleUtils.java +++ b/extensions-core/google-extensions/src/main/java/org/apache/druid/storage/google/GoogleUtils.java @@ -20,13 +20,22 @@ package org.apache.druid.storage.google; import com.google.api.client.http.HttpResponseException; +import com.google.api.services.storage.Storage; +import com.google.api.services.storage.model.Objects; +import com.google.api.services.storage.model.StorageObject; import com.google.common.base.Predicate; +import org.apache.druid.java.util.common.ISE; +import org.apache.druid.java.util.common.RetryUtils; import java.io.IOException; import java.net.URI; +import java.util.Iterator; +import java.util.NoSuchElementException; public class GoogleUtils { + public static final Predicate GOOGLE_RETRY = GoogleUtils::isRetryable; + public static boolean isRetryable(Throwable t) { if (t instanceof HttpResponseException) { @@ -36,10 +45,107 @@ public static boolean isRetryable(Throwable t) return t instanceof IOException; } + private static T retryGoogleCloudStorageOperation(RetryUtils.Task f) throws Exception + { + return RetryUtils.retry(f, GOOGLE_RETRY, RetryUtils.DEFAULT_MAX_TRIES); + } + public static String extractGoogleCloudStorageObjectKey(URI uri) { return uri.getPath().startsWith("/") ? 
uri.getPath().substring(1) : uri.getPath(); } - public static final Predicate GOOGLE_RETRY = e -> isRetryable(e); + public static Iterator lazyFetchingStorageObjectsIterator( + final GoogleStorage storage, + final Iterator uris, + final long maxListingLength + ) + { + return new Iterator() + { + private Storage.Objects.List listRequest; + private Objects results; + private URI currentUri; + private String currentBucket; + private String currentPrefix; + private String nextPageToken; + private Iterator storageObjectsIterator; + + { + nextPageToken = null; + prepareNextRequest(); + fetchNextBatch(); + } + + private void prepareNextRequest() + { + try { + currentUri = uris.next(); + currentBucket = currentUri.getAuthority(); + currentPrefix = GoogleUtils.extractGoogleCloudStorageObjectKey(currentUri); + nextPageToken = null; + listRequest = storage.list(currentBucket) + .setPrefix(currentPrefix) + .setMaxResults(maxListingLength); + + } + catch (IOException io) { + throw new RuntimeException(io); + } + } + + private void fetchNextBatch() + { + try { + listRequest.setPageToken(nextPageToken); + results = GoogleUtils.retryGoogleCloudStorageOperation(() -> listRequest.execute()); + storageObjectsIterator = results.getItems().iterator(); + nextPageToken = results.getNextPageToken(); + } + catch (Exception ex) { + throw new RuntimeException(ex); + } + } + + @Override + public boolean hasNext() + { + return storageObjectsIterator.hasNext() || nextPageToken != null || uris.hasNext(); + } + + @Override + public StorageObject next() + { + if (!hasNext()) { + throw new NoSuchElementException(); + } + + while (storageObjectsIterator.hasNext()) { + final StorageObject next = storageObjectsIterator.next(); + // list with prefix can return directories, but they should always end with `/`, ignore them + if (!next.getName().endsWith("/")) { + return next; + } + } + + if (nextPageToken != null) { + fetchNextBatch(); + } else if (uris.hasNext()) { + prepareNextRequest(); + 
fetchNextBatch(); + } + + if (!storageObjectsIterator.hasNext()) { + throw new ISE( + "Failed to further iterate on bucket[%s] and prefix[%s]. The last page token was [%s]", + currentBucket, + currentPrefix, + nextPageToken + ); + } + + return next(); + } + }; + } } diff --git a/extensions-core/google-extensions/src/test/java/org/apache/druid/data/input/google/GoogleCloudStorageInputSourceTest.java b/extensions-core/google-extensions/src/test/java/org/apache/druid/data/input/google/GoogleCloudStorageInputSourceTest.java index a3b77afd890b..cd7574f36183 100644 --- a/extensions-core/google-extensions/src/test/java/org/apache/druid/data/input/google/GoogleCloudStorageInputSourceTest.java +++ b/extensions-core/google-extensions/src/test/java/org/apache/druid/data/input/google/GoogleCloudStorageInputSourceTest.java @@ -29,10 +29,12 @@ import com.google.inject.Injector; import com.google.inject.Provides; import org.apache.druid.data.input.InputSplit; +import org.apache.druid.data.input.impl.CloudObjectLocation; import org.apache.druid.data.input.impl.JsonInputFormat; import org.apache.druid.initialization.DruidModule; import org.apache.druid.jackson.DefaultObjectMapper; import org.apache.druid.java.util.common.parsers.JSONPathSpec; +import org.apache.druid.storage.google.GoogleByteSource; import org.apache.druid.storage.google.GoogleStorage; import org.junit.Assert; import org.junit.Test; @@ -47,27 +49,58 @@ public class GoogleCloudStorageInputSourceTest { private static final GoogleStorage STORAGE = new GoogleStorage(null); + private static final List URIS = Arrays.asList( + URI.create("gs://foo/bar/file.gz"), + URI.create("gs://bar/foo/file2.gz") + ); + + private static final List PREFIXES = Arrays.asList( + URI.create("gs://foo/bar"), + URI.create("gs://bar/foo") + ); + + private static final List BYTE_SOURCES = Arrays.asList( + new GoogleByteSource(STORAGE, "foo", "bar/file.gz"), + new GoogleByteSource(STORAGE, "bar", "foo/file2.gz") + ); + @Test public void 
testSerde() throws Exception { final ObjectMapper mapper = createGoogleObjectMapper(); - - final List uris = Arrays.asList( - new URI("gs://foo/bar/file.gz"), - new URI("gs://bar/foo/file2.gz") - ); - - final List prefixes = Arrays.asList( - new URI("gs://foo/bar"), - new URI("gs://bar/foo") - ); - - final GoogleCloudStorageInputSource withUris = new GoogleCloudStorageInputSource(STORAGE, uris); + final GoogleCloudStorageInputSource withUris = new GoogleCloudStorageInputSource(STORAGE, URIS, ImmutableList.of(), null); final GoogleCloudStorageInputSource serdeWithUris = mapper.readValue(mapper.writeValueAsString(withUris), GoogleCloudStorageInputSource.class); Assert.assertEquals(withUris, serdeWithUris); } + @Test + public void testSerdePrefixes() throws Exception + { + final ObjectMapper mapper = createGoogleObjectMapper(); + final GoogleCloudStorageInputSource withPrefixes = + new GoogleCloudStorageInputSource(STORAGE, ImmutableList.of(), PREFIXES, null); + final GoogleCloudStorageInputSource serdeWithPrefixes = + mapper.readValue(mapper.writeValueAsString(withPrefixes), GoogleCloudStorageInputSource.class); + Assert.assertEquals(withPrefixes, serdeWithPrefixes); + } + + @Test + public void testSerdeObjects() throws Exception + { + final ObjectMapper mapper = createGoogleObjectMapper(); + final GoogleCloudStorageInputSource withObjects = + new GoogleCloudStorageInputSource( + STORAGE, + null, + null, + ImmutableList.of(new CloudObjectLocation("foo", "bar/file.gz")) + ); + final GoogleCloudStorageInputSource serdeWithObjects = + mapper.readValue(mapper.writeValueAsString(withObjects), GoogleCloudStorageInputSource.class); + Assert.assertEquals(withObjects, serdeWithObjects); + } + @Test public void testWithUrisSplit() { @@ -76,13 +109,13 @@ public void testWithUrisSplit() URI.create("gs://bar/foo/file2.gz") ); - GoogleCloudStorageInputSource inputSource = new GoogleCloudStorageInputSource(STORAGE, uris); + GoogleCloudStorageInputSource inputSource = new 
GoogleCloudStorageInputSource(STORAGE, uris, ImmutableList.of(), null); - Stream> splits = inputSource.createSplits( + Stream> splits = inputSource.createSplits( new JsonInputFormat(JSONPathSpec.DEFAULT, null), null ); - Assert.assertEquals(uris, splits.map(InputSplit::get).collect(Collectors.toList())); + Assert.assertEquals(BYTE_SOURCES, splits.map(InputSplit::get).collect(Collectors.toList())); } public static ObjectMapper createGoogleObjectMapper() From 62b814aced18d2d19d7e763aaf16b1cdfde091b5 Mon Sep 17 00:00:00 2001 From: Clint Wylie Date: Fri, 22 Nov 2019 02:55:53 -0800 Subject: [PATCH 02/10] docs --- docs/development/extensions-core/google.md | 20 ++++++++++++++++++-- 1 file changed, 18 insertions(+), 2 deletions(-) diff --git a/docs/development/extensions-core/google.md b/docs/development/extensions-core/google.md index 6dafa6ad0469..5a64b611674b 100644 --- a/docs/development/extensions-core/google.md +++ b/docs/development/extensions-core/google.md @@ -60,11 +60,27 @@ This extension also provides an input source for Druid native batch ingestion to ... ``` +```json +... + "ioConfig": { + "type": "index_parallel", + "inputSource": { + "type": "google", + "prefixes": ["gs://foo/bar", "gs://bar/foo"] + }, + "inputFormat": { + "type": "json" + }, + ... + }, +... 
+``` + |property|description|default|required?| |--------|-----------|-------|---------| |type|This should be `google`.|N/A|yes| -|uris|JSON array of URIs where Google Cloud Storage files to be ingested are located.|N/A|yes| - +|uris|JSON array of URIs where Google Cloud Storage objects to be ingested are located.|N/A|`uris` or `prefixes` must be set| +|prefixes|JSON array of URI prefixes for the locations of Google Cloud Storage objects to be ingested.|N/A|`uris` or `prefixes` must be set| ## Firehose From fcb81928efd234cc0061948cb9de71e94b4ccfda Mon Sep 17 00:00:00 2001 From: Clint Wylie Date: Wed, 27 Nov 2019 05:45:39 -0800 Subject: [PATCH 03/10] more better, and tests --- .../input/impl/CloudObjectInputSource.java | 169 +++++++++++++++ .../data/input/impl/CloudObjectLocation.java | 10 +- extensions-core/google-extensions/pom.xml | 13 ++ .../google/GoogleCloudStorageEntity.java | 11 +- .../google/GoogleCloudStorageInputSource.java | 173 ++------------- .../druid/storage/google/GoogleUtils.java | 7 +- .../GoogleCloudStorageInputSourceTest.java | 201 ++++++++++++++++-- .../druid/data/input/s3/S3InputSource.java | 136 ++---------- .../org/apache/druid/storage/s3/S3Utils.java | 5 +- 9 files changed, 415 insertions(+), 310 deletions(-) create mode 100644 core/src/main/java/org/apache/druid/data/input/impl/CloudObjectInputSource.java diff --git a/core/src/main/java/org/apache/druid/data/input/impl/CloudObjectInputSource.java b/core/src/main/java/org/apache/druid/data/input/impl/CloudObjectInputSource.java new file mode 100644 index 000000000000..e983d14ffc42 --- /dev/null +++ b/core/src/main/java/org/apache/druid/data/input/impl/CloudObjectInputSource.java @@ -0,0 +1,169 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.data.input.impl; + +import com.fasterxml.jackson.annotation.JsonProperty; +import com.google.common.primitives.Ints; +import org.apache.druid.data.input.AbstractInputSource; +import org.apache.druid.data.input.InputEntity; +import org.apache.druid.data.input.InputFormat; +import org.apache.druid.data.input.InputRowSchema; +import org.apache.druid.data.input.InputSourceReader; +import org.apache.druid.data.input.InputSplit; +import org.apache.druid.data.input.SplitHintSpec; +import org.apache.druid.utils.CollectionUtils; + +import javax.annotation.Nullable; +import java.io.File; +import java.net.URI; +import java.util.List; +import java.util.Objects; +import java.util.stream.Stream; + +public abstract class CloudObjectInputSource extends AbstractInputSource implements SplittableInputSource +{ + private final List uris; + private final List prefixes; + private final List objects; + + public CloudObjectInputSource( + String scheme, + @Nullable List uris, + @Nullable List prefixes, + @Nullable List objects + ) + { + this.uris = uris; + this.prefixes = prefixes; + this.objects = objects; + + if (!CollectionUtils.isNullOrEmpty(objects)) { + throwIfIllegalArgs(!CollectionUtils.isNullOrEmpty(uris) || !CollectionUtils.isNullOrEmpty(prefixes)); + } else if (!CollectionUtils.isNullOrEmpty(uris)) { + throwIfIllegalArgs(!CollectionUtils.isNullOrEmpty(prefixes)); + 
uris.forEach(uri -> CloudObjectLocation.validateUriScheme(scheme, uri)); + } else if (!CollectionUtils.isNullOrEmpty(prefixes)) { + prefixes.forEach(uri -> CloudObjectLocation.validateUriScheme(scheme, uri)); + } else { + throwIfIllegalArgs(true); + } + } + + @JsonProperty + public List getUris() + { + return uris; + } + + @JsonProperty + public List getPrefixes() + { + return prefixes; + } + + @Nullable + @JsonProperty + public List getObjects() + { + return objects; + } + + protected abstract T createEntity(InputSplit split); + + protected abstract Stream> getPrefixesSplitStream(); + + @Override + public Stream> createSplits( + InputFormat inputFormat, @Nullable + SplitHintSpec splitHintSpec) + { + if (!CollectionUtils.isNullOrEmpty(objects)) { + return objects.stream().map(InputSplit::new); + } + if (!CollectionUtils.isNullOrEmpty(uris)) { + return uris.stream().map(CloudObjectLocation::new).map(InputSplit::new); + } + + return getPrefixesSplitStream(); + } + + @Override + public int estimateNumSplits(InputFormat inputFormat, @Nullable SplitHintSpec splitHintSpec) + { + if (!CollectionUtils.isNullOrEmpty(objects)) { + return objects.size(); + } + + if (!CollectionUtils.isNullOrEmpty(uris)) { + return uris.size(); + } + + return Ints.checkedCast(getPrefixesSplitStream().count()); + } + + @Override + public boolean needsFormat() + { + return true; + } + + @Override + protected InputSourceReader formattableReader( + InputRowSchema inputRowSchema, + InputFormat inputFormat, + @Nullable File temporaryDirectory + ) + { + return new InputEntityIteratingReader( + inputRowSchema, + inputFormat, + createSplits(inputFormat, null).map(this::createEntity), + temporaryDirectory + ); + } + + @Override + public boolean equals(Object o) + { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + CloudObjectInputSource that = (CloudObjectInputSource) o; + return Objects.equals(uris, that.uris) && + Objects.equals(prefixes, 
that.prefixes) && + Objects.equals(objects, that.objects); + } + + @Override + public int hashCode() + { + return Objects.hash(uris, prefixes, objects); + } + + private void throwIfIllegalArgs(boolean clause) throws IllegalArgumentException + { + if (clause) { + throw new IllegalArgumentException("exactly one of either uris or prefixes or objects must be specified"); + } + } +} diff --git a/core/src/main/java/org/apache/druid/data/input/impl/CloudObjectLocation.java b/core/src/main/java/org/apache/druid/data/input/impl/CloudObjectLocation.java index d98100722586..e79b04edaf4c 100644 --- a/core/src/main/java/org/apache/druid/data/input/impl/CloudObjectLocation.java +++ b/core/src/main/java/org/apache/druid/data/input/impl/CloudObjectLocation.java @@ -22,6 +22,7 @@ import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; import com.google.common.base.Preconditions; +import org.apache.druid.java.util.common.IAE; import org.apache.druid.java.util.common.StringUtils; import java.net.URI; @@ -45,6 +46,14 @@ */ public class CloudObjectLocation { + public static URI validateUriScheme(String scheme, URI uri) + { + if (!scheme.equalsIgnoreCase(uri.getScheme())) { + throw new IAE("Invalid URI scheme [%s] must be [%s]", uri.toString(), scheme); + } + return uri; + } + private final String bucket; private final String path; @@ -125,5 +134,4 @@ public int hashCode() { return Objects.hash(bucket, path); } - } diff --git a/extensions-core/google-extensions/pom.xml b/extensions-core/google-extensions/pom.xml index d98910b27180..935f9f503ae2 100644 --- a/extensions-core/google-extensions/pom.xml +++ b/extensions-core/google-extensions/pom.xml @@ -124,6 +124,13 @@ provided + + org.apache.druid + druid-core + ${project.parent.version} + test + test-jar + org.apache.druid druid-processing @@ -158,5 +165,11 @@ jackson-module-guice test + + com.google.cloud + google-cloud-nio + 0.119.0-alpha + test + diff --git 
a/extensions-core/google-extensions/src/main/java/org/apache/druid/data/input/google/GoogleCloudStorageEntity.java b/extensions-core/google-extensions/src/main/java/org/apache/druid/data/input/google/GoogleCloudStorageEntity.java index c58ca9a136fb..907afe6f8c7c 100644 --- a/extensions-core/google-extensions/src/main/java/org/apache/druid/data/input/google/GoogleCloudStorageEntity.java +++ b/extensions-core/google-extensions/src/main/java/org/apache/druid/data/input/google/GoogleCloudStorageEntity.java @@ -21,8 +21,10 @@ import com.google.common.base.Predicate; import org.apache.druid.data.input.RetryingInputEntity; +import org.apache.druid.data.input.impl.CloudObjectLocation; import org.apache.druid.java.util.common.StringUtils; import org.apache.druid.storage.google.GoogleByteSource; +import org.apache.druid.storage.google.GoogleStorage; import org.apache.druid.storage.google.GoogleUtils; import javax.annotation.Nullable; @@ -34,9 +36,9 @@ public class GoogleCloudStorageEntity extends RetryingInputEntity { private final GoogleByteSource byteSource; - GoogleCloudStorageEntity(GoogleByteSource byteSource) + GoogleCloudStorageEntity(GoogleStorage storage, CloudObjectLocation location) { - this.byteSource = byteSource; + this.byteSource = new GoogleByteSource(storage, location.getBucket(), location.getPath()); } @Nullable @@ -50,16 +52,13 @@ public URI getUri() protected InputStream readFrom(long offset) throws IOException { // Get data of the given object and open an input stream - final String bucket = uri.getAuthority(); - final String key = StringUtils.maybeRemoveLeadingSlash(uri.getPath()); - final GoogleByteSource byteSource = new GoogleByteSource(storage, bucket, key); return byteSource.openStream(offset); } @Override protected String getPath() { - return StringUtils.maybeRemoveLeadingSlash(uri.getPath()); + return StringUtils.maybeRemoveLeadingSlash(byteSource.getPath()); } @Override diff --git 
a/extensions-core/google-extensions/src/main/java/org/apache/druid/data/input/google/GoogleCloudStorageInputSource.java b/extensions-core/google-extensions/src/main/java/org/apache/druid/data/input/google/GoogleCloudStorageInputSource.java index 278168bb7088..a867612523db 100644 --- a/extensions-core/google-extensions/src/main/java/org/apache/druid/data/input/google/GoogleCloudStorageInputSource.java +++ b/extensions-core/google-extensions/src/main/java/org/apache/druid/data/input/google/GoogleCloudStorageInputSource.java @@ -23,40 +23,26 @@ import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; import com.google.api.services.storage.model.StorageObject; -import com.google.common.base.Preconditions; import com.google.common.collect.ImmutableList; -import org.apache.druid.data.input.AbstractInputSource; -import org.apache.druid.data.input.InputFormat; -import org.apache.druid.data.input.InputRowSchema; -import org.apache.druid.data.input.InputSourceReader; import org.apache.druid.data.input.InputSplit; -import org.apache.druid.data.input.SplitHintSpec; +import org.apache.druid.data.input.impl.CloudObjectInputSource; import org.apache.druid.data.input.impl.CloudObjectLocation; -import org.apache.druid.data.input.impl.InputEntityIteratingReader; import org.apache.druid.data.input.impl.SplittableInputSource; -import org.apache.druid.java.util.common.IAE; import org.apache.druid.java.util.common.RetryUtils; -import org.apache.druid.storage.google.GoogleByteSource; import org.apache.druid.storage.google.GoogleStorage; import org.apache.druid.storage.google.GoogleUtils; import javax.annotation.Nullable; -import java.io.File; import java.net.URI; -import java.util.ArrayList; import java.util.List; -import java.util.Objects; -import java.util.stream.Collectors; import java.util.stream.Stream; import java.util.stream.StreamSupport; -public class GoogleCloudStorageInputSource extends AbstractInputSource implements 
SplittableInputSource +public class GoogleCloudStorageInputSource extends CloudObjectInputSource { - private final GoogleStorage storage; - private final List uris; - private final List prefixes; - private final List byteSources; + private static final String SCHEME = "gs"; + private final GoogleStorage storage; @JsonCreator public GoogleCloudStorageInputSource( @JacksonInject GoogleStorage storage, @@ -65,167 +51,48 @@ public GoogleCloudStorageInputSource( @JsonProperty("objects") @Nullable List objects ) { + super(SCHEME, uris, prefixes, objects); this.storage = storage; - this.uris = uris == null ? new ArrayList<>() : uris; - this.prefixes = prefixes == null ? new ArrayList<>() : prefixes; - - if (objects != null) { - this.byteSources = objects.stream() - .map(x -> new GoogleByteSource(storage, x.getBucket(), x.getPath())) - .collect(Collectors.toList()); - if (!this.uris.isEmpty()) { - throw new IAE("uris cannot be used with object"); - } - if (!this.prefixes.isEmpty()) { - throw new IAE("prefixes cannot be used with object"); - } - } else { - this.byteSources = null; - if (!this.uris.isEmpty() && !this.prefixes.isEmpty()) { - throw new IAE("uris and prefixes cannot be used together"); - } - - if (this.uris.isEmpty() && this.prefixes.isEmpty()) { - throw new IAE("uris or prefixes must be specified"); - } - - for (final URI inputURI : this.uris) { - Preconditions.checkArgument("gs".equals(inputURI.getScheme()), "input uri scheme == gs (%s)", inputURI); - } - - for (final URI inputURI : this.prefixes) { - Preconditions.checkArgument("gs".equals(inputURI.getScheme()), "input uri scheme == gs (%s)", inputURI); - } - } - } - - private GoogleCloudStorageInputSource( - GoogleStorage storage, - GoogleByteSource byteSource - ) - { - this.storage = storage; - this.byteSources = ImmutableList.of(byteSource); - this.uris = new ArrayList<>(); - this.prefixes = new ArrayList<>(); } - @JsonProperty("uris") - public List getUris() - { - return uris; - } - - 
@JsonProperty("prefixes") - public List getPrefixes() - { - return prefixes; - } - - @Nullable - @JsonProperty("objects") - public List getObjects() + @Override + protected GoogleCloudStorageEntity createEntity(InputSplit split) { - if (byteSources != null) { - return byteSources.stream() - .map(x -> new CloudObjectLocation(x.getBucket(), x.getPath())) - .collect(Collectors.toList()); - } - return null; + return new GoogleCloudStorageEntity(storage, split.get()); } @Override - public Stream> createSplits(InputFormat inputFormat, @Nullable SplitHintSpec splitHintSpec) + protected Stream> getPrefixesSplitStream() { - if (byteSources != null) { - return byteSources.stream().map(InputSplit::new); - } - if (!uris.isEmpty()) { - return uris.stream().map(this::byteSourceFromUri).map(InputSplit::new); - } - return StreamSupport.stream(storageObjectIterable().spliterator(), false) .map(this::byteSourceFromStorageObject) .map(InputSplit::new); } @Override - public int estimateNumSplits(InputFormat inputFormat, @Nullable SplitHintSpec splitHintSpec) + public SplittableInputSource withSplit(InputSplit split) { - if (byteSources != null) { - return 1; - } - - if (!uris.isEmpty()) { - return uris.size(); - } - - return (int) StreamSupport.stream(storageObjectIterable().spliterator(), false).count(); + return new GoogleCloudStorageInputSource(storage, null, null, ImmutableList.of(split.get())); } - @Override - public SplittableInputSource withSplit(InputSplit split) - { - return new GoogleCloudStorageInputSource(storage, split.get()); - } - - @Override - public boolean needsFormat() - { - return true; - } - - @Override - protected InputSourceReader formattableReader( - InputRowSchema inputRowSchema, - InputFormat inputFormat, - @Nullable File temporaryDirectory - ) + private CloudObjectLocation byteSourceFromStorageObject(final StorageObject storageObject) { - return new InputEntityIteratingReader( - inputRowSchema, - inputFormat, - createSplits(inputFormat, null).map(split -> 
new GoogleCloudStorageEntity(split.get())), - temporaryDirectory - ); - } - - private GoogleByteSource byteSourceFromUri(final URI uri) - { - final String bucket = uri.getAuthority(); - final String path = GoogleUtils.extractGoogleCloudStorageObjectKey(uri); - return new GoogleByteSource(storage, bucket, path); - } - - private GoogleByteSource byteSourceFromStorageObject(final StorageObject storageObject) - { - return new GoogleByteSource(storage, storageObject.getBucket(), storageObject.getName()); + return new CloudObjectLocation(storageObject.getBucket(), storageObject.getName()); } private Iterable storageObjectIterable() { return () -> - GoogleUtils.lazyFetchingStorageObjectsIterator(storage, prefixes.iterator(), RetryUtils.DEFAULT_MAX_TRIES); - } - - @Override - public boolean equals(Object o) - { - if (this == o) { - return true; - } - if (o == null || getClass() != o.getClass()) { - return false; - } - GoogleCloudStorageInputSource that = (GoogleCloudStorageInputSource) o; - return Objects.equals(uris, that.uris) && - Objects.equals(prefixes, that.prefixes) && - Objects.equals(byteSources, that.byteSources); + GoogleUtils.lazyFetchingStorageObjectsIterator(storage, getPrefixes().iterator(), RetryUtils.DEFAULT_MAX_TRIES); } @Override - public int hashCode() + public String toString() { - return Objects.hash(uris, prefixes); + return "GoogleCloudStorageInputSource{" + + "uris=" + getUris() + + ", prefixes=" + getPrefixes() + + ", objects=" + getObjects() + + '}'; } } diff --git a/extensions-core/google-extensions/src/main/java/org/apache/druid/storage/google/GoogleUtils.java b/extensions-core/google-extensions/src/main/java/org/apache/druid/storage/google/GoogleUtils.java index ac591179c0d4..9fbb23f040c0 100644 --- a/extensions-core/google-extensions/src/main/java/org/apache/druid/storage/google/GoogleUtils.java +++ b/extensions-core/google-extensions/src/main/java/org/apache/druid/storage/google/GoogleUtils.java @@ -26,6 +26,7 @@ import 
com.google.common.base.Predicate; import org.apache.druid.java.util.common.ISE; import org.apache.druid.java.util.common.RetryUtils; +import org.apache.druid.java.util.common.StringUtils; import java.io.IOException; import java.net.URI; @@ -50,10 +51,6 @@ private static T retryGoogleCloudStorageOperation(RetryUtils.Task f) thro return RetryUtils.retry(f, GOOGLE_RETRY, RetryUtils.DEFAULT_MAX_TRIES); } - public static String extractGoogleCloudStorageObjectKey(URI uri) - { - return uri.getPath().startsWith("/") ? uri.getPath().substring(1) : uri.getPath(); - } public static Iterator lazyFetchingStorageObjectsIterator( final GoogleStorage storage, @@ -82,7 +79,7 @@ private void prepareNextRequest() try { currentUri = uris.next(); currentBucket = currentUri.getAuthority(); - currentPrefix = GoogleUtils.extractGoogleCloudStorageObjectKey(currentUri); + currentPrefix = StringUtils.maybeRemoveLeadingSlash(currentUri.getPath()); nextPageToken = null; listRequest = storage.list(currentBucket) .setPrefix(currentPrefix) diff --git a/extensions-core/google-extensions/src/test/java/org/apache/druid/data/input/google/GoogleCloudStorageInputSourceTest.java b/extensions-core/google-extensions/src/test/java/org/apache/druid/data/input/google/GoogleCloudStorageInputSourceTest.java index cd7574f36183..3ecdd4a974d8 100644 --- a/extensions-core/google-extensions/src/test/java/org/apache/druid/data/input/google/GoogleCloudStorageInputSourceTest.java +++ b/extensions-core/google-extensions/src/test/java/org/apache/druid/data/input/google/GoogleCloudStorageInputSourceTest.java @@ -23,52 +23,81 @@ import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.module.SimpleModule; import com.fasterxml.jackson.module.guice.ObjectMapperModule; +import com.google.api.services.storage.Storage; +import com.google.api.services.storage.model.Objects; +import com.google.api.services.storage.model.StorageObject; import com.google.common.collect.ImmutableList; import 
com.google.inject.Binder; import com.google.inject.Guice; import com.google.inject.Injector; import com.google.inject.Provides; +import org.apache.druid.data.input.InputRow; +import org.apache.druid.data.input.InputRowSchema; +import org.apache.druid.data.input.InputSourceReader; import org.apache.druid.data.input.InputSplit; import org.apache.druid.data.input.impl.CloudObjectLocation; +import org.apache.druid.data.input.impl.CsvInputFormat; +import org.apache.druid.data.input.impl.DimensionsSpec; import org.apache.druid.data.input.impl.JsonInputFormat; +import org.apache.druid.data.input.impl.TimestampSpec; import org.apache.druid.initialization.DruidModule; import org.apache.druid.jackson.DefaultObjectMapper; +import org.apache.druid.java.util.common.DateTimes; +import org.apache.druid.java.util.common.StringUtils; +import org.apache.druid.java.util.common.parsers.CloseableIterator; import org.apache.druid.java.util.common.parsers.JSONPathSpec; -import org.apache.druid.storage.google.GoogleByteSource; import org.apache.druid.storage.google.GoogleStorage; +import org.apache.druid.testing.InitializedNullHandlingTest; +import org.apache.druid.utils.CompressionUtils; +import org.easymock.EasyMock; +import org.joda.time.DateTime; import org.junit.Assert; import org.junit.Test; +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.IOException; import java.net.URI; +import java.util.ArrayList; import java.util.Arrays; import java.util.List; import java.util.stream.Collectors; import java.util.stream.Stream; -public class GoogleCloudStorageInputSourceTest +public class GoogleCloudStorageInputSourceTest extends InitializedNullHandlingTest { - private static final GoogleStorage STORAGE = new GoogleStorage(null); + private static final GoogleStorage STORAGE = EasyMock.createMock(GoogleStorage.class); - private static final List URIS = Arrays.asList( - URI.create("gs://foo/bar/file.gz"), - URI.create("gs://bar/foo/file2.gz") + private 
static final List EXPECTED_URIS = Arrays.asList( + URI.create("gs://foo/bar/file.csv"), + URI.create("gs://bar/foo/file2.csv") ); + private static final List EXPECTED_COMPRESSED_URIS = Arrays.asList( + URI.create("gs://foo/bar/file.csv.gz"), + URI.create("gs://bar/foo/file2.csv.gz") + ); + + private static final List EXPECTED_OBJECTS = + EXPECTED_URIS.stream().map(CloudObjectLocation::new).collect(Collectors.toList()); + private static final List PREFIXES = Arrays.asList( URI.create("gs://foo/bar"), URI.create("gs://bar/foo") ); - private static final List BYTE_SOURCES = Arrays.asList( - new GoogleByteSource(STORAGE, "foo", "bar/file.gz"), - new GoogleByteSource(STORAGE, "bar", "foo/file2.gz") - ); + private static final List EXPECTED_LOCATION = + ImmutableList.of(new CloudObjectLocation("foo", "bar/file.csv")); + + private static final DateTime NOW = DateTimes.nowUtc(); + private static final byte[] CONTENT = + StringUtils.toUtf8(StringUtils.format("%d,hello,world", NOW.getMillis())); @Test public void testSerde() throws Exception { final ObjectMapper mapper = createGoogleObjectMapper(); - final GoogleCloudStorageInputSource withUris = new GoogleCloudStorageInputSource(STORAGE, URIS, ImmutableList.of(), null); + final GoogleCloudStorageInputSource withUris = new GoogleCloudStorageInputSource(STORAGE, EXPECTED_URIS, ImmutableList.of(), null); final GoogleCloudStorageInputSource serdeWithUris = mapper.readValue(mapper.writeValueAsString(withUris), GoogleCloudStorageInputSource.class); Assert.assertEquals(withUris, serdeWithUris); @@ -104,18 +133,156 @@ public void testSerdeObjects() throws Exception @Test public void testWithUrisSplit() { - final List uris = Arrays.asList( - URI.create("gs://foo/bar/file.gz"), - URI.create("gs://bar/foo/file2.gz") + + GoogleCloudStorageInputSource inputSource = new GoogleCloudStorageInputSource(STORAGE, EXPECTED_URIS, ImmutableList.of(), null); + + Stream> splits = inputSource.createSplits( + new 
JsonInputFormat(JSONPathSpec.DEFAULT, null), + null ); + Assert.assertEquals(EXPECTED_OBJECTS, splits.map(InputSplit::get).collect(Collectors.toList())); + } - GoogleCloudStorageInputSource inputSource = new GoogleCloudStorageInputSource(STORAGE, uris, ImmutableList.of(), null); + @Test + public void testWithPrefixesSplit() throws IOException + { + EasyMock.reset(STORAGE); + addExpectedPrefixObjects(PREFIXES.get(0), ImmutableList.of(EXPECTED_URIS.get(0))); + addExpectedPrefixObjects(PREFIXES.get(1), ImmutableList.of(EXPECTED_URIS.get(1))); + EasyMock.replay(STORAGE); - Stream> splits = inputSource.createSplits( + GoogleCloudStorageInputSource inputSource = new GoogleCloudStorageInputSource(STORAGE, null, PREFIXES, null); + + Stream> splits = inputSource.createSplits( new JsonInputFormat(JSONPathSpec.DEFAULT, null), null ); - Assert.assertEquals(BYTE_SOURCES, splits.map(InputSplit::get).collect(Collectors.toList())); + + Assert.assertEquals(EXPECTED_OBJECTS, splits.map(InputSplit::get).collect(Collectors.toList())); + } + + @Test + public void testReader() throws IOException + { + EasyMock.reset(STORAGE); + addExpectedPrefixObjects(PREFIXES.get(0), ImmutableList.of(EXPECTED_URIS.get(0))); + addExpectedGetObjectMock(EXPECTED_URIS.get(0)); + addExpectedPrefixObjects(PREFIXES.get(1), ImmutableList.of(EXPECTED_URIS.get(1))); + addExpectedGetObjectMock(EXPECTED_URIS.get(1)); + EasyMock.replay(STORAGE); + + GoogleCloudStorageInputSource inputSource = new GoogleCloudStorageInputSource( + STORAGE, + null, + PREFIXES, + null + ); + + InputRowSchema someSchema = new InputRowSchema( + new TimestampSpec("time", "auto", null), + new DimensionsSpec(DimensionsSpec.getDefaultSchemas(ImmutableList.of("dim1", "dim2"))), + ImmutableList.of("count") + ); + + InputSourceReader reader = inputSource.reader( + someSchema, + new CsvInputFormat(ImmutableList.of("time", "dim1", "dim2"), "|", false, null, 0), + null + ); + + CloseableIterator iterator = reader.read(); + + while 
(iterator.hasNext()) { + InputRow nextRow = iterator.next(); + Assert.assertEquals(NOW, nextRow.getTimestamp()); + Assert.assertEquals("hello", nextRow.getDimension("dim1").get(0)); + Assert.assertEquals("world", nextRow.getDimension("dim2").get(0)); + } + } + + @Test + public void testCompressedReader() throws IOException + { + EasyMock.reset(STORAGE); + addExpectedPrefixObjects(PREFIXES.get(0), ImmutableList.of(EXPECTED_COMPRESSED_URIS.get(0))); + addExpectedGetCompressedObjectMock(EXPECTED_COMPRESSED_URIS.get(0)); + addExpectedPrefixObjects(PREFIXES.get(1), ImmutableList.of(EXPECTED_COMPRESSED_URIS.get(1))); + addExpectedGetCompressedObjectMock(EXPECTED_COMPRESSED_URIS.get(1)); + EasyMock.replay(STORAGE); + + GoogleCloudStorageInputSource inputSource = new GoogleCloudStorageInputSource( + STORAGE, + null, + PREFIXES, + null + ); + + InputRowSchema someSchema = new InputRowSchema( + new TimestampSpec("time", "auto", null), + new DimensionsSpec(DimensionsSpec.getDefaultSchemas(ImmutableList.of("dim1", "dim2"))), + ImmutableList.of("count") + ); + + InputSourceReader reader = inputSource.reader( + someSchema, + new CsvInputFormat(ImmutableList.of("time", "dim1", "dim2"), "|", false, null, 0), + null + ); + + CloseableIterator iterator = reader.read(); + + while (iterator.hasNext()) { + InputRow nextRow = iterator.next(); + Assert.assertEquals(NOW, nextRow.getTimestamp()); + Assert.assertEquals("hello", nextRow.getDimension("dim1").get(0)); + Assert.assertEquals("world", nextRow.getDimension("dim2").get(0)); + } + } + + private static void addExpectedPrefixObjects(URI prefix, List uris) throws IOException + { + final String bucket = prefix.getAuthority(); + + Storage.Objects.List listRequest = EasyMock.createMock(Storage.Objects.List.class); + EasyMock.expect(STORAGE.list(EasyMock.eq(bucket))).andReturn(listRequest).once(); + EasyMock.expect(listRequest.setPageToken(EasyMock.anyString())).andReturn(listRequest).once(); + 
EasyMock.expect(listRequest.setMaxResults(EasyMock.anyLong())).andReturn(listRequest).once(); + EasyMock.expect(listRequest.setPrefix(EasyMock.eq(StringUtils.maybeRemoveLeadingSlash(prefix.getPath())))).andReturn(listRequest).once(); + + List mockObjects = new ArrayList<>(); + for (URI uri : uris) { + StorageObject s = new StorageObject(); + s.setBucket(bucket); + s.setName(uri.getPath()); + mockObjects.add(s); + } + Objects response = new Objects(); + response.setItems(mockObjects); + EasyMock.expect(listRequest.execute()).andReturn(response).once(); + EasyMock.expect(response.getItems()).andReturn(mockObjects).once(); + + EasyMock.replay(listRequest); + } + + private static void addExpectedGetObjectMock(URI uri) throws IOException + { + CloudObjectLocation location = new CloudObjectLocation(uri); + + EasyMock.expect( + STORAGE.get(EasyMock.eq(location.getBucket()), EasyMock.eq(location.getPath()), EasyMock.eq(0L)) + ).andReturn(new ByteArrayInputStream(CONTENT)).once(); + } + + private static void addExpectedGetCompressedObjectMock(URI uri) throws IOException + { + CloudObjectLocation location = new CloudObjectLocation(uri); + + ByteArrayOutputStream gzipped = new ByteArrayOutputStream(); + CompressionUtils.gzip(new ByteArrayInputStream(CONTENT), gzipped); + + EasyMock.expect( + STORAGE.get(EasyMock.eq(location.getBucket()), EasyMock.eq(location.getPath()), EasyMock.eq(0L)) + ).andReturn(new ByteArrayInputStream(gzipped.toByteArray())).once(); } public static ObjectMapper createGoogleObjectMapper() diff --git a/extensions-core/s3-extensions/src/main/java/org/apache/druid/data/input/s3/S3InputSource.java b/extensions-core/s3-extensions/src/main/java/org/apache/druid/data/input/s3/S3InputSource.java index 0080ad8b7c6a..9642ebec108f 100644 --- a/extensions-core/s3-extensions/src/main/java/org/apache/druid/data/input/s3/S3InputSource.java +++ b/extensions-core/s3-extensions/src/main/java/org/apache/druid/data/input/s3/S3InputSource.java @@ -25,35 +25,25 @@ import 
com.fasterxml.jackson.annotation.JsonProperty; import com.google.common.base.Preconditions; import com.google.common.collect.ImmutableList; -import org.apache.druid.data.input.AbstractInputSource; -import org.apache.druid.data.input.InputFormat; -import org.apache.druid.data.input.InputRowSchema; -import org.apache.druid.data.input.InputSourceReader; import org.apache.druid.data.input.InputSplit; -import org.apache.druid.data.input.SplitHintSpec; +import org.apache.druid.data.input.impl.CloudObjectInputSource; import org.apache.druid.data.input.impl.CloudObjectLocation; -import org.apache.druid.data.input.impl.InputEntityIteratingReader; import org.apache.druid.data.input.impl.SplittableInputSource; +import org.apache.druid.storage.s3.S3StorageDruidModule; import org.apache.druid.storage.s3.S3Utils; import org.apache.druid.storage.s3.ServerSideEncryptingAmazonS3; -import org.apache.druid.utils.CollectionUtils; import javax.annotation.Nullable; -import java.io.File; import java.net.URI; import java.util.List; -import java.util.Objects; import java.util.stream.Stream; import java.util.stream.StreamSupport; -public class S3InputSource extends AbstractInputSource implements SplittableInputSource +public class S3InputSource extends CloudObjectInputSource { private static final int MAX_LISTING_LENGTH = 1024; private final ServerSideEncryptingAmazonS3 s3Client; - private final List uris; - private final List prefixes; - private final List objects; @JsonCreator public S3InputSource( @@ -63,144 +53,42 @@ public S3InputSource( @JsonProperty("objects") @Nullable List objects ) { + super(S3StorageDruidModule.SCHEME, uris, prefixes, objects); this.s3Client = Preconditions.checkNotNull(s3Client, "s3Client"); - this.uris = uris; - this.prefixes = prefixes; - this.objects = objects; - - if (!CollectionUtils.isNullOrEmpty(objects)) { - throwIfIllegalArgs(!CollectionUtils.isNullOrEmpty(uris) || !CollectionUtils.isNullOrEmpty(prefixes)); - } else if 
(!CollectionUtils.isNullOrEmpty(uris)) { - throwIfIllegalArgs(!CollectionUtils.isNullOrEmpty(prefixes)); - uris.forEach(S3Utils::checkURI); - } else if (!CollectionUtils.isNullOrEmpty(prefixes)) { - prefixes.forEach(S3Utils::checkURI); - } else { - throwIfIllegalArgs(true); - } - } - - @JsonProperty - public List getUris() - { - return uris; - } - - @JsonProperty - public List getPrefixes() - { - return prefixes; } - @JsonProperty - public List getObjects() + @Override + protected S3Entity createEntity(InputSplit split) { - return objects; + return new S3Entity(s3Client, split.get()); } @Override - public Stream> createSplits( - InputFormat inputFormat, - @Nullable SplitHintSpec splitHintSpec - ) + protected Stream> getPrefixesSplitStream() { - if (objects != null) { - return objects.stream().map(InputSplit::new); - } - - if (uris != null) { - return uris.stream().map(CloudObjectLocation::new).map(InputSplit::new); - } - return StreamSupport.stream(getIterableObjectsFromPrefixes().spliterator(), false) .map(S3Utils::summaryToCloudObjectLocation) .map(InputSplit::new); } - @Override - public int estimateNumSplits(InputFormat inputFormat, @Nullable SplitHintSpec splitHintSpec) - { - if (objects != null) { - return objects.size(); - } - - if (uris != null) { - return uris.size(); - } - - return (int) StreamSupport.stream(getIterableObjectsFromPrefixes().spliterator(), false).count(); - } - @Override public SplittableInputSource withSplit(InputSplit split) { return new S3InputSource(s3Client, null, null, ImmutableList.of(split.get())); } - @Override - public boolean needsFormat() - { - return true; - } - - @Override - protected InputSourceReader formattableReader( - InputRowSchema inputRowSchema, - InputFormat inputFormat, - @Nullable File temporaryDirectory - ) - { - return new InputEntityIteratingReader( - inputRowSchema, - inputFormat, - // formattableReader() is supposed to be called in each task that actually creates segments. 
- // The task should already have only one split in parallel indexing, - // while there's no need to make splits using splitHintSpec in sequential indexing. - createSplits(inputFormat, null).map(split -> new S3Entity(s3Client, split.get())), - temporaryDirectory - ); - } - - @Override - public boolean equals(Object o) - { - if (this == o) { - return true; - } - if (o == null || getClass() != o.getClass()) { - return false; - } - S3InputSource that = (S3InputSource) o; - return Objects.equals(uris, that.uris) && - Objects.equals(prefixes, that.prefixes) && - Objects.equals(objects, that.objects); - } - - @Override - public int hashCode() - { - return Objects.hash(uris, prefixes, objects); - } - @Override public String toString() { return "S3InputSource{" + - "uris=" + uris + - ", prefixes=" + prefixes + - ", objects=" + objects + + "uris=" + getUris() + + ", prefixes=" + getPrefixes() + + ", objects=" + getObjects() + '}'; } private Iterable getIterableObjectsFromPrefixes() { - return () -> S3Utils.lazyFetchingObjectSummariesIterator(s3Client, prefixes.iterator(), MAX_LISTING_LENGTH); - } - - private void throwIfIllegalArgs(boolean clause) throws IllegalArgumentException - { - if (clause) { - throw new IllegalArgumentException("exactly one of either uris or prefixes or objects must be specified"); - } + return () -> S3Utils.lazyFetchingObjectSummariesIterator(s3Client, getPrefixes().iterator(), MAX_LISTING_LENGTH); } } diff --git a/extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/S3Utils.java b/extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/S3Utils.java index 5aaa0f187709..ae7367709180 100644 --- a/extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/S3Utils.java +++ b/extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/S3Utils.java @@ -34,7 +34,6 @@ import com.google.common.base.Predicate; import com.google.common.collect.Iterators; import 
org.apache.druid.data.input.impl.CloudObjectLocation; -import org.apache.druid.java.util.common.IAE; import org.apache.druid.java.util.common.ISE; import org.apache.druid.java.util.common.RE; import org.apache.druid.java.util.common.RetryUtils; @@ -289,10 +288,8 @@ public static URI checkURI(URI uri) { if (uri.getScheme().equalsIgnoreCase(S3StorageDruidModule.SCHEME_S3_ZIP)) { uri = URI.create(SCHEME + uri.toString().substring(S3StorageDruidModule.SCHEME_S3_ZIP.length())); - } else if (!SCHEME.equalsIgnoreCase(uri.getScheme())) { - throw new IAE("Invalid URI scheme [%s] must be [%s]", uri.toString(), SCHEME); } - return uri; + return CloudObjectLocation.validateUriScheme(SCHEME, uri); } // Copied from org.jets3t.service.model.StorageObject.isDirectoryPlaceholder() From b69319de1958277b7e52bd5b14615ae1bcc1b04b Mon Sep 17 00:00:00 2001 From: Clint Wylie Date: Wed, 27 Nov 2019 05:46:13 -0800 Subject: [PATCH 04/10] unused --- extensions-core/google-extensions/pom.xml | 6 ------ 1 file changed, 6 deletions(-) diff --git a/extensions-core/google-extensions/pom.xml b/extensions-core/google-extensions/pom.xml index 935f9f503ae2..51221513d061 100644 --- a/extensions-core/google-extensions/pom.xml +++ b/extensions-core/google-extensions/pom.xml @@ -165,11 +165,5 @@ jackson-module-guice test - - com.google.cloud - google-cloud-nio - 0.119.0-alpha - test - From 932c906f3ebe62db74e70ca418150cc6556c7eed Mon Sep 17 00:00:00 2001 From: Clint Wylie Date: Wed, 27 Nov 2019 05:50:35 -0800 Subject: [PATCH 05/10] formatting --- .../data/input/impl/CloudObjectInputSource.java | 3 ++- .../google/GoogleCloudStorageInputSourceTest.java | 13 +++++++++---- 2 files changed, 11 insertions(+), 5 deletions(-) diff --git a/core/src/main/java/org/apache/druid/data/input/impl/CloudObjectInputSource.java b/core/src/main/java/org/apache/druid/data/input/impl/CloudObjectInputSource.java index e983d14ffc42..52ea0224f06d 100644 --- 
a/core/src/main/java/org/apache/druid/data/input/impl/CloudObjectInputSource.java +++ b/core/src/main/java/org/apache/druid/data/input/impl/CloudObjectInputSource.java @@ -37,7 +37,8 @@ import java.util.Objects; import java.util.stream.Stream; -public abstract class CloudObjectInputSource extends AbstractInputSource implements SplittableInputSource +public abstract class CloudObjectInputSource extends AbstractInputSource + implements SplittableInputSource { private final List uris; private final List prefixes; diff --git a/extensions-core/google-extensions/src/test/java/org/apache/druid/data/input/google/GoogleCloudStorageInputSourceTest.java b/extensions-core/google-extensions/src/test/java/org/apache/druid/data/input/google/GoogleCloudStorageInputSourceTest.java index 3ecdd4a974d8..2146c5f4157e 100644 --- a/extensions-core/google-extensions/src/test/java/org/apache/druid/data/input/google/GoogleCloudStorageInputSourceTest.java +++ b/extensions-core/google-extensions/src/test/java/org/apache/druid/data/input/google/GoogleCloudStorageInputSourceTest.java @@ -97,7 +97,8 @@ public class GoogleCloudStorageInputSourceTest extends InitializedNullHandlingTe public void testSerde() throws Exception { final ObjectMapper mapper = createGoogleObjectMapper(); - final GoogleCloudStorageInputSource withUris = new GoogleCloudStorageInputSource(STORAGE, EXPECTED_URIS, ImmutableList.of(), null); + final GoogleCloudStorageInputSource withUris = + new GoogleCloudStorageInputSource(STORAGE, EXPECTED_URIS, ImmutableList.of(), null); final GoogleCloudStorageInputSource serdeWithUris = mapper.readValue(mapper.writeValueAsString(withUris), GoogleCloudStorageInputSource.class); Assert.assertEquals(withUris, serdeWithUris); @@ -134,7 +135,8 @@ public void testSerdeObjects() throws Exception public void testWithUrisSplit() { - GoogleCloudStorageInputSource inputSource = new GoogleCloudStorageInputSource(STORAGE, EXPECTED_URIS, ImmutableList.of(), null); + GoogleCloudStorageInputSource 
inputSource = + new GoogleCloudStorageInputSource(STORAGE, EXPECTED_URIS, ImmutableList.of(), null); Stream> splits = inputSource.createSplits( new JsonInputFormat(JSONPathSpec.DEFAULT, null), @@ -151,7 +153,8 @@ public void testWithPrefixesSplit() throws IOException addExpectedPrefixObjects(PREFIXES.get(1), ImmutableList.of(EXPECTED_URIS.get(1))); EasyMock.replay(STORAGE); - GoogleCloudStorageInputSource inputSource = new GoogleCloudStorageInputSource(STORAGE, null, PREFIXES, null); + GoogleCloudStorageInputSource inputSource = + new GoogleCloudStorageInputSource(STORAGE, null, PREFIXES, null); Stream> splits = inputSource.createSplits( new JsonInputFormat(JSONPathSpec.DEFAULT, null), @@ -247,7 +250,9 @@ private static void addExpectedPrefixObjects(URI prefix, List uris) throws EasyMock.expect(STORAGE.list(EasyMock.eq(bucket))).andReturn(listRequest).once(); EasyMock.expect(listRequest.setPageToken(EasyMock.anyString())).andReturn(listRequest).once(); EasyMock.expect(listRequest.setMaxResults(EasyMock.anyLong())).andReturn(listRequest).once(); - EasyMock.expect(listRequest.setPrefix(EasyMock.eq(StringUtils.maybeRemoveLeadingSlash(prefix.getPath())))).andReturn(listRequest).once(); + EasyMock.expect(listRequest.setPrefix(EasyMock.eq(StringUtils.maybeRemoveLeadingSlash(prefix.getPath())))) + .andReturn(listRequest) + .once(); List mockObjects = new ArrayList<>(); for (URI uri : uris) { From ff836ff370d0deab3634b298ea22bc65b75b6cda Mon Sep 17 00:00:00 2001 From: Clint Wylie Date: Wed, 27 Nov 2019 05:53:01 -0800 Subject: [PATCH 06/10] javadoc --- docs/development/extensions-core/google.md | 29 ++++++++++++++++++++++ 1 file changed, 29 insertions(+) diff --git a/docs/development/extensions-core/google.md b/docs/development/extensions-core/google.md index 5a64b611674b..510634aebada 100644 --- a/docs/development/extensions-core/google.md +++ b/docs/development/extensions-core/google.md @@ -76,11 +76,40 @@ This extension also provides an input source for Druid native 
batch ingestion to ... ``` + +```json +... + "ioConfig": { + "type": "index_parallel", + "inputSource": { + "type": "google", + "objects": [ + { "bucket": "foo", "path": "bar/file1.json"}, + { "bucket": "bar", "path": "foo/file2.json"} + ] + }, + "inputFormat": { + "type": "json" + }, + ... + }, +... +``` + |property|description|default|required?| |--------|-----------|-------|---------| |type|This should be `google`.|N/A|yes| |uris|JSON array of URIs where Google Cloud Storage objects to be ingested are located.|N/A|`uris` or `prefixes` must be set| |prefixes|JSON array of URI prefixes for the locations of Google Cloud Storage objects to be ingested.|N/A|`uris` or `prefixes` must be set| +|objects|JSON array of Google Cloud Storage objects to be ingested.|N/A|`uris` or `prefixes` or `objects` must be set| + + +Google Cloud Storage object: + +|property|description|default|required?| +|--------|-----------|-------|---------| +|bucket|Name of the Google Cloud Storage bucket|N/A|yes| +|path|The path where data is located.|N/A|yes| ## Firehose From abebe26a6a12a9de2419f9f7a5714905d8bccfbe Mon Sep 17 00:00:00 2001 From: Clint Wylie Date: Wed, 27 Nov 2019 14:20:10 -0800 Subject: [PATCH 07/10] dependencies --- .../druid/data/input/impl/CloudObjectInputSource.java | 5 +++-- extensions-core/google-extensions/pom.xml | 6 ++++++ extensions-core/s3-extensions/pom.xml | 10 +++++----- 3 files changed, 14 insertions(+), 7 deletions(-) diff --git a/core/src/main/java/org/apache/druid/data/input/impl/CloudObjectInputSource.java b/core/src/main/java/org/apache/druid/data/input/impl/CloudObjectInputSource.java index 52ea0224f06d..3f7f72c7aa55 100644 --- a/core/src/main/java/org/apache/druid/data/input/impl/CloudObjectInputSource.java +++ b/core/src/main/java/org/apache/druid/data/input/impl/CloudObjectInputSource.java @@ -92,8 +92,9 @@ public List getObjects() @Override public Stream> createSplits( - InputFormat inputFormat, @Nullable - SplitHintSpec splitHintSpec) + InputFormat 
inputFormat, + @Nullable SplitHintSpec splitHintSpec + ) { if (!CollectionUtils.isNullOrEmpty(objects)) { return objects.stream().map(InputSplit::new); diff --git a/extensions-core/google-extensions/pom.xml b/extensions-core/google-extensions/pom.xml index 51221513d061..68e42cd9f246 100644 --- a/extensions-core/google-extensions/pom.xml +++ b/extensions-core/google-extensions/pom.xml @@ -165,5 +165,11 @@ jackson-module-guice test + + joda-time + joda-time + test + + diff --git a/extensions-core/s3-extensions/pom.xml b/extensions-core/s3-extensions/pom.xml index 473002ccf8ae..3a70b4640b96 100644 --- a/extensions-core/s3-extensions/pom.xml +++ b/extensions-core/s3-extensions/pom.xml @@ -113,11 +113,6 @@ aws-java-sdk-s3 provided - - joda-time - joda-time - provided - org.apache.druid @@ -155,6 +150,11 @@ easymock test + + joda-time + joda-time + test + From f410493e211730f2bfd068aba53e07ad4afa60e3 Mon Sep 17 00:00:00 2001 From: Clint Wylie Date: Wed, 27 Nov 2019 15:40:44 -0800 Subject: [PATCH 08/10] oops --- .../druid/data/input/google/GoogleCloudStorageInputSource.java | 1 + 1 file changed, 1 insertion(+) diff --git a/extensions-core/google-extensions/src/main/java/org/apache/druid/data/input/google/GoogleCloudStorageInputSource.java b/extensions-core/google-extensions/src/main/java/org/apache/druid/data/input/google/GoogleCloudStorageInputSource.java index a867612523db..5e988a48e340 100644 --- a/extensions-core/google-extensions/src/main/java/org/apache/druid/data/input/google/GoogleCloudStorageInputSource.java +++ b/extensions-core/google-extensions/src/main/java/org/apache/druid/data/input/google/GoogleCloudStorageInputSource.java @@ -43,6 +43,7 @@ public class GoogleCloudStorageInputSource extends CloudObjectInputSource Date: Wed, 4 Dec 2019 12:47:11 -0800 Subject: [PATCH 09/10] review comments --- .../druid/data/input/impl/CloudObjectInputSource.java | 7 +++++++ docs/development/extensions-core/google.md | 4 ++-- 
.../druid/data/input/google/GoogleCloudStorageEntity.java | 4 +++- .../data/input/google/GoogleCloudStorageInputSource.java | 2 +- .../main/java/org/apache/druid/data/input/s3/S3Entity.java | 3 ++- 5 files changed, 15 insertions(+), 5 deletions(-) diff --git a/core/src/main/java/org/apache/druid/data/input/impl/CloudObjectInputSource.java b/core/src/main/java/org/apache/druid/data/input/impl/CloudObjectInputSource.java index 3f7f72c7aa55..29b402a1d468 100644 --- a/core/src/main/java/org/apache/druid/data/input/impl/CloudObjectInputSource.java +++ b/core/src/main/java/org/apache/druid/data/input/impl/CloudObjectInputSource.java @@ -86,8 +86,15 @@ public List getObjects() return objects; } + /** + * Create the correct {@link InputEntity} for this input source given a split on a {@link CloudObjectLocation} + */ protected abstract T createEntity(InputSplit split); + /** + * Create a stream of {@link CloudObjectLocation} splits by listing objects that appear under {@link #prefixes} using + * this input sources backend API + */ protected abstract Stream> getPrefixesSplitStream(); @Override diff --git a/docs/development/extensions-core/google.md b/docs/development/extensions-core/google.md index 510634aebada..9e5c63abf72b 100644 --- a/docs/development/extensions-core/google.md +++ b/docs/development/extensions-core/google.md @@ -99,8 +99,8 @@ This extension also provides an input source for Druid native batch ingestion to |property|description|default|required?| |--------|-----------|-------|---------| |type|This should be `google`.|N/A|yes| -|uris|JSON array of URIs where Google Cloud Storage objects to be ingested are located.|N/A|`uris` or `prefixes` must be set| -|prefixes|JSON array of URI prefixes for the locations of Google Cloud Storage objects to be ingested.|N/A|`uris` or `prefixes` must be set| +|uris|JSON array of URIs where Google Cloud Storage objects to be ingested are located.|N/A|`uris` or `prefixes` or `objects` must be set| +|prefixes|JSON array of URI 
prefixes for the locations of Google Cloud Storage objects to be ingested.|N/A|`uris` or `prefixes` or `objects` must be set| |objects|JSON array of Google Cloud Storage objects to be ingested.|N/A|`uris` or `prefixes` or `objects` must be set| diff --git a/extensions-core/google-extensions/src/main/java/org/apache/druid/data/input/google/GoogleCloudStorageEntity.java b/extensions-core/google-extensions/src/main/java/org/apache/druid/data/input/google/GoogleCloudStorageEntity.java index 907afe6f8c7c..4c1b53abc2b9 100644 --- a/extensions-core/google-extensions/src/main/java/org/apache/druid/data/input/google/GoogleCloudStorageEntity.java +++ b/extensions-core/google-extensions/src/main/java/org/apache/druid/data/input/google/GoogleCloudStorageEntity.java @@ -34,10 +34,12 @@ public class GoogleCloudStorageEntity extends RetryingInputEntity { + private final CloudObjectLocation location; private final GoogleByteSource byteSource; GoogleCloudStorageEntity(GoogleStorage storage, CloudObjectLocation location) { + this.location = location; this.byteSource = new GoogleByteSource(storage, location.getBucket(), location.getPath()); } @@ -45,7 +47,7 @@ public class GoogleCloudStorageEntity extends RetryingInputEntity @Override public URI getUri() { - return null; + return location.toUri(GoogleCloudStorageInputSource.SCHEME); } @Override diff --git a/extensions-core/google-extensions/src/main/java/org/apache/druid/data/input/google/GoogleCloudStorageInputSource.java b/extensions-core/google-extensions/src/main/java/org/apache/druid/data/input/google/GoogleCloudStorageInputSource.java index 5e988a48e340..b5b8bcd4a624 100644 --- a/extensions-core/google-extensions/src/main/java/org/apache/druid/data/input/google/GoogleCloudStorageInputSource.java +++ b/extensions-core/google-extensions/src/main/java/org/apache/druid/data/input/google/GoogleCloudStorageInputSource.java @@ -40,7 +40,7 @@ public class GoogleCloudStorageInputSource extends CloudObjectInputSource { - private static 
final String SCHEME = "gs"; + static final String SCHEME = "gs"; private final GoogleStorage storage; diff --git a/extensions-core/s3-extensions/src/main/java/org/apache/druid/data/input/s3/S3Entity.java b/extensions-core/s3-extensions/src/main/java/org/apache/druid/data/input/s3/S3Entity.java index 00efaaa0691b..9c05cc31d340 100644 --- a/extensions-core/s3-extensions/src/main/java/org/apache/druid/data/input/s3/S3Entity.java +++ b/extensions-core/s3-extensions/src/main/java/org/apache/druid/data/input/s3/S3Entity.java @@ -26,6 +26,7 @@ import org.apache.druid.data.input.RetryingInputEntity; import org.apache.druid.data.input.impl.CloudObjectLocation; import org.apache.druid.java.util.common.ISE; +import org.apache.druid.storage.s3.S3StorageDruidModule; import org.apache.druid.storage.s3.S3Utils; import org.apache.druid.storage.s3.ServerSideEncryptingAmazonS3; @@ -47,7 +48,7 @@ public class S3Entity extends RetryingInputEntity @Override public URI getUri() { - return null; + return object.toUri(S3StorageDruidModule.SCHEME); } @Override From 495ee050806412e412157f4fbf813c4659bad917 Mon Sep 17 00:00:00 2001 From: Clint Wylie Date: Wed, 4 Dec 2019 17:41:40 -0800 Subject: [PATCH 10/10] better javadoc --- .../druid/data/input/impl/CloudObjectInputSource.java | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/core/src/main/java/org/apache/druid/data/input/impl/CloudObjectInputSource.java b/core/src/main/java/org/apache/druid/data/input/impl/CloudObjectInputSource.java index 29b402a1d468..c8b7a82bf344 100644 --- a/core/src/main/java/org/apache/druid/data/input/impl/CloudObjectInputSource.java +++ b/core/src/main/java/org/apache/druid/data/input/impl/CloudObjectInputSource.java @@ -87,13 +87,16 @@ public List getObjects() } /** - * Create the correct {@link InputEntity} for this input source given a split on a {@link CloudObjectLocation} + * Create the correct {@link InputEntity} for this input source given a split on a {@link CloudObjectLocation}. 
This + * is called internally by {@link #formattableReader} and operates on the output of {@link #createSplits}. */ protected abstract T createEntity(InputSplit<CloudObjectLocation> split); /** * Create a stream of {@link CloudObjectLocation} splits by listing objects that appear under {@link #prefixes} using - * this input sources backend API + * this input source's backend API. This is called internally by {@link #createSplits} and {@link #estimateNumSplits}, + * only if {@link #prefixes} is set; otherwise the splits are created directly from {@link #uris} or {@link #objects}. + * Calling this method when {@link #prefixes} is not set is likely to lead to either an empty iterator or a null pointer exception. */ protected abstract Stream<InputSplit<CloudObjectLocation>> getPrefixesSplitStream();