diff --git a/docs/content/configuration/indexing-service.md b/docs/content/configuration/indexing-service.md
index 3626e7ba6309..9c9f2e846120 100644
--- a/docs/content/configuration/indexing-service.md
+++ b/docs/content/configuration/indexing-service.md
@@ -19,11 +19,11 @@ The indexing service uses several of the global configs in [Configuration](../co
#### Task Logging
-If you are running the indexing service in remote mode, the task logs must be stored in S3, Azure Blob Store or HDFS.
+If you are running the indexing service in remote mode, the task logs must be stored in S3, Azure Blob Store, Google Cloud Storage or HDFS.
|Property|Description|Default|
|--------|-----------|-------|
-|`druid.indexer.logs.type`|Choices:noop, s3, azure, hdfs, file. Where to store task logs|file|
+|`druid.indexer.logs.type`|Choices:noop, s3, azure, google, hdfs, file. Where to store task logs|file|
##### File Task Logs
@@ -52,6 +52,16 @@ Note: this uses the same storage account as the deep storage module for azure.
|`druid.indexer.logs.container`|The Azure Blob Store container to write logs to|none|
|`druid.indexer.logs.prefix`|The path to prepend to logs|none|
+#### Google Cloud Storage Task Logs
+Store task logs in Google Cloud Storage.
+
+Note: this uses the same storage settings as the deep storage module for google.
+
+|Property|Description|Default|
+|--------|-----------|-------|
+|`druid.indexer.logs.bucket`|The Google Cloud Storage bucket to write logs to|none|
+|`druid.indexer.logs.prefix`|The path to prepend to logs|none|
+
##### HDFS Task Logs
Store task logs in HDFS.
diff --git a/docs/content/development/extensions-contrib/google.md b/docs/content/development/extensions-contrib/google.md
new file mode 100644
index 000000000000..3bef6db1cda9
--- /dev/null
+++ b/docs/content/development/extensions-contrib/google.md
@@ -0,0 +1,55 @@
+---
+layout: doc_page
+---
+
+# Google Cloud Storage
+
+To use this extension, make sure to [include](../../operations/including-extensions.html) `druid-google-extensions` extension.
+
+## Deep Storage
+
+[Google Cloud Storage](https://cloud.google.com/storage/) is another option for deep storage. This requires some additional druid configuration.
+
+|Property|Description|Default|Required?|
+|--------|-----------|-------|---------|
+|bucket|Name of the Google Cloud bucket|N/A|yes|
+|path|The path where data is located.|N/A|yes|
+
+## Firehose
+
+#### StaticGoogleBlobStoreFirehose
+
+This firehose ingests events, similar to the StaticS3Firehose, but from an Google Cloud Store.
+
+As with the S3 blobstore, it is assumed to be gzipped if the extension ends in .gz
+
+Sample spec:
+
+```json
+"firehose" : {
+ "type" : "static-google-blobstore",
+ "blobs": [
+ {
+ "bucket": "foo",
+ "path": "/path/to/your/file.json"
+ },
+ {
+ "container": "bar",
+ "path": "/another/path.json"
+ }
+ ]
+}
+```
+
+|property|description|default|required?|
+|--------|-----------|-------|---------|
+|type|This should be "static-google-blobstore".|N/A|yes|
+|blobs|JSON array of Google Blobs.|N/A|yes|
+
+Google Blobs:
+
+|property|description|default|required?|
+|--------|-----------|-------|---------|
+|bucket|Name of the Google Cloud bucket|N/A|yes|
+|path|The path where data is located.|N/A|yes|
+
diff --git a/docs/content/development/extensions.md b/docs/content/development/extensions.md
index ecef97f5824e..e0bcbd01a699 100644
--- a/docs/content/development/extensions.md
+++ b/docs/content/development/extensions.md
@@ -59,6 +59,7 @@ All of these community extensions can be downloaded using *pull-deps* with the c
|druid-rabbitmq|RabbitMQ firehose.|[link](../development/extensions-contrib/rabbitmq.html)|
|druid-rocketmq|RocketMQ firehose.|[link](../development/extensions-contrib/rocketmq.html)|
|druid-time-min-max|Min/Max aggregator for timestamp.|[link](../development/extensions-contrib/time-min-max.html)|
+|druid-google-extensions|Google Cloud Storage deep storage.|[link](../development/extensions-contrib/google.html)|
|sqlserver-metadata-storage|Microsoft SqlServer deep storage.|[link](../development/extensions-contrib/sqlserver.html)|
|graphite-emitter|Graphite metrics emitter|[link](../development/extensions-contrib/graphite.html)|
|statsd-emitter|StatsD metrics emitter|[link](../development/extensions-contrib/statsd.html)|
diff --git a/docs/content/development/modules.md b/docs/content/development/modules.md
index 934277b02f9d..1ba5449bf6b4 100644
--- a/docs/content/development/modules.md
+++ b/docs/content/development/modules.md
@@ -49,7 +49,7 @@ If your jar has this file, then when it is added to the classpath or as an exten
### Adding a new deep storage implementation
-Check the `azure-storage`, `cassandra-storage`, `hdfs-storage` and `s3-extensions` modules for examples of how to do this.
+Check the `azure-storage`, `google-storage`, `cassandra-storage`, `hdfs-storage` and `s3-extensions` modules for examples of how to do this.
The basic idea behind the extension is that you need to add bindings for your DataSegmentPusher and DataSegmentPuller objects. The way to add them is something like (taken from HdfsStorageDruidModule)
diff --git a/extensions-contrib/google-extensions/pom.xml b/extensions-contrib/google-extensions/pom.xml
new file mode 100644
index 000000000000..8a7f93b2890b
--- /dev/null
+++ b/extensions-contrib/google-extensions/pom.xml
@@ -0,0 +1,73 @@
+
+
+
+
+ 4.0.0
+
+ io.druid.extensions.contrib
+ druid-google-extensions
+ druid-google-extensions
+ druid-google-extensions
+
+
+ io.druid
+ druid
+ 0.9.3-SNAPSHOT
+ ../../pom.xml
+
+
+
+
+ io.druid
+ druid-api
+ ${project.parent.version}
+ provided
+
+
+
+ com.google.apis
+ google-api-services-storage
+ v1-rev79-1.22.0
+
+
+ com.google.http-client
+ google-http-client-jackson2
+ 1.22.0
+
+
+
+
+ junit
+ junit
+ test
+
+
+ io.druid
+ druid-server
+ ${project.parent.version}
+ test
+
+
+ org.easymock
+ easymock
+ test
+
+
+
diff --git a/extensions-contrib/google-extensions/src/main/java/io/druid/firehose/google/GoogleBlob.java b/extensions-contrib/google-extensions/src/main/java/io/druid/firehose/google/GoogleBlob.java
new file mode 100644
index 000000000000..0b6b6cc6eba0
--- /dev/null
+++ b/extensions-contrib/google-extensions/src/main/java/io/druid/firehose/google/GoogleBlob.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to Metamarkets Group Inc. (Metamarkets) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. Metamarkets licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package io.druid.firehose.google;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+
+public class GoogleBlob {
+ private final String bucket;
+ private final String path;
+
+ @JsonCreator
+ public GoogleBlob(@JsonProperty("bucket") String bucket, @JsonProperty("path") String path) {
+ this.bucket = bucket;
+ this.path = path;
+ }
+
+ @JsonProperty
+ public String getBucket() {
+ return bucket;
+ }
+
+ @JsonProperty
+ public String getPath() {
+ return path;
+ }
+}
+
diff --git a/extensions-contrib/google-extensions/src/main/java/io/druid/firehose/google/StaticGoogleBlobStoreFirehoseFactory.java b/extensions-contrib/google-extensions/src/main/java/io/druid/firehose/google/StaticGoogleBlobStoreFirehoseFactory.java
new file mode 100644
index 000000000000..907571d58ffa
--- /dev/null
+++ b/extensions-contrib/google-extensions/src/main/java/io/druid/firehose/google/StaticGoogleBlobStoreFirehoseFactory.java
@@ -0,0 +1,123 @@
+/*
+ * Licensed to Metamarkets Group Inc. (Metamarkets) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. Metamarkets licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package io.druid.firehose.google;
+
+import com.fasterxml.jackson.annotation.JacksonInject;
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.google.common.base.Charsets;
+import com.google.common.base.Preconditions;
+import com.google.common.base.Throwables;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Lists;
+import io.druid.data.input.Firehose;
+import io.druid.data.input.FirehoseFactory;
+import io.druid.data.input.impl.FileIteratingFirehose;
+import io.druid.data.input.impl.StringInputRowParser;
+import io.druid.java.util.common.CompressionUtils;
+import io.druid.java.util.common.logger.Logger;
+import io.druid.storage.google.GoogleByteSource;
+import io.druid.storage.google.GoogleStorage;
+import org.apache.commons.io.IOUtils;
+import org.apache.commons.io.LineIterator;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+
+public class StaticGoogleBlobStoreFirehoseFactory implements FirehoseFactory {
+ private static final Logger LOG = new Logger(StaticGoogleBlobStoreFirehoseFactory.class);
+
+ private final GoogleStorage storage;
+ private final List blobs;
+
+ @JsonCreator
+ public StaticGoogleBlobStoreFirehoseFactory(
+ @JacksonInject GoogleStorage storage,
+ @JsonProperty("blobs") GoogleBlob[] blobs
+ ) {
+ this.storage = storage;
+ this.blobs = ImmutableList.copyOf(blobs);
+ }
+
+ @JsonProperty
+ public List getBlobs() {
+ return blobs;
+ }
+
+ @Override
+ public Firehose connect(StringInputRowParser stringInputRowParser) throws IOException {
+ Preconditions.checkNotNull(storage, "null storage");
+
+ final LinkedList objectQueue = Lists.newLinkedList(blobs);
+
+ return new FileIteratingFirehose(
+ new Iterator() {
+ @Override
+ public boolean hasNext() {
+ return !objectQueue.isEmpty();
+ }
+
+ @Override
+ public LineIterator next() {
+ final GoogleBlob nextURI = objectQueue.poll();
+
+ final String bucket = nextURI.getBucket();
+ final String path = nextURI.getPath().startsWith("/")
+ ? nextURI.getPath().substring(1)
+ : nextURI.getPath();
+
+ try {
+ final InputStream innerInputStream = new GoogleByteSource(storage, bucket, path).openStream();
+
+ final InputStream outerInputStream = path.endsWith(".gz")
+ ? CompressionUtils.gzipInputStream(innerInputStream)
+ : innerInputStream;
+
+ return IOUtils.lineIterator(
+ new BufferedReader(
+ new InputStreamReader(outerInputStream, Charsets.UTF_8)
+ )
+ );
+ } catch (Exception e) {
+ LOG.error(e,
+ "Exception opening bucket[%s] blob[%s]",
+ bucket,
+ path
+ );
+
+ throw Throwables.propagate(e);
+ }
+ }
+
+ @Override
+ public void remove() {
+ throw new UnsupportedOperationException();
+ }
+ },
+ stringInputRowParser
+ );
+ }
+}
+
diff --git a/extensions-contrib/google-extensions/src/main/java/io/druid/storage/google/GoogleAccountConfig.java b/extensions-contrib/google-extensions/src/main/java/io/druid/storage/google/GoogleAccountConfig.java
new file mode 100644
index 000000000000..e7f031c41fbb
--- /dev/null
+++ b/extensions-contrib/google-extensions/src/main/java/io/druid/storage/google/GoogleAccountConfig.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to Metamarkets Group Inc. (Metamarkets) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. Metamarkets licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package io.druid.storage.google;
+
+import com.fasterxml.jackson.annotation.JsonProperty;
+
+import javax.validation.constraints.NotNull;
+
+public class GoogleAccountConfig
+{
+ @JsonProperty
+ @NotNull
+ private String bucket;
+
+ @JsonProperty
+ private String prefix;
+
+ public void setBucket(String bucket)
+ {
+ this.bucket = bucket;
+ }
+
+ public void setPrefix(String prefix)
+ {
+ this.prefix = prefix;
+ }
+
+ public String getBucket()
+ {
+ return bucket;
+ }
+
+ public String getPrefix()
+ {
+ return prefix;
+ }
+}
diff --git a/extensions-contrib/google-extensions/src/main/java/io/druid/storage/google/GoogleByteSource.java b/extensions-contrib/google-extensions/src/main/java/io/druid/storage/google/GoogleByteSource.java
new file mode 100644
index 000000000000..047698f2e63e
--- /dev/null
+++ b/extensions-contrib/google-extensions/src/main/java/io/druid/storage/google/GoogleByteSource.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to Metamarkets Group Inc. (Metamarkets) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. Metamarkets licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package io.druid.storage.google;
+
+import com.google.common.io.ByteSource;
+
+import java.io.IOException;
+import java.io.InputStream;
+
+public class GoogleByteSource extends ByteSource
+{
+ private final GoogleStorage storage;
+ private final String bucket;
+ private final String path;
+
+ public GoogleByteSource(final GoogleStorage storage, final String bucket, final String path)
+ {
+ this.storage = storage;
+ this.bucket = bucket;
+ this.path = path;
+ }
+
+ @Override
+ public InputStream openStream() throws IOException
+ {
+ return storage.get(bucket, path);
+ }
+}
diff --git a/extensions-contrib/google-extensions/src/main/java/io/druid/storage/google/GoogleDataSegmentKiller.java b/extensions-contrib/google-extensions/src/main/java/io/druid/storage/google/GoogleDataSegmentKiller.java
new file mode 100644
index 000000000000..20f2a37cd796
--- /dev/null
+++ b/extensions-contrib/google-extensions/src/main/java/io/druid/storage/google/GoogleDataSegmentKiller.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to Metamarkets Group Inc. (Metamarkets) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. Metamarkets licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package io.druid.storage.google;
+
+import com.google.inject.Inject;
+import io.druid.java.util.common.MapUtils;
+import io.druid.java.util.common.logger.Logger;
+import io.druid.segment.loading.DataSegmentKiller;
+import io.druid.segment.loading.SegmentLoadingException;
+import io.druid.timeline.DataSegment;
+
+import java.io.IOException;
+import java.util.Map;
+
+public class GoogleDataSegmentKiller implements DataSegmentKiller
+{
+ private static final Logger LOG = new Logger(GoogleDataSegmentKiller.class);
+
+ private final GoogleStorage storage;
+
+ @Inject
+ public GoogleDataSegmentKiller(final GoogleStorage storage)
+ {
+ this.storage = storage;
+ }
+
+ @Override
+ public void kill(DataSegment segment) throws SegmentLoadingException
+ {
+ LOG.info("Killing segment [%s]", segment);
+
+ Map loadSpec = segment.getLoadSpec();
+ final String bucket = MapUtils.getString(loadSpec, "bucket");
+ final String indexPath = MapUtils.getString(loadSpec, "path");
+ final String descriptorPath = indexPath.substring(0, indexPath.lastIndexOf("/")) + "/descriptor.json";
+
+ try {
+ storage.delete(bucket, indexPath);
+ storage.delete(bucket, descriptorPath);
+ }
+ catch(IOException e) {
+ throw new SegmentLoadingException(e, "Couldn't kill segment[%s]: [%s]", segment.getIdentifier(), e.getMessage());
+ }
+ }
+
+ @Override
+ public void killAll() throws IOException
+ {
+ throw new UnsupportedOperationException("not implemented");
+ }
+}
diff --git a/extensions-contrib/google-extensions/src/main/java/io/druid/storage/google/GoogleDataSegmentPuller.java b/extensions-contrib/google-extensions/src/main/java/io/druid/storage/google/GoogleDataSegmentPuller.java
new file mode 100644
index 000000000000..cab2d0632ecf
--- /dev/null
+++ b/extensions-contrib/google-extensions/src/main/java/io/druid/storage/google/GoogleDataSegmentPuller.java
@@ -0,0 +1,145 @@
+/*
+ * Licensed to Metamarkets Group Inc. (Metamarkets) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. Metamarkets licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package io.druid.storage.google;
+
+import com.google.common.base.Predicate;
+import com.google.inject.Inject;
+import io.druid.java.util.common.CompressionUtils;
+import io.druid.java.util.common.FileUtils;
+import io.druid.java.util.common.ISE;
+import io.druid.java.util.common.MapUtils;
+import io.druid.java.util.common.logger.Logger;
+import io.druid.segment.loading.DataSegmentPuller;
+import io.druid.segment.loading.SegmentLoadingException;
+import io.druid.segment.loading.URIDataPuller;
+import io.druid.timeline.DataSegment;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.URI;
+import java.util.Map;
+
+public class GoogleDataSegmentPuller implements DataSegmentPuller, URIDataPuller
+{
+ private static final Logger LOG = new Logger(GoogleDataSegmentPuller.class);
+
+ private final GoogleStorage storage;
+
+ @Inject
+ public GoogleDataSegmentPuller(final GoogleStorage storage)
+ {
+ this.storage = storage;
+ }
+
+ @Override
+ public void getSegmentFiles(final DataSegment segment, final File outDir) throws SegmentLoadingException
+ {
+ final Map loadSpec = segment.getLoadSpec();
+ final String bucket = MapUtils.getString(loadSpec, "bucket");
+ final String path = MapUtils.getString(loadSpec, "path");
+
+ getSegmentFiles(bucket, path, outDir);
+ }
+
+ public FileUtils.FileCopyResult getSegmentFiles(final String bucket, final String path, File outDir)
+ throws SegmentLoadingException
+ {
+ LOG.info("Pulling index at path[%s] to outDir[%s]", bucket, path, outDir.getAbsolutePath());
+
+ prepareOutDir(outDir);
+
+ try {
+ final GoogleByteSource byteSource = new GoogleByteSource(storage, bucket, path);
+ final FileUtils.FileCopyResult result = CompressionUtils.unzip(
+ byteSource,
+ outDir,
+ GoogleUtils.GOOGLE_RETRY,
+ true
+ );
+ LOG.info("Loaded %d bytes from [%s] to [%s]", result.size(), path, outDir.getAbsolutePath());
+ return result;
+ }
+ catch (Exception e) {
+ try {
+ org.apache.commons.io.FileUtils.deleteDirectory(outDir);
+ }
+ catch (IOException ioe) {
+ LOG.warn(
+ ioe, "Failed to remove output directory [%s] for segment pulled from [%s]",
+ outDir.getAbsolutePath(), path
+ );
+ }
+ throw new SegmentLoadingException(e, e.getMessage());
+ }
+ }
+
+ // Needs to be public for the tests.
+ public void prepareOutDir(final File outDir) throws ISE
+ {
+ if (!outDir.exists()) {
+ outDir.mkdirs();
+ }
+
+ if (!outDir.isDirectory()) {
+ throw new ISE("outDir[%s] must be a directory.", outDir);
+ }
+ }
+
+ @Override
+ public InputStream getInputStream(URI uri) throws IOException
+ {
+ String path = uri.getPath();
+ if (path.startsWith("/")) {
+ path = path.substring(1);
+ }
+ return storage.get(uri.getHost(), path);
+ }
+
+ @Override
+ public String getVersion(URI uri) throws IOException
+ {
+ String path = uri.getPath();
+ if (path.startsWith("/")) {
+ path = path.substring(1);
+ }
+ return storage.version(uri.getHost(), path);
+ }
+
+ @Override
+ public Predicate shouldRetryPredicate()
+ {
+ return new Predicate()
+ {
+ @Override
+ public boolean apply(Throwable e)
+ {
+ if (e == null) {
+ return false;
+ }
+ if (GoogleUtils.GOOGLE_RETRY.apply(e)) {
+ return true;
+ }
+ // Look all the way down the cause chain, just in case something wraps it deep.
+ return apply(e.getCause());
+ }
+ };
+ }
+}
diff --git a/extensions-contrib/google-extensions/src/main/java/io/druid/storage/google/GoogleDataSegmentPusher.java b/extensions-contrib/google-extensions/src/main/java/io/druid/storage/google/GoogleDataSegmentPusher.java
new file mode 100644
index 000000000000..8bd9fbb8ee7c
--- /dev/null
+++ b/extensions-contrib/google-extensions/src/main/java/io/druid/storage/google/GoogleDataSegmentPusher.java
@@ -0,0 +1,153 @@
+/*
+ * Licensed to Metamarkets Group Inc. (Metamarkets) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. Metamarkets licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package io.druid.storage.google;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.api.client.http.InputStreamContent;
+import com.google.common.base.Throwables;
+import com.google.common.collect.ImmutableMap;
+import com.google.inject.Inject;
+import io.druid.java.util.common.CompressionUtils;
+import io.druid.java.util.common.logger.Logger;
+import io.druid.segment.SegmentUtils;
+import io.druid.segment.loading.DataSegmentPusher;
+import io.druid.segment.loading.DataSegmentPusherUtil;
+import io.druid.timeline.DataSegment;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.IOException;
+
+public class GoogleDataSegmentPusher implements DataSegmentPusher
+{
+ private static final Logger LOG = new Logger(GoogleDataSegmentPusher.class);
+
+ private final GoogleStorage storage;
+ private final GoogleAccountConfig config;
+ private final ObjectMapper jsonMapper;
+
+ @Inject
+ public GoogleDataSegmentPusher(
+ final GoogleStorage storage,
+ final GoogleAccountConfig config,
+ final ObjectMapper jsonMapper
+ )
+ {
+ this.storage = storage;
+ this.config = config;
+ this.jsonMapper = jsonMapper;
+
+ LOG.info("Configured Google as deep storage");
+ }
+
+ @Deprecated
+ @Override
+ public String getPathForHadoop(String dataSource)
+ {
+ return getPathForHadoop();
+ }
+
+ @Override
+ public String getPathForHadoop()
+ {
+ return String.format("gs://%s/%s", config.getBucket(), config.getPrefix());
+ }
+
+ public File createDescriptorFile(final ObjectMapper jsonMapper, final DataSegment segment)
+ throws IOException
+ {
+ File descriptorFile = File.createTempFile("descriptor", ".json");
+ try (FileOutputStream stream = new FileOutputStream(descriptorFile)) {
+ stream.write(jsonMapper.writeValueAsBytes(segment));
+ }
+
+ return descriptorFile;
+ }
+
+ public void insert(final File file, final String contentType, final String path) throws IOException {
+ LOG.info("Inserting [%s] to [%s]", file, path);
+
+ FileInputStream fileSteam = new FileInputStream(file);
+
+ InputStreamContent mediaContent = new InputStreamContent(contentType, fileSteam);
+ mediaContent.setLength(file.length());
+
+ storage.insert(config.getBucket(), path, mediaContent);
+ }
+
+ @Override
+ public DataSegment push(final File indexFilesDir, final DataSegment segment) throws IOException
+ {
+ LOG.info("Uploading [%s] to Google.", indexFilesDir);
+
+ final int version = SegmentUtils.getVersionFromDir(indexFilesDir);
+ File indexFile = null;
+ File descriptorFile = null;
+
+ try {
+ indexFile = File.createTempFile("index", ".zip");
+ final long indexSize = CompressionUtils.zip(indexFilesDir, indexFile);
+ final String storageDir = DataSegmentPusherUtil.getStorageDir(segment);
+ final String indexPath = buildPath(storageDir + "/" + "index.zip");
+ final String descriptorPath = buildPath(storageDir + "/" + "descriptor.json");
+
+ final DataSegment outSegment = segment
+ .withSize(indexSize)
+ .withLoadSpec(
+ ImmutableMap.of(
+ "type", GoogleStorageDruidModule.SCHEME,
+ "bucket", config.getBucket(),
+ "path", indexPath
+ )
+ )
+ .withBinaryVersion(version);
+
+ descriptorFile = createDescriptorFile(jsonMapper, outSegment);
+
+ insert(indexFile, "application/zip", indexPath);
+ insert(descriptorFile, "application/json", descriptorPath);
+
+ return outSegment;
+ }
+ catch (Exception e) {
+ throw Throwables.propagate(e);
+ } finally {
+ if (indexFile != null) {
+ LOG.info("Deleting file [%s]", indexFile);
+ indexFile.delete();
+ }
+
+ if (descriptorFile != null) {
+ LOG.info("Deleting file [%s]", descriptorFile);
+ descriptorFile.delete();
+ }
+ }
+ }
+
+ public String buildPath(final String path)
+ {
+ if (config.getPrefix() != "") {
+ return config.getPrefix() + "/" + path;
+ } else {
+ return path;
+ }
+ }
+}
diff --git a/extensions-contrib/google-extensions/src/main/java/io/druid/storage/google/GoogleLoadSpec.java b/extensions-contrib/google-extensions/src/main/java/io/druid/storage/google/GoogleLoadSpec.java
new file mode 100644
index 000000000000..ef6321b1e5f6
--- /dev/null
+++ b/extensions-contrib/google-extensions/src/main/java/io/druid/storage/google/GoogleLoadSpec.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to Metamarkets Group Inc. (Metamarkets) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. Metamarkets licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package io.druid.storage.google;
+
+import com.fasterxml.jackson.annotation.JacksonInject;
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonTypeName;
+import com.google.common.base.Preconditions;
+import io.druid.segment.loading.LoadSpec;
+import io.druid.segment.loading.SegmentLoadingException;
+
+import java.io.File;
+
+@JsonTypeName(GoogleStorageDruidModule.SCHEME)
+public class GoogleLoadSpec implements LoadSpec
+{
+ @JsonProperty
+ private final String bucket;
+
+ @JsonProperty
+ private final String path;
+
+ private final GoogleDataSegmentPuller puller;
+
+ @JsonCreator
+ public GoogleLoadSpec(
+ @JsonProperty("bucket") String bucket,
+ @JsonProperty("path") String path,
+ @JacksonInject GoogleDataSegmentPuller puller
+ )
+ {
+ Preconditions.checkNotNull(bucket);
+ Preconditions.checkNotNull(path);
+ this.bucket = bucket;
+ this.path = path;
+ this.puller = puller;
+ }
+
+ @Override
+ public LoadSpecResult loadSegment(File file) throws SegmentLoadingException
+ {
+ return new LoadSpecResult(puller.getSegmentFiles(bucket, path, file).size());
+ }
+}
diff --git a/extensions-contrib/google-extensions/src/main/java/io/druid/storage/google/GoogleStorage.java b/extensions-contrib/google-extensions/src/main/java/io/druid/storage/google/GoogleStorage.java
new file mode 100644
index 000000000000..2ebc377e18fd
--- /dev/null
+++ b/extensions-contrib/google-extensions/src/main/java/io/druid/storage/google/GoogleStorage.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to Metamarkets Group Inc. (Metamarkets) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. Metamarkets licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package io.druid.storage.google;
+
+import com.google.api.client.http.AbstractInputStreamContent;
+import com.google.api.services.storage.Storage;
+
+import java.io.IOException;
+import java.io.InputStream;
+
+public class GoogleStorage
+{
+ private final Storage storage;
+
+ public GoogleStorage(Storage storage)
+ {
+ this.storage = storage;
+ }
+
+ public void insert(final String bucket, final String path, AbstractInputStreamContent mediaContent) throws IOException
+ {
+ Storage.Objects.Insert insertObject = storage.objects().insert(bucket, null, mediaContent);
+ insertObject.setName(path);
+ insertObject.getMediaHttpUploader().setDirectUploadEnabled(false);
+ insertObject.execute();
+ }
+
+ public InputStream get(final String bucket, final String path) throws IOException
+ {
+ Storage.Objects.Get getObject = storage.objects().get(bucket, path);
+ getObject.getMediaHttpDownloader().setDirectDownloadEnabled(false);
+ return getObject.executeMediaAsInputStream();
+ }
+
+ public void delete(final String bucket, final String path) throws IOException
+ {
+ storage.objects().delete(bucket, path).execute();
+ }
+
+ public boolean exists(final String bucket, final String path)
+ {
+ try {
+ return storage.objects().get(bucket, path).executeUsingHead().isSuccessStatusCode();
+ } catch (Exception e) {
+ return false;
+ }
+ }
+
+ public long size(final String bucket, final String path) throws IOException
+ {
+ return storage.objects().get(bucket, path).execute().getSize().longValue();
+ }
+
+ public String version(final String bucket, final String path) throws IOException
+ {
+ return storage.objects().get(bucket, path).execute().getEtag();
+ }
+}
diff --git a/extensions-contrib/google-extensions/src/main/java/io/druid/storage/google/GoogleStorageDruidModule.java b/extensions-contrib/google-extensions/src/main/java/io/druid/storage/google/GoogleStorageDruidModule.java
new file mode 100644
index 000000000000..1af95e016c77
--- /dev/null
+++ b/extensions-contrib/google-extensions/src/main/java/io/druid/storage/google/GoogleStorageDruidModule.java
@@ -0,0 +1,122 @@
+/*
+ * Licensed to Metamarkets Group Inc. (Metamarkets) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. Metamarkets licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package io.druid.storage.google;
+
+import com.fasterxml.jackson.core.Version;
+import com.fasterxml.jackson.databind.Module;
+import com.fasterxml.jackson.databind.jsontype.NamedType;
+import com.fasterxml.jackson.databind.module.SimpleModule;
+import com.google.api.client.googleapis.auth.oauth2.GoogleCredential;
+import com.google.api.client.googleapis.javanet.GoogleNetHttpTransport;
+import com.google.api.client.http.HttpTransport;
+import com.google.api.client.json.JsonFactory;
+import com.google.api.client.json.jackson2.JacksonFactory;
+import com.google.api.services.storage.Storage;
+import com.google.api.services.storage.StorageScopes;
+import com.google.common.collect.ImmutableList;
+import com.google.inject.Binder;
+import com.google.inject.Provides;
+import io.druid.firehose.google.StaticGoogleBlobStoreFirehoseFactory;
+import io.druid.guice.Binders;
+import io.druid.guice.JsonConfigProvider;
+import io.druid.guice.LazySingleton;
+import io.druid.initialization.DruidModule;
+import io.druid.java.util.common.logger.Logger;
+
+import java.io.IOException;
+import java.security.GeneralSecurityException;
+import java.util.List;
+
+public class GoogleStorageDruidModule implements DruidModule
+{
+ private static final Logger LOG = new Logger(GoogleStorageDruidModule.class);
+
+ public static final String SCHEME = "google";
+ private static final String APPLICATION_NAME = "druid-google-extensions";
+
+ @Override
+ public List extends Module> getJacksonModules()
+ {
+ LOG.info("Getting jackson modules...");
+
+ return ImmutableList.of(
+ new Module()
+ {
+ @Override
+ public String getModuleName()
+ {
+ return "Google-" + System.identityHashCode(this);
+ }
+
+ @Override
+ public Version version()
+ {
+ return Version.unknownVersion();
+ }
+
+ @Override
+ public void setupModule(SetupContext context)
+ {
+ context.registerSubtypes(GoogleLoadSpec.class);
+ }
+ },
+ new SimpleModule().registerSubtypes(
+ new NamedType(StaticGoogleBlobStoreFirehoseFactory.class, "static-google-blobstore"))
+ );
+ }
+
+ @Override
+ public void configure(Binder binder)
+ {
+ LOG.info("Configuring GoogleStorageDruidModule...");
+
+ JsonConfigProvider.bind(binder, "druid.google", GoogleAccountConfig.class);
+
+ Binders.dataSegmentPullerBinder(binder).addBinding(SCHEME).to(GoogleDataSegmentPuller.class)
+ .in(LazySingleton.class);
+ Binders.dataSegmentPusherBinder(binder).addBinding(SCHEME).to(GoogleDataSegmentPusher.class)
+ .in(LazySingleton.class);
+ Binders.dataSegmentKillerBinder(binder).addBinding(SCHEME).to(GoogleDataSegmentKiller.class)
+ .in(LazySingleton.class);
+
+ Binders.taskLogsBinder(binder).addBinding(SCHEME).to(GoogleTaskLogs.class);
+ JsonConfigProvider.bind(binder, "druid.indexer.logs", GoogleTaskLogsConfig.class);
+ binder.bind(GoogleTaskLogs.class).in(LazySingleton.class);
+ }
+
+ @Provides
+ @LazySingleton
+ public GoogleStorage getGoogleStorage(final GoogleAccountConfig config)
+ throws IOException, GeneralSecurityException
+ {
+ LOG.info("Building Cloud Storage Client...");
+
+ HttpTransport httpTransport = GoogleNetHttpTransport.newTrustedTransport();
+ JsonFactory jsonFactory = JacksonFactory.getDefaultInstance();
+
+ GoogleCredential credential = GoogleCredential.getApplicationDefault(httpTransport, jsonFactory);
+ if (credential.createScopedRequired()) {
+ credential = credential.createScoped(StorageScopes.all());
+ }
+ Storage storage = new Storage.Builder(httpTransport, jsonFactory, credential).setApplicationName(APPLICATION_NAME).build();
+
+ return new GoogleStorage(storage);
+ }
+}
diff --git a/extensions-contrib/google-extensions/src/main/java/io/druid/storage/google/GoogleTaskLogs.java b/extensions-contrib/google-extensions/src/main/java/io/druid/storage/google/GoogleTaskLogs.java
new file mode 100644
index 000000000000..90cf4738150f
--- /dev/null
+++ b/extensions-contrib/google-extensions/src/main/java/io/druid/storage/google/GoogleTaskLogs.java
@@ -0,0 +1,109 @@
+/*
+ * Licensed to Metamarkets Group Inc. (Metamarkets) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. Metamarkets licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package io.druid.storage.google;
+
+import com.google.api.client.http.InputStreamContent;
+import com.google.common.base.Optional;
+import com.google.common.io.ByteSource;
+import com.google.inject.Inject;
+import io.druid.java.util.common.logger.Logger;
+import io.druid.tasklogs.TaskLogs;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.InputStream;
+import java.io.IOException;
+
+public class GoogleTaskLogs implements TaskLogs {
+ private static final Logger LOG = new Logger(GoogleTaskLogs.class);
+
+ private final GoogleTaskLogsConfig config;
+ private final GoogleStorage storage;
+
+ @Inject
+ public GoogleTaskLogs(GoogleTaskLogsConfig config, GoogleStorage storage) {
+ this.config = config;
+ this.storage = storage;
+ }
+
+ @Override
+ public void pushTaskLog(final String taskid, final File logFile) throws IOException {
+ final String taskKey = getTaskLogKey(taskid);
+ LOG.info("Pushing task log %s to: %s", logFile, taskKey);
+
+ FileInputStream fileSteam = new FileInputStream(logFile);
+
+ InputStreamContent mediaContent = new InputStreamContent("text/plain", fileSteam);
+ mediaContent.setLength(logFile.length());
+
+ storage.insert(config.getBucket(), taskKey, mediaContent);
+ }
+
+ @Override
+ public Optional streamTaskLog(final String taskid, final long offset) throws IOException {
+ final String taskKey = getTaskLogKey(taskid);
+
+ try {
+ if (!storage.exists(config.getBucket(), taskKey)) {
+ return Optional.absent();
+ }
+
+ final long length = storage.size(config.getBucket(), taskKey);
+
+ return Optional.of(
+ new ByteSource() {
+ @Override
+ public InputStream openStream() throws IOException {
+ try {
+ final long start;
+
+ if (offset > 0 && offset < length) {
+ start = offset;
+ } else if (offset < 0 && (-1 * offset) < length) {
+ start = length + offset;
+ } else {
+ start = 0;
+ }
+
+ InputStream stream = new GoogleByteSource(storage, config.getBucket(), taskKey).openStream();
+ stream.skip(start);
+
+ return stream;
+ } catch(Exception e) {
+ throw new IOException(e);
+ }
+ }
+ }
+ );
+ } catch (IOException e) {
+ throw new IOException(String.format("Failed to stream logs from: %s", taskKey), e);
+ }
+ }
+
+ private String getTaskLogKey(String taskid) {
+ return config.getPrefix() + "/" + taskid.replaceAll(":", "_");
+ }
+
+ @Override
+ public void killAll() throws IOException
+ {
+ throw new UnsupportedOperationException("not implemented");
+ }
+}
diff --git a/extensions-contrib/google-extensions/src/main/java/io/druid/storage/google/GoogleTaskLogsConfig.java b/extensions-contrib/google-extensions/src/main/java/io/druid/storage/google/GoogleTaskLogsConfig.java
new file mode 100644
index 000000000000..516b6d65c727
--- /dev/null
+++ b/extensions-contrib/google-extensions/src/main/java/io/druid/storage/google/GoogleTaskLogsConfig.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to Metamarkets Group Inc. (Metamarkets) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. Metamarkets licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package io.druid.storage.google;
+
+import com.fasterxml.jackson.annotation.JsonProperty;
+
+import javax.validation.constraints.NotNull;
+
+public class GoogleTaskLogsConfig {
+ @JsonProperty
+ @NotNull
+ private final String bucket;
+
+ @JsonProperty
+ @NotNull
+ private final String prefix;
+
+ public GoogleTaskLogsConfig(@JsonProperty("bucket") String bucket, @JsonProperty("prefix") String prefix) {
+ this.bucket = bucket;
+ this.prefix = prefix;
+ }
+
+ public String getBucket() {
+ return bucket;
+ }
+
+ public String getPrefix() {
+ return prefix;
+ }
+}
diff --git a/extensions-contrib/google-extensions/src/main/java/io/druid/storage/google/GoogleUtils.java b/extensions-contrib/google-extensions/src/main/java/io/druid/storage/google/GoogleUtils.java
new file mode 100644
index 000000000000..d2edd133e1d2
--- /dev/null
+++ b/extensions-contrib/google-extensions/src/main/java/io/druid/storage/google/GoogleUtils.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to Metamarkets Group Inc. (Metamarkets) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. Metamarkets licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package io.druid.storage.google;
+
+import com.google.common.base.Predicate;
+
+public class GoogleUtils
+{
+ public static final Predicate GOOGLE_RETRY = new Predicate()
+ {
+ @Override
+ public boolean apply(Throwable e)
+ {
+ return false;
+ }
+ };
+}
diff --git a/extensions-contrib/google-extensions/src/main/resources/META-INF/services/io.druid.initialization.DruidModule b/extensions-contrib/google-extensions/src/main/resources/META-INF/services/io.druid.initialization.DruidModule
new file mode 100644
index 000000000000..c296c902d3ba
--- /dev/null
+++ b/extensions-contrib/google-extensions/src/main/resources/META-INF/services/io.druid.initialization.DruidModule
@@ -0,0 +1,20 @@
+#
+# Licensed to Metamarkets Group Inc. (Metamarkets) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. Metamarkets licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+io.druid.storage.google.GoogleStorageDruidModule
diff --git a/extensions-contrib/google-extensions/src/test/java/io/druid/storage/google/GoogleByteSourceTest.java b/extensions-contrib/google-extensions/src/test/java/io/druid/storage/google/GoogleByteSourceTest.java
new file mode 100644
index 000000000000..afb93e3d02ba
--- /dev/null
+++ b/extensions-contrib/google-extensions/src/test/java/io/druid/storage/google/GoogleByteSourceTest.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to Metamarkets Group Inc. (Metamarkets) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. Metamarkets licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package io.druid.storage.google;
+
+import org.easymock.EasyMockSupport;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.io.InputStream;
+
+import static org.easymock.EasyMock.expect;
+
+public class GoogleByteSourceTest extends EasyMockSupport
+{
+ @Test
+ public void openStreamTest() throws IOException
+ {
+ final String bucket = "bucket";
+ final String path = "/path/to/file";
+ GoogleStorage storage = createMock(GoogleStorage.class);
+ InputStream stream = createMock(InputStream.class);
+
+ expect(storage.get(bucket, path)).andReturn(stream);
+
+ replayAll();
+
+ GoogleByteSource byteSource = new GoogleByteSource(storage, bucket, path);
+
+ byteSource.openStream();
+
+ verifyAll();
+ }
+
+ @Test(expected = IOException.class)
+ public void openStreamWithRecoverableErrorTest() throws IOException
+ {
+ final String bucket = "bucket";
+ final String path = "/path/to/file";
+ GoogleStorage storage = createMock(GoogleStorage.class);
+
+ expect(storage.get(bucket, path)).andThrow(new IOException(""));
+
+ replayAll();
+
+ GoogleByteSource byteSource = new GoogleByteSource(storage, bucket, path);
+
+ byteSource.openStream();
+
+ verifyAll();
+ }
+}
diff --git a/extensions-contrib/google-extensions/src/test/java/io/druid/storage/google/GoogleDataSegmentKillerTest.java b/extensions-contrib/google-extensions/src/test/java/io/druid/storage/google/GoogleDataSegmentKillerTest.java
new file mode 100644
index 000000000000..31dac78e5f34
--- /dev/null
+++ b/extensions-contrib/google-extensions/src/test/java/io/druid/storage/google/GoogleDataSegmentKillerTest.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to Metamarkets Group Inc. (Metamarkets) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. Metamarkets licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package io.druid.storage.google;
+
+import com.google.common.collect.ImmutableMap;
+import io.druid.segment.loading.SegmentLoadingException;
+import io.druid.timeline.DataSegment;
+import io.druid.timeline.partition.NoneShardSpec;
+import org.easymock.EasyMockSupport;
+import org.joda.time.Interval;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.IOException;
+
+import static org.easymock.EasyMock.expectLastCall;
+
+public class GoogleDataSegmentKillerTest extends EasyMockSupport
+{
+ private static final String bucket = "bucket";
+ private static final String indexPath = "test/2015-04-12T00:00:00.000Z_2015-04-13T00:00:00.000Z/1/0/index.zip";
+
+ private static final DataSegment dataSegment = new DataSegment(
+ "test",
+ new Interval("2015-04-12/2015-04-13"),
+ "1",
+ ImmutableMap.of("bucket", bucket, "path", indexPath),
+ null,
+ null,
+ new NoneShardSpec(),
+ 0,
+ 1
+ );
+
+ private GoogleStorage storage;
+
+ @Before
+ public void before()
+ {
+ storage = createMock(GoogleStorage.class);
+ }
+
+ @Test
+ public void killTest() throws SegmentLoadingException, IOException
+ {
+ final String descriptorPath = indexPath.substring(0, indexPath.lastIndexOf("/")) + "/descriptor.json";
+
+ storage.delete(bucket, indexPath);
+ expectLastCall();
+ storage.delete(bucket, descriptorPath);
+ expectLastCall();
+
+ replayAll();
+
+ GoogleDataSegmentKiller killer = new GoogleDataSegmentKiller(storage);
+
+ killer.kill(dataSegment);
+
+ verifyAll();
+ }
+
+ @Test(expected = SegmentLoadingException.class)
+ public void killWithErrorTest() throws SegmentLoadingException, IOException
+ {
+ storage.delete(bucket, indexPath);
+ expectLastCall().andThrow(new IOException(""));
+
+ replayAll();
+
+ GoogleDataSegmentKiller killer = new GoogleDataSegmentKiller(storage);
+
+ killer.kill(dataSegment);
+
+ verifyAll();
+ }
+}
diff --git a/extensions-contrib/google-extensions/src/test/java/io/druid/storage/google/GoogleDataSegmentPullerTest.java b/extensions-contrib/google-extensions/src/test/java/io/druid/storage/google/GoogleDataSegmentPullerTest.java
new file mode 100644
index 000000000000..cfc4b7850619
--- /dev/null
+++ b/extensions-contrib/google-extensions/src/test/java/io/druid/storage/google/GoogleDataSegmentPullerTest.java
@@ -0,0 +1,110 @@
+/*
+ * Licensed to Metamarkets Group Inc. (Metamarkets) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. Metamarkets licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package io.druid.storage.google;
+
+import com.google.common.collect.ImmutableMap;
+import io.druid.java.util.common.FileUtils;
+import io.druid.segment.loading.SegmentLoadingException;
+import io.druid.timeline.DataSegment;
+import io.druid.timeline.partition.NoneShardSpec;
+import org.easymock.EasyMockSupport;
+import org.joda.time.Interval;
+import org.junit.Test;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
+
+import static org.easymock.EasyMock.expect;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+public class GoogleDataSegmentPullerTest extends EasyMockSupport
+{
+ private static final String bucket = "bucket";
+ private static final String path = "/path/to/storage/index.zip";
+ private static final DataSegment dataSegment = new DataSegment(
+ "test",
+ new Interval("2015-04-12/2015-04-13"),
+ "1",
+ ImmutableMap.of("bucket", bucket, "path", path),
+ null,
+ null,
+ NoneShardSpec.instance(),
+ 0,
+ 1
+ );
+
+ @Test(expected = SegmentLoadingException.class)
+ public void testDeleteOutputDirectoryWhenErrorIsRaisedPullingSegmentFiles()
+ throws IOException, SegmentLoadingException
+ {
+ final File outDir = Files.createTempDirectory("druid").toFile();
+ outDir.deleteOnExit();
+ GoogleStorage storage = createMock(GoogleStorage.class);
+
+ expect(storage.get(bucket, path)).andThrow(new IOException(""));
+
+ replayAll();
+
+ GoogleDataSegmentPuller puller = new GoogleDataSegmentPuller(storage);
+ puller.getSegmentFiles(bucket, path, outDir);
+
+ assertFalse(outDir.exists());
+
+ verifyAll();
+ }
+
+ @Test
+ public void getSegmentFilesTest() throws SegmentLoadingException
+ {
+ final File outDir = new File("");
+ final FileUtils.FileCopyResult result = createMock(FileUtils.FileCopyResult.class);
+ GoogleStorage storage = createMock(GoogleStorage.class);
+ GoogleDataSegmentPuller puller = createMockBuilder(GoogleDataSegmentPuller.class).withConstructor(
+ storage
+ ).addMockedMethod("getSegmentFiles", String.class, String.class, File.class).createMock();
+
+ expect(puller.getSegmentFiles(bucket, path, outDir)).andReturn(result);
+
+ replayAll();
+
+ puller.getSegmentFiles(dataSegment, outDir);
+
+ verifyAll();
+ }
+
+ @Test
+ public void prepareOutDirTest() throws IOException
+ {
+ GoogleStorage storage = createMock(GoogleStorage.class);
+ File outDir = Files.createTempDirectory("druid").toFile();
+
+ try {
+ GoogleDataSegmentPuller puller = new GoogleDataSegmentPuller(storage);
+ puller.prepareOutDir(outDir);
+
+ assertTrue(outDir.exists());
+ }
+ finally {
+ outDir.delete();
+ }
+ }
+}
diff --git a/extensions-contrib/google-extensions/src/test/java/io/druid/storage/google/GoogleDataSegmentPusherTest.java b/extensions-contrib/google-extensions/src/test/java/io/druid/storage/google/GoogleDataSegmentPusherTest.java
new file mode 100644
index 000000000000..862974c513ba
--- /dev/null
+++ b/extensions-contrib/google-extensions/src/test/java/io/druid/storage/google/GoogleDataSegmentPusherTest.java
@@ -0,0 +1,135 @@
+/*
+ * Licensed to Metamarkets Group Inc. (Metamarkets) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. Metamarkets licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package io.druid.storage.google;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.common.io.Files;
+import io.druid.jackson.DefaultObjectMapper;
+import io.druid.segment.loading.DataSegmentPusherUtil;
+import io.druid.timeline.DataSegment;
+import io.druid.timeline.partition.NoneShardSpec;
+import org.easymock.EasyMock;
+import org.easymock.EasyMockSupport;
+import org.joda.time.Interval;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import java.io.File;
+
+import static org.easymock.EasyMock.expectLastCall;
+
+public class GoogleDataSegmentPusherTest extends EasyMockSupport
+{
+ @Rule
+ public final TemporaryFolder tempFolder = new TemporaryFolder();
+
+ private static final String bucket = "bucket";
+ private static final String prefix = "prefix";
+ private static final String path = "prefix/test/2015-04-12T00:00:00.000Z_2015-04-13T00:00:00.000Z/1/0/index.zip";
+ private static final DataSegment dataSegment = new DataSegment(
+ "test",
+ new Interval("2015-04-12/2015-04-13"),
+ "1",
+ ImmutableMap.of("bucket", bucket, "path", path),
+ null,
+ null,
+ new NoneShardSpec(),
+ 0,
+ 1
+ );
+
+ private GoogleStorage storage;
+ private GoogleAccountConfig googleAccountConfig;
+ private ObjectMapper jsonMapper;
+
+ @Before
+ public void before()
+ {
+ storage = createMock(GoogleStorage.class);
+ googleAccountConfig = new GoogleAccountConfig();
+ googleAccountConfig.setBucket(bucket);
+ googleAccountConfig.setPrefix(prefix);
+
+ jsonMapper = new DefaultObjectMapper();
+ }
+
+ @Test
+ public void testPush() throws Exception
+ {
+ // Create a mock segment on disk
+ File tmp = tempFolder.newFile("version.bin");
+
+ final byte[] data = new byte[]{0x0, 0x0, 0x0, 0x1};
+ Files.write(data, tmp);
+ final long size = data.length;
+
+ DataSegment segmentToPush = new DataSegment(
+ "foo",
+ new Interval("2015/2016"),
+ "0",
+ Maps.newHashMap(),
+ Lists.newArrayList(),
+ Lists.newArrayList(),
+ new NoneShardSpec(),
+ 0,
+ size
+ );
+
+ GoogleDataSegmentPusher pusher = createMockBuilder(
+ GoogleDataSegmentPusher.class
+ ).withConstructor(
+ storage,
+ googleAccountConfig,
+ jsonMapper
+ ).addMockedMethod("insert", File.class, String.class, String.class).createMock();
+
+ final String storageDir = DataSegmentPusherUtil.getStorageDir(segmentToPush);
+ final String indexPath = prefix + "/" + storageDir + "/" + "index.zip";
+ final String descriptorPath = prefix + "/" + storageDir + "/" + "descriptor.json";
+
+ pusher.insert(EasyMock.anyObject(File.class), EasyMock.eq("application/zip"), EasyMock.eq(indexPath));
+ expectLastCall();
+ pusher.insert(EasyMock.anyObject(File.class), EasyMock.eq("application/json"), EasyMock.eq(descriptorPath));
+ expectLastCall();
+
+ replayAll();
+
+ DataSegment segment = pusher.push(tempFolder.getRoot(), segmentToPush);
+
+ Assert.assertEquals(segmentToPush.getSize(), segment.getSize());
+ Assert.assertEquals(segmentToPush, segment);
+ Assert.assertEquals(ImmutableMap.of(
+ "type",
+ GoogleStorageDruidModule.SCHEME,
+ "bucket",
+ bucket,
+ "path",
+ indexPath
+ ), segment.getLoadSpec());
+
+ verifyAll();
+ }
+}
diff --git a/extensions-contrib/google-extensions/src/test/java/io/druid/storage/google/GoogleTaskLogsTest.java b/extensions-contrib/google-extensions/src/test/java/io/druid/storage/google/GoogleTaskLogsTest.java
new file mode 100644
index 000000000000..e21e1b6d6b6d
--- /dev/null
+++ b/extensions-contrib/google-extensions/src/test/java/io/druid/storage/google/GoogleTaskLogsTest.java
@@ -0,0 +1,147 @@
+/*
+ * Licensed to Metamarkets Group Inc. (Metamarkets) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. Metamarkets licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package io.druid.storage.google;
+
+import com.google.api.client.http.InputStreamContent;
+import com.google.common.base.Charsets;
+import com.google.common.base.Optional;
+import com.google.common.io.ByteSource;
+import com.google.common.io.Files;
+import org.apache.commons.io.FileUtils;
+import org.apache.commons.io.IOUtils;
+import org.easymock.EasyMock;
+import org.easymock.EasyMockSupport;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.BufferedWriter;
+import java.io.ByteArrayInputStream;
+import java.io.File;
+import java.io.FileWriter;
+import java.io.StringWriter;
+
+import static org.easymock.EasyMock.expect;
+import static org.easymock.EasyMock.expectLastCall;
+
+public class GoogleTaskLogsTest extends EasyMockSupport {
+ private static final String bucket = "test";
+ private static final String prefix = "test/log";
+ private static final String taskid = "taskid";
+
+ private GoogleStorage storage;
+ private GoogleTaskLogs googleTaskLogs;
+
+ @Before
+ public void before() {
+ storage = createMock(GoogleStorage.class);
+ GoogleTaskLogsConfig config = new GoogleTaskLogsConfig(bucket, prefix);
+ googleTaskLogs = new GoogleTaskLogs(config, storage);
+ }
+
+ @Test
+ public void testPushTaskLog() throws Exception {
+ final File tmpDir = Files.createTempDir();
+
+ try {
+ final File logFile = new File(tmpDir, "log");
+ BufferedWriter output = new BufferedWriter(new FileWriter(logFile));
+ output.write("test");
+ output.close();
+
+ storage.insert(EasyMock.eq(bucket), EasyMock.eq(prefix + "/" + taskid), EasyMock.anyObject(InputStreamContent.class));
+ expectLastCall();
+
+ replayAll();
+
+ googleTaskLogs.pushTaskLog(taskid, logFile);
+
+ verifyAll();
+ } finally {
+ FileUtils.deleteDirectory(tmpDir);
+ }
+ }
+
+ @Test
+ public void testStreamTaskLogWithoutOffset() throws Exception {
+ final String testLog = "hello this is a log";
+
+ final String logPath = prefix + "/" + taskid;
+ expect(storage.exists(bucket, logPath)).andReturn(true);
+ expect(storage.size(bucket, logPath)).andReturn((long) testLog.length());
+ expect(storage.get(bucket, logPath)).andReturn(
+ new ByteArrayInputStream(testLog.getBytes(Charsets.UTF_8))
+ );
+
+ replayAll();
+
+ final Optional byteSource = googleTaskLogs.streamTaskLog(taskid, 0);
+
+ final StringWriter writer = new StringWriter();
+ IOUtils.copy(byteSource.get().openStream(), writer, "UTF-8");
+ Assert.assertEquals(writer.toString(), testLog);
+
+ verifyAll();
+ }
+
+ @Test
+ public void testStreamTaskLogWithPositiveOffset() throws Exception {
+ final String testLog = "hello this is a log";
+
+ final String logPath = prefix + "/" + taskid;
+ expect(storage.exists(bucket, logPath)).andReturn(true);
+ expect(storage.size(bucket, logPath)).andReturn((long) testLog.length());
+ expect(storage.get(bucket, logPath)).andReturn(
+ new ByteArrayInputStream(testLog.getBytes(Charsets.UTF_8))
+ );
+
+ replayAll();
+
+ final Optional byteSource = googleTaskLogs.streamTaskLog(taskid, 5);
+
+ final StringWriter writer = new StringWriter();
+ IOUtils.copy(byteSource.get().openStream(), writer, "UTF-8");
+ Assert.assertEquals(writer.toString(), testLog.substring(5));
+
+ verifyAll();
+ }
+
+ @Test
+ public void testStreamTaskLogWithNegative() throws Exception {
+ final String testLog = "hello this is a log";
+
+ final String logPath = prefix + "/" + taskid;
+ expect(storage.exists(bucket, logPath)).andReturn(true);
+ expect(storage.size(bucket, logPath)).andReturn((long) testLog.length());
+ expect(storage.get(bucket, logPath)).andReturn(
+ new ByteArrayInputStream(testLog.getBytes(Charsets.UTF_8))
+ );
+
+ replayAll();
+
+ final Optional byteSource = googleTaskLogs.streamTaskLog(taskid, -3);
+
+ final StringWriter writer = new StringWriter();
+ IOUtils.copy(byteSource.get().openStream(), writer, "UTF-8");
+ Assert.assertEquals(writer.toString(), testLog.substring(testLog.length() - 3));
+
+ verifyAll();
+ }
+}
diff --git a/indexing-hadoop/src/main/java/io/druid/indexer/JobHelper.java b/indexing-hadoop/src/main/java/io/druid/indexer/JobHelper.java
index d24e907f8b50..f71ea2390200 100644
--- a/indexing-hadoop/src/main/java/io/druid/indexer/JobHelper.java
+++ b/indexing-hadoop/src/main/java/io/druid/indexer/JobHelper.java
@@ -420,12 +420,18 @@ public long push() throws IOException
switch (outputFS.getScheme()) {
case "hdfs":
case "viewfs":
- case "gs":
loadSpec = ImmutableMap.of(
"type", "hdfs",
"path", indexOutURI.toString()
);
break;
+ case "gs":
+ loadSpec = ImmutableMap.of(
+ "type", "google",
+ "bucket", indexOutURI.getHost(),
+ "path", indexOutURI.getPath().substring(1) // remove the leading "/"
+ );
+ break;
case "s3":
case "s3n":
loadSpec = ImmutableMap.of(
@@ -730,6 +736,8 @@ public static URI getURIFromSegment(DataSegment dataSegment)
segmentLocURI = URI.create(String.format("s3n://%s/%s", loadSpec.get("bucket"), loadSpec.get("key")));
} else if ("hdfs".equals(type)) {
segmentLocURI = URI.create(loadSpec.get("path").toString());
+ } else if ("google".equals(type)) {
+ segmentLocURI = URI.create(String.format("gs://%s/%s", loadSpec.get("bucket"), loadSpec.get("path")));
} else if ("local".equals(type)) {
try {
segmentLocURI = new URI("file", null, loadSpec.get("path").toString(), null, null);
diff --git a/pom.xml b/pom.xml
index 15dc4fcb6c43..132f59251a06 100644
--- a/pom.xml
+++ b/pom.xml
@@ -112,6 +112,7 @@
extensions-contrib/statsd-emitter
extensions-contrib/orc-extensions
extensions-contrib/time-min-max
+ extensions-contrib/google-extensions
@@ -632,12 +633,12 @@
google-http-client
- org.apache.httpcomponents
- httpclient
+ com.google.http-client
+ google-http-client-jackson2
- org.apache.httpcomponents
- httpcore
+ com.fasterxml.jackson.core
+ jackson-databind