From 367a1fae0cd58baf3d0e3a16c84d8bc4bd4d925a Mon Sep 17 00:00:00 2001 From: Clint Wylie Date: Tue, 19 Nov 2019 02:16:51 -0800 Subject: [PATCH 1/6] add google cloud storage InputSource for native batch --- .../impl/InputEntityIteratingReader.java | 2 +- docs/development/extensions-core/google.md | 28 ++++ .../input/google/GoogleCloudStoreEntity.java | 67 ++++++++++ .../google/GoogleCloudStoreInputSource.java | 122 ++++++++++++++++++ .../google/GoogleStorageDruidModule.java | 7 +- .../druid/storage/google/GoogleUtils.java | 10 +- .../GoogleCloudStoreInputSourceTest.java | 121 +++++++++++++++++ ...ticGoogleBlobStoreFirehoseFactoryTest.java | 45 +------ 8 files changed, 355 insertions(+), 47 deletions(-) create mode 100644 extensions-core/google-extensions/src/main/java/org/apache/druid/data/input/google/GoogleCloudStoreEntity.java create mode 100644 extensions-core/google-extensions/src/main/java/org/apache/druid/data/input/google/GoogleCloudStoreInputSource.java create mode 100644 extensions-core/google-extensions/src/test/java/org/apache/druid/data/input/google/GoogleCloudStoreInputSourceTest.java diff --git a/core/src/main/java/org/apache/druid/data/input/impl/InputEntityIteratingReader.java b/core/src/main/java/org/apache/druid/data/input/impl/InputEntityIteratingReader.java index 385bc5f14591..1869756eaa13 100644 --- a/core/src/main/java/org/apache/druid/data/input/impl/InputEntityIteratingReader.java +++ b/core/src/main/java/org/apache/druid/data/input/impl/InputEntityIteratingReader.java @@ -46,7 +46,7 @@ public class InputEntityIteratingReader implements InputSourceReader private final Iterator sourceIterator; private final File temporaryDirectory; - InputEntityIteratingReader( + public InputEntityIteratingReader( InputRowSchema inputRowSchema, InputFormat inputFormat, Stream sourceStream, diff --git a/docs/development/extensions-core/google.md b/docs/development/extensions-core/google.md index 90402d728dc9..309b9f1bfc2f 100644 --- a/docs/development/extensions-core/google.md +++ b/docs/development/extensions-core/google.md @@ -38,6 +38,34 @@ Deep storage can be written to Google Cloud Storage either via this extension or |`druid.google.prefix`||GCS prefix.|No-prefix| + + +## Google cloud storage batch ingestion input source + +This extension also provides an input source for Druid native batch ingestion to support reading objects directly from Google CloudStorage. Objects can be specified as list of Google CloudStorage URI strings. The Google CloudStorage input source is splittable and can be used by [native parallel index tasks](../../ingestion/native-batch.md#parallel-task), where each worker task of `index_parallel` will read a single object. + +```json +... + "ioConfig": { + "type": "index_parallel", + "inputSource": { + "type": "google", + "uris": ["gs://foo/bar/file.json", "gs://bar/foo/file2.json"] + }, + "inputFormat": { + "type": "json" + }, + ... + }, +... +``` + +|property|description|default|required?| +|--------|-----------|-------|---------| +|type|This should be `google`.|N/A|yes| +|uris|JSON array of URIs where Google CloudStorage files to be ingested are located.|N/A|yes| + + ## Firehose diff --git a/extensions-core/google-extensions/src/main/java/org/apache/druid/data/input/google/GoogleCloudStoreEntity.java b/extensions-core/google-extensions/src/main/java/org/apache/druid/data/input/google/GoogleCloudStoreEntity.java new file mode 100644 index 000000000000..8fdb9939b22c --- /dev/null +++ b/extensions-core/google-extensions/src/main/java/org/apache/druid/data/input/google/GoogleCloudStoreEntity.java @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.data.input.google; + +import com.google.common.base.Predicate; +import org.apache.druid.data.input.InputEntity; +import org.apache.druid.storage.google.GoogleByteSource; +import org.apache.druid.storage.google.GoogleStorage; +import org.apache.druid.storage.google.GoogleUtils; +import org.apache.druid.utils.CompressionUtils; + +import javax.annotation.Nullable; +import java.io.IOException; +import java.io.InputStream; +import java.net.URI; + +public class GoogleCloudStoreEntity implements InputEntity +{ + private final GoogleStorage storage; + private final URI uri; + + GoogleCloudStoreEntity(GoogleStorage storage, URI uri) + { + this.storage = storage; + this.uri = uri; + } + + @Nullable + @Override + public URI getUri() + { + return uri; + } + + @Override + public InputStream open() throws IOException + { + // Get data of the given object and open an input stream + final String bucket = uri.getAuthority(); + final String key = GoogleUtils.extractGoogleCloudStorageObjectKey(uri); + final GoogleByteSource byteSource = new GoogleByteSource(storage, bucket, key); + return CompressionUtils.decompress(byteSource.openStream(), uri.toString()); + } + + @Override + public Predicate getFetchRetryCondition() + { + return GoogleUtils.GOOGLE_RETRY; + } +} diff --git a/extensions-core/google-extensions/src/main/java/org/apache/druid/data/input/google/GoogleCloudStoreInputSource.java b/extensions-core/google-extensions/src/main/java/org/apache/druid/data/input/google/GoogleCloudStoreInputSource.java new file mode 100644 index 000000000000..09d8a94cf209 --- /dev/null +++ b/extensions-core/google-extensions/src/main/java/org/apache/druid/data/input/google/GoogleCloudStoreInputSource.java @@ -0,0 +1,122 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.data.input.google; + +import com.fasterxml.jackson.annotation.JacksonInject; +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.google.common.collect.ImmutableList; +import org.apache.druid.data.input.AbstractInputSource; +import org.apache.druid.data.input.InputFormat; +import org.apache.druid.data.input.InputRowSchema; +import org.apache.druid.data.input.InputSourceReader; +import org.apache.druid.data.input.InputSplit; +import org.apache.druid.data.input.SplitHintSpec; +import org.apache.druid.data.input.impl.InputEntityIteratingReader; +import org.apache.druid.data.input.impl.SplittableInputSource; +import org.apache.druid.storage.google.GoogleStorage; + +import javax.annotation.Nullable; +import java.io.File; +import java.net.URI; +import java.util.List; +import java.util.Objects; +import java.util.stream.Stream; + +public class GoogleCloudStoreInputSource extends AbstractInputSource implements SplittableInputSource +{ + private final GoogleStorage storage; + private final List uris; + + @JsonCreator + public GoogleCloudStoreInputSource( + @JacksonInject("googleStorage") GoogleStorage storage, + @JsonProperty("uris") List uris + ) + { + this.storage = storage; + this.uris = uris; + } + + @JsonProperty("uris") + public List getUris() + { + return uris; + } + + + @Override + public Stream> createSplits(InputFormat inputFormat, @Nullable SplitHintSpec splitHintSpec) + { + return uris.stream().map(InputSplit::new); + } + + @Override + public int getNumSplits(InputFormat inputFormat, @Nullable SplitHintSpec splitHintSpec) + { + return uris.size(); + } + + @Override + public SplittableInputSource withSplit(InputSplit split) + { + return new GoogleCloudStoreInputSource(storage, ImmutableList.of(split.get())); + } + + @Override + public boolean needsFormat() + { + return true; + } + + @Override + protected InputSourceReader formattableReader( + InputRowSchema inputRowSchema, + InputFormat inputFormat, + @Nullable File temporaryDirectory + ) + { + return new InputEntityIteratingReader( + inputRowSchema, + inputFormat, + createSplits(inputFormat, null).map(split -> new GoogleCloudStoreEntity(storage, split.get())), + temporaryDirectory + ); + } + + @Override + public boolean equals(Object o) + { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + GoogleCloudStoreInputSource that = (GoogleCloudStoreInputSource) o; + return Objects.equals(uris, that.uris); + } + + @Override + public int hashCode() + { + return Objects.hash(uris); + } +} diff --git a/extensions-core/google-extensions/src/main/java/org/apache/druid/storage/google/GoogleStorageDruidModule.java b/extensions-core/google-extensions/src/main/java/org/apache/druid/storage/google/GoogleStorageDruidModule.java index 74a58c419193..b5e80a8c58ad 100644 --- a/extensions-core/google-extensions/src/main/java/org/apache/druid/storage/google/GoogleStorageDruidModule.java +++ b/extensions-core/google-extensions/src/main/java/org/apache/druid/storage/google/GoogleStorageDruidModule.java @@ -30,6 +30,7 @@ import com.google.common.collect.ImmutableList; import com.google.inject.Binder; import com.google.inject.Provides; +import org.apache.druid.data.input.google.GoogleCloudStoreInputSource; import org.apache.druid.firehose.google.StaticGoogleBlobStoreFirehoseFactory; import org.apache.druid.guice.Binders; import org.apache.druid.guice.JsonConfigProvider; @@ -41,7 +42,7 @@ public class GoogleStorageDruidModule implements DruidModule { - public static final String SCHEME = "google"; + static final String SCHEME = "google"; private static final Logger LOG = new Logger(GoogleStorageDruidModule.class); private static final String APPLICATION_NAME = "druid-google-extensions"; @@ -72,7 +73,9 @@ public void setupModule(SetupContext context) } }, new SimpleModule().registerSubtypes( - new NamedType(StaticGoogleBlobStoreFirehoseFactory.class, "static-google-blobstore")) + new NamedType(StaticGoogleBlobStoreFirehoseFactory.class, "static-google-blobstore"), + new NamedType(GoogleCloudStoreInputSource.class, SCHEME) + ) ); } diff --git a/extensions-core/google-extensions/src/main/java/org/apache/druid/storage/google/GoogleUtils.java b/extensions-core/google-extensions/src/main/java/org/apache/druid/storage/google/GoogleUtils.java index c3d3e4acb90a..1dd12ef2388e 100644 --- a/extensions-core/google-extensions/src/main/java/org/apache/druid/storage/google/GoogleUtils.java +++ b/extensions-core/google-extensions/src/main/java/org/apache/druid/storage/google/GoogleUtils.java @@ -20,12 +20,13 @@ package org.apache.druid.storage.google; import com.google.api.client.http.HttpResponseException; +import com.google.common.base.Predicate; import java.io.IOException; +import java.net.URI; public class GoogleUtils { - public static boolean isRetryable(Throwable t) { if (t instanceof HttpResponseException) { @@ -34,4 +35,11 @@ public static boolean isRetryable(Throwable t) } return t instanceof IOException; } + + public static String extractGoogleCloudStorageObjectKey(URI uri) + { + return uri.getPath().startsWith("/") ? uri.getPath().substring(1) : uri.getPath(); + } + + public static final Predicate GOOGLE_RETRY = e -> isRetryable(e); } diff --git a/extensions-core/google-extensions/src/test/java/org/apache/druid/data/input/google/GoogleCloudStoreInputSourceTest.java b/extensions-core/google-extensions/src/test/java/org/apache/druid/data/input/google/GoogleCloudStoreInputSourceTest.java new file mode 100644 index 000000000000..f9df1f8799d4 --- /dev/null +++ b/extensions-core/google-extensions/src/test/java/org/apache/druid/data/input/google/GoogleCloudStoreInputSourceTest.java @@ -0,0 +1,121 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.data.input.google; + +import com.fasterxml.jackson.databind.Module; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.module.SimpleModule; +import com.fasterxml.jackson.module.guice.ObjectMapperModule; +import com.google.common.collect.ImmutableList; +import com.google.inject.Binder; +import com.google.inject.Guice; +import com.google.inject.Injector; +import com.google.inject.Provides; +import org.apache.druid.data.input.InputSplit; +import org.apache.druid.data.input.impl.JsonInputFormat; +import org.apache.druid.initialization.DruidModule; +import org.apache.druid.jackson.DefaultObjectMapper; +import org.apache.druid.java.util.common.parsers.JSONPathSpec; +import org.apache.druid.storage.google.GoogleStorage; +import org.junit.Assert; +import org.junit.Test; + +import java.net.URI; +import java.util.Arrays; +import java.util.List; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +public class GoogleCloudStoreInputSourceTest +{ + private static final GoogleStorage STORAGE = new GoogleStorage(null); + + @Test + public void testSerde() throws Exception + { + final ObjectMapper mapper = createGoogleObjectMapper(); + + final List uris = Arrays.asList( + new URI("gs://foo/bar/file.gz"), + new URI("gs://bar/foo/file2.gz") + ); + + final List prefixes = Arrays.asList( + new URI("gs://foo/bar"), + new URI("gs://bar/foo") + ); + + final GoogleCloudStoreInputSource withUris = new GoogleCloudStoreInputSource(STORAGE, uris); + final GoogleCloudStoreInputSource serdeWithUris = + mapper.readValue(mapper.writeValueAsString(withUris), GoogleCloudStoreInputSource.class); + Assert.assertEquals(withUris, serdeWithUris); + } + + @Test + public void testWithUrisSplit() + { + final List uris = Arrays.asList( + URI.create("gs://foo/bar/file.gz"), + URI.create("gs://bar/foo/file2.gz") + ); + + GoogleCloudStoreInputSource inputSource = new GoogleCloudStoreInputSource(STORAGE, uris); + + Stream> splits = inputSource.createSplits( + new JsonInputFormat(JSONPathSpec.DEFAULT, null), + null + ); + Assert.assertEquals(uris, splits.map(InputSplit::get).collect(Collectors.toList())); + } + + public static ObjectMapper createGoogleObjectMapper() + { + final DruidModule baseModule = new TestGoogleModule(); + final ObjectMapper baseMapper = new DefaultObjectMapper(); + baseModule.getJacksonModules().forEach(baseMapper::registerModule); + + final Injector injector = Guice.createInjector( + new ObjectMapperModule(), + baseModule + ); + return injector.getInstance(ObjectMapper.class); + } + + private static class TestGoogleModule implements DruidModule + { + @Override + public List getJacksonModules() + { + return ImmutableList.of(new SimpleModule()); + } + + @Override + public void configure(Binder binder) + { + + } + + @Provides + public GoogleStorage getRestS3Service() + { + return STORAGE; + } + } +} \ No newline at end of file diff --git a/extensions-core/google-extensions/src/test/java/org/apache/druid/firehose/google/StaticGoogleBlobStoreFirehoseFactoryTest.java b/extensions-core/google-extensions/src/test/java/org/apache/druid/firehose/google/StaticGoogleBlobStoreFirehoseFactoryTest.java index 5bb60d774c06..e35f6a5b00f6 100644 --- a/extensions-core/google-extensions/src/test/java/org/apache/druid/firehose/google/StaticGoogleBlobStoreFirehoseFactoryTest.java +++ b/extensions-core/google-extensions/src/test/java/org/apache/druid/firehose/google/StaticGoogleBlobStoreFirehoseFactoryTest.java @@ -19,17 +19,9 @@ package org.apache.druid.firehose.google; -import com.fasterxml.jackson.databind.Module; import com.fasterxml.jackson.databind.ObjectMapper; -import com.fasterxml.jackson.databind.module.SimpleModule; -import com.fasterxml.jackson.module.guice.ObjectMapperModule; import com.google.common.collect.ImmutableList; -import com.google.inject.Binder; -import com.google.inject.Guice; -import com.google.inject.Injector; -import com.google.inject.Provides; -import org.apache.druid.initialization.DruidModule; -import org.apache.druid.jackson.DefaultObjectMapper; +import org.apache.druid.data.input.google.GoogleCloudStoreInputSourceTest; import org.apache.druid.storage.google.GoogleStorage; import org.junit.Assert; import org.junit.Test; @@ -44,7 +36,7 @@ public class StaticGoogleBlobStoreFirehoseFactoryTest @Test public void testSerde() throws IOException { - final ObjectMapper mapper = createObjectMapper(new TestGoogleModule()); + final ObjectMapper mapper = GoogleCloudStoreInputSourceTest.createGoogleObjectMapper(); final List blobs = ImmutableList.of( new GoogleBlob("foo", "bar"), @@ -68,37 +60,4 @@ public void testSerde() throws IOException Assert.assertEquals(factory, outputFact); } - - private static ObjectMapper createObjectMapper(DruidModule baseModule) - { - final ObjectMapper baseMapper = new DefaultObjectMapper(); - baseModule.getJacksonModules().forEach(baseMapper::registerModule); - - final Injector injector = Guice.createInjector( - new ObjectMapperModule(), - baseModule - ); - return injector.getInstance(ObjectMapper.class); - } - - private static class TestGoogleModule implements DruidModule - { - @Override - public List getJacksonModules() - { - return ImmutableList.of(new SimpleModule()); - } - - @Override - public void configure(Binder binder) - { - - } - - @Provides - public GoogleStorage getRestS3Service() - { - return STORAGE; - } - } } From bf63b4a6fce11ba7d68461d02796d648a44da3e1 Mon Sep 17 00:00:00 2001 From: Clint Wylie Date: Tue, 19 Nov 2019 02:29:57 -0800 Subject: [PATCH 2/6] rename --- ...dStoreEntity.java => GoogleCloudStorageEntity.java} | 4 ++-- ...tSource.java => GoogleCloudStorageInputSource.java} | 10 +++++----- .../druid/storage/google/GoogleStorageDruidModule.java | 4 ++-- ...est.java => GoogleCloudStorageInputSourceTest.java} | 10 +++++----- .../StaticGoogleBlobStoreFirehoseFactoryTest.java | 4 ++-- 5 files changed, 16 insertions(+), 16 deletions(-) rename extensions-core/google-extensions/src/main/java/org/apache/druid/data/input/google/{GoogleCloudStoreEntity.java => GoogleCloudStorageEntity.java} (94%) rename extensions-core/google-extensions/src/main/java/org/apache/druid/data/input/google/{GoogleCloudStoreInputSource.java => GoogleCloudStorageInputSource.java} (90%) rename extensions-core/google-extensions/src/test/java/org/apache/druid/data/input/google/{GoogleCloudStoreInputSourceTest.java => GoogleCloudStorageInputSourceTest.java} (91%) diff --git a/extensions-core/google-extensions/src/main/java/org/apache/druid/data/input/google/GoogleCloudStoreEntity.java b/extensions-core/google-extensions/src/main/java/org/apache/druid/data/input/google/GoogleCloudStorageEntity.java similarity index 94% rename from extensions-core/google-extensions/src/main/java/org/apache/druid/data/input/google/GoogleCloudStoreEntity.java rename to extensions-core/google-extensions/src/main/java/org/apache/druid/data/input/google/GoogleCloudStorageEntity.java index 8fdb9939b22c..4c2754f9b736 100644 --- a/extensions-core/google-extensions/src/main/java/org/apache/druid/data/input/google/GoogleCloudStoreEntity.java +++ b/extensions-core/google-extensions/src/main/java/org/apache/druid/data/input/google/GoogleCloudStorageEntity.java @@ -31,12 +31,12 @@ import java.io.InputStream; import java.net.URI; -public class GoogleCloudStoreEntity implements InputEntity +public class GoogleCloudStorageEntity implements InputEntity { private final GoogleStorage storage; private final URI uri; - GoogleCloudStoreEntity(GoogleStorage storage, URI uri) + GoogleCloudStorageEntity(GoogleStorage storage, URI uri) { this.storage = storage; this.uri = uri; diff --git a/extensions-core/google-extensions/src/main/java/org/apache/druid/data/input/google/GoogleCloudStoreInputSource.java b/extensions-core/google-extensions/src/main/java/org/apache/druid/data/input/google/GoogleCloudStorageInputSource.java similarity index 90% rename from extensions-core/google-extensions/src/main/java/org/apache/druid/data/input/google/GoogleCloudStoreInputSource.java rename to extensions-core/google-extensions/src/main/java/org/apache/druid/data/input/google/GoogleCloudStorageInputSource.java index 09d8a94cf209..b732d210cd23 100644 --- a/extensions-core/google-extensions/src/main/java/org/apache/druid/data/input/google/GoogleCloudStoreInputSource.java +++ b/extensions-core/google-extensions/src/main/java/org/apache/druid/data/input/google/GoogleCloudStorageInputSource.java @@ -40,13 +40,13 @@ import java.util.Objects; import java.util.stream.Stream; -public class GoogleCloudStoreInputSource extends AbstractInputSource implements SplittableInputSource +public class GoogleCloudStorageInputSource extends AbstractInputSource implements SplittableInputSource { private final GoogleStorage storage; private final List uris; @JsonCreator - public GoogleCloudStoreInputSource( + public GoogleCloudStorageInputSource( @JacksonInject("googleStorage") GoogleStorage storage, @JsonProperty("uris") List uris ) @@ -77,7 +77,7 @@ public int getNumSplits(InputFormat inputFormat, @Nullable SplitHintSpec splitHi @Override public SplittableInputSource withSplit(InputSplit split) { - return new GoogleCloudStoreInputSource(storage, ImmutableList.of(split.get())); + return new GoogleCloudStorageInputSource(storage, ImmutableList.of(split.get())); } @Override @@ -96,7 +96,7 @@ protected InputSourceReader formattableReader( return new InputEntityIteratingReader( inputRowSchema, inputFormat, - createSplits(inputFormat, null).map(split -> new GoogleCloudStoreEntity(storage, split.get())), + createSplits(inputFormat, null).map(split -> new GoogleCloudStorageEntity(storage, split.get())), temporaryDirectory ); } @@ -110,7 +110,7 @@ public boolean equals(Object o) if (o == null || getClass() != o.getClass()) { return false; } - GoogleCloudStoreInputSource that = (GoogleCloudStoreInputSource) o; + GoogleCloudStorageInputSource that = (GoogleCloudStorageInputSource) o; return Objects.equals(uris, that.uris); } diff --git a/extensions-core/google-extensions/src/main/java/org/apache/druid/storage/google/GoogleStorageDruidModule.java b/extensions-core/google-extensions/src/main/java/org/apache/druid/storage/google/GoogleStorageDruidModule.java index b5e80a8c58ad..c30ce073579c 100644 --- a/extensions-core/google-extensions/src/main/java/org/apache/druid/storage/google/GoogleStorageDruidModule.java +++ b/extensions-core/google-extensions/src/main/java/org/apache/druid/storage/google/GoogleStorageDruidModule.java @@ -30,7 +30,7 @@ import com.google.common.collect.ImmutableList; import com.google.inject.Binder; import com.google.inject.Provides; -import org.apache.druid.data.input.google.GoogleCloudStoreInputSource; +import org.apache.druid.data.input.google.GoogleCloudStorageInputSource; import org.apache.druid.firehose.google.StaticGoogleBlobStoreFirehoseFactory; import org.apache.druid.guice.Binders; import org.apache.druid.guice.JsonConfigProvider; @@ -74,7 +74,7 @@ public void setupModule(SetupContext context) }, new SimpleModule().registerSubtypes( new NamedType(StaticGoogleBlobStoreFirehoseFactory.class, "static-google-blobstore"), - new NamedType(GoogleCloudStoreInputSource.class, SCHEME) + new NamedType(GoogleCloudStorageInputSource.class, SCHEME) ) ); } diff --git a/extensions-core/google-extensions/src/test/java/org/apache/druid/data/input/google/GoogleCloudStoreInputSourceTest.java b/extensions-core/google-extensions/src/test/java/org/apache/druid/data/input/google/GoogleCloudStorageInputSourceTest.java similarity index 91% rename from extensions-core/google-extensions/src/test/java/org/apache/druid/data/input/google/GoogleCloudStoreInputSourceTest.java rename to extensions-core/google-extensions/src/test/java/org/apache/druid/data/input/google/GoogleCloudStorageInputSourceTest.java index f9df1f8799d4..4e9a6d2b3acf 100644 --- a/extensions-core/google-extensions/src/test/java/org/apache/druid/data/input/google/GoogleCloudStoreInputSourceTest.java +++ b/extensions-core/google-extensions/src/test/java/org/apache/druid/data/input/google/GoogleCloudStorageInputSourceTest.java @@ -43,7 +43,7 @@ import java.util.stream.Collectors; import java.util.stream.Stream; -public class GoogleCloudStoreInputSourceTest +public class GoogleCloudStorageInputSourceTest { private static final GoogleStorage STORAGE = new GoogleStorage(null); @@ -62,9 +62,9 @@ public void testSerde() throws Exception new URI("gs://bar/foo") ); - final GoogleCloudStoreInputSource withUris = new GoogleCloudStoreInputSource(STORAGE, uris); - final GoogleCloudStoreInputSource serdeWithUris = - mapper.readValue(mapper.writeValueAsString(withUris), GoogleCloudStoreInputSource.class); + final GoogleCloudStorageInputSource withUris = new GoogleCloudStorageInputSource(STORAGE, uris); + final GoogleCloudStorageInputSource serdeWithUris = + mapper.readValue(mapper.writeValueAsString(withUris), GoogleCloudStorageInputSource.class); Assert.assertEquals(withUris, serdeWithUris); } @@ -76,7 +76,7 @@ public void testWithUrisSplit() URI.create("gs://bar/foo/file2.gz") ); - GoogleCloudStoreInputSource inputSource = new GoogleCloudStoreInputSource(STORAGE, uris); + GoogleCloudStorageInputSource inputSource = new GoogleCloudStorageInputSource(STORAGE, uris); Stream> splits = inputSource.createSplits( new JsonInputFormat(JSONPathSpec.DEFAULT, null), diff --git a/extensions-core/google-extensions/src/test/java/org/apache/druid/firehose/google/StaticGoogleBlobStoreFirehoseFactoryTest.java b/extensions-core/google-extensions/src/test/java/org/apache/druid/firehose/google/StaticGoogleBlobStoreFirehoseFactoryTest.java index e35f6a5b00f6..c9996b53ccbc 100644 --- a/extensions-core/google-extensions/src/test/java/org/apache/druid/firehose/google/StaticGoogleBlobStoreFirehoseFactoryTest.java +++ b/extensions-core/google-extensions/src/test/java/org/apache/druid/firehose/google/StaticGoogleBlobStoreFirehoseFactoryTest.java @@ -21,7 +21,7 @@ import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.collect.ImmutableList; -import org.apache.druid.data.input.google.GoogleCloudStoreInputSourceTest; +import org.apache.druid.data.input.google.GoogleCloudStorageInputSourceTest; import org.apache.druid.storage.google.GoogleStorage; import org.junit.Assert; import org.junit.Test; @@ -36,7 +36,7 @@ public class StaticGoogleBlobStoreFirehoseFactoryTest @Test public void testSerde() throws IOException { - final ObjectMapper mapper = GoogleCloudStoreInputSourceTest.createGoogleObjectMapper(); + final ObjectMapper mapper = GoogleCloudStorageInputSourceTest.createGoogleObjectMapper(); final List blobs = ImmutableList.of( new GoogleBlob("foo", "bar"), From 4d932e2acc5468bd16c48e705f8d540130f04c78 Mon Sep 17 00:00:00 2001 From: Clint Wylie Date: Tue, 19 Nov 2019 04:24:37 -0800 Subject: [PATCH 3/6] checkstyle --- extensions-core/google-extensions/pom.xml | 7 ++++++- .../input/google/GoogleCloudStorageInputSourceTest.java | 2 +- 2 files changed, 7 insertions(+), 2 deletions(-) diff --git a/extensions-core/google-extensions/pom.xml b/extensions-core/google-extensions/pom.xml index c94b1c76c849..d98910b27180 100644 --- a/extensions-core/google-extensions/pom.xml +++ b/extensions-core/google-extensions/pom.xml @@ -117,7 +117,12 @@ google-api-client provided - + + com.google.code.findbugs + jsr305 + 2.0.1 + provided + org.apache.druid diff --git a/extensions-core/google-extensions/src/test/java/org/apache/druid/data/input/google/GoogleCloudStorageInputSourceTest.java b/extensions-core/google-extensions/src/test/java/org/apache/druid/data/input/google/GoogleCloudStorageInputSourceTest.java index 4e9a6d2b3acf..8dbb3af0427a 100644 --- a/extensions-core/google-extensions/src/test/java/org/apache/druid/data/input/google/GoogleCloudStorageInputSourceTest.java +++ b/extensions-core/google-extensions/src/test/java/org/apache/druid/data/input/google/GoogleCloudStorageInputSourceTest.java @@ -118,4 +118,4 @@ public GoogleStorage getRestS3Service() return STORAGE; } } -} \ No newline at end of file +} From 243c817ad1d8bdf2646a9d8bbca8991a675a27a2 Mon Sep 17 00:00:00 2001 From: Clint Wylie Date: Tue, 19 Nov 2019 05:35:09 -0800 Subject: [PATCH 4/6] fix --- docs/development/extensions-core/google.md | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/docs/development/extensions-core/google.md b/docs/development/extensions-core/google.md index 309b9f1bfc2f..6dafa6ad0469 100644 --- a/docs/development/extensions-core/google.md +++ b/docs/development/extensions-core/google.md @@ -42,7 +42,7 @@ Deep storage can be written to Google Cloud Storage either via this extension or ## Google cloud storage batch ingestion input source -This extension also provides an input source for Druid native batch ingestion to support reading objects directly from Google CloudStorage. Objects can be specified as list of Google CloudStorage URI strings. The Google CloudStorage input source is splittable and can be used by [native parallel index tasks](../../ingestion/native-batch.md#parallel-task), where each worker task of `index_parallel` will read a single object. +This extension also provides an input source for Druid native batch ingestion to support reading objects directly from Google Cloud Storage. Objects can be specified as list of Google Cloud Storage URI strings. The Google Cloud Storage input source is splittable and can be used by [native parallel index tasks](../../ingestion/native-batch.md#parallel-task), where each worker task of `index_parallel` will read a single object. ```json ... @@ -63,7 +63,7 @@ This extension also provides an input source for Druid native batch ingestion to |property|description|default|required?| |--------|-----------|-------|---------| |type|This should be `google`.|N/A|yes| -|uris|JSON array of URIs where Google CloudStorage files to be ingested are located.|N/A|yes| +|uris|JSON array of URIs where Google Cloud Storage files to be ingested are located.|N/A|yes| ## Firehose From a97d5ab7548262a0b1717442b39f984c59f1a0b6 Mon Sep 17 00:00:00 2001 From: Clint Wylie Date: Tue, 19 Nov 2019 15:01:54 -0800 Subject: [PATCH 5/6] fix spelling --- .../data/input/google/GoogleCloudStorageInputSourceTest.java | 2 +- website/.spelling | 1 + 2 files changed, 2 insertions(+), 1 deletion(-) diff --git a/extensions-core/google-extensions/src/test/java/org/apache/druid/data/input/google/GoogleCloudStorageInputSourceTest.java b/extensions-core/google-extensions/src/test/java/org/apache/druid/data/input/google/GoogleCloudStorageInputSourceTest.java index 8dbb3af0427a..a3b77afd890b 100644 --- a/extensions-core/google-extensions/src/test/java/org/apache/druid/data/input/google/GoogleCloudStorageInputSourceTest.java +++ b/extensions-core/google-extensions/src/test/java/org/apache/druid/data/input/google/GoogleCloudStorageInputSourceTest.java @@ -113,7 +113,7 @@ public void configure(Binder binder) } @Provides - public GoogleStorage getRestS3Service() + public GoogleStorage getGoogleStorage() { return STORAGE; } diff --git a/website/.spelling b/website/.spelling index c177e709b17b..d28328196e8e 100644 --- a/website/.spelling +++ b/website/.spelling @@ -358,6 +358,7 @@ unmerged unparseable unparsed uptime +uris v1 v2 vCPUs From b3e86f2d76dc3a1e1b288b6fb280de150d7839b7 Mon Sep 17 00:00:00 2001 From: Clint Wylie Date: Tue, 19 Nov 2019 17:49:48 -0800 Subject: [PATCH 6/6] review comments --- .../druid/data/input/google/GoogleCloudStorageEntity.java | 2 +- .../druid/data/input/google/GoogleCloudStorageInputSource.java | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/extensions-core/google-extensions/src/main/java/org/apache/druid/data/input/google/GoogleCloudStorageEntity.java b/extensions-core/google-extensions/src/main/java/org/apache/druid/data/input/google/GoogleCloudStorageEntity.java index 4c2754f9b736..5a3256eb374f 100644 --- a/extensions-core/google-extensions/src/main/java/org/apache/druid/data/input/google/GoogleCloudStorageEntity.java +++ b/extensions-core/google-extensions/src/main/java/org/apache/druid/data/input/google/GoogleCloudStorageEntity.java @@ -56,7 +56,7 @@ public InputStream open() throws IOException final String bucket = uri.getAuthority(); final String key = GoogleUtils.extractGoogleCloudStorageObjectKey(uri); final GoogleByteSource byteSource = new GoogleByteSource(storage, bucket, key); - return CompressionUtils.decompress(byteSource.openStream(), uri.toString()); + return CompressionUtils.decompress(byteSource.openStream(), uri.getPath()); } @Override diff --git a/extensions-core/google-extensions/src/main/java/org/apache/druid/data/input/google/GoogleCloudStorageInputSource.java b/extensions-core/google-extensions/src/main/java/org/apache/druid/data/input/google/GoogleCloudStorageInputSource.java index b732d210cd23..1bc99ad8b4ba 100644 --- a/extensions-core/google-extensions/src/main/java/org/apache/druid/data/input/google/GoogleCloudStorageInputSource.java +++ b/extensions-core/google-extensions/src/main/java/org/apache/druid/data/input/google/GoogleCloudStorageInputSource.java @@ -47,7 +47,7 @@ public class GoogleCloudStorageInputSource extends AbstractInputSource implement @JsonCreator public GoogleCloudStorageInputSource( - @JacksonInject("googleStorage") GoogleStorage storage, + @JacksonInject GoogleStorage storage, @JsonProperty("uris") List uris ) {