From b01c28330b6188dc95b8b137dbb3443d5da83d38 Mon Sep 17 00:00:00 2001 From: Clint Wylie Date: Thu, 21 Nov 2019 05:31:21 -0800 Subject: [PATCH] use retry-able stream for google input source --- .../impl/prefetch/RetryingInputStream.java | 4 +- .../google/GoogleCloudStorageEntity.java | 40 +++++++++++++++---- .../druid/storage/google/GoogleUtils.java | 5 ++- 3 files changed, 37 insertions(+), 12 deletions(-) diff --git a/core/src/main/java/org/apache/druid/data/input/impl/prefetch/RetryingInputStream.java b/core/src/main/java/org/apache/druid/data/input/impl/prefetch/RetryingInputStream.java index af401e9eb12e..e7b802348a6c 100644 --- a/core/src/main/java/org/apache/druid/data/input/impl/prefetch/RetryingInputStream.java +++ b/core/src/main/java/org/apache/druid/data/input/impl/prefetch/RetryingInputStream.java @@ -35,7 +35,7 @@ * * @param object type */ -class RetryingInputStream extends InputStream +public class RetryingInputStream extends InputStream { private static final Logger log = new Logger(RetryingInputStream.class); @@ -47,7 +47,7 @@ class RetryingInputStream extends InputStream private CountingInputStream delegate; private long startOffset; - RetryingInputStream( + public RetryingInputStream( T object, ObjectOpenFunction objectOpenFunction, Predicate retryCondition, diff --git a/extensions-core/google-extensions/src/main/java/org/apache/druid/data/input/google/GoogleCloudStorageEntity.java b/extensions-core/google-extensions/src/main/java/org/apache/druid/data/input/google/GoogleCloudStorageEntity.java index 5a3256eb374f..2cee0bbb29cb 100644 --- a/extensions-core/google-extensions/src/main/java/org/apache/druid/data/input/google/GoogleCloudStorageEntity.java +++ b/extensions-core/google-extensions/src/main/java/org/apache/druid/data/input/google/GoogleCloudStorageEntity.java @@ -19,8 +19,11 @@ package org.apache.druid.data.input.google; +import com.google.common.base.Preconditions; import com.google.common.base.Predicate; import org.apache.druid.data.input.InputEntity; +import org.apache.druid.data.input.impl.prefetch.ObjectOpenFunction; +import org.apache.druid.data.input.impl.prefetch.RetryingInputStream; import org.apache.druid.storage.google.GoogleByteSource; import org.apache.druid.storage.google.GoogleStorage; import org.apache.druid.storage.google.GoogleUtils; @@ -33,13 +36,17 @@ public class GoogleCloudStorageEntity implements InputEntity { - private final GoogleStorage storage; + private final GoogleByteSourceOpenFunction googleByteSourceOpenFunction; + private final GoogleByteSource byteSource; private final URI uri; GoogleCloudStorageEntity(GoogleStorage storage, URI uri) { - this.storage = storage; - this.uri = uri; + this.googleByteSourceOpenFunction = new GoogleByteSourceOpenFunction(); + this.uri = Preconditions.checkNotNull(uri); + final String bucket = uri.getAuthority(); + final String key = GoogleUtils.extractGoogleCloudStorageObjectKey(uri); + this.byteSource = new GoogleByteSource(storage, bucket, key); } @Nullable @@ -52,11 +59,13 @@ public URI getUri() @Override public InputStream open() throws IOException { - // Get data of the given object and open an input stream - final String bucket = uri.getAuthority(); - final String key = GoogleUtils.extractGoogleCloudStorageObjectKey(uri); - final GoogleByteSource byteSource = new GoogleByteSource(storage, bucket, key); - return CompressionUtils.decompress(byteSource.openStream(), uri.getPath()); + RetryingInputStream retryingStream = new RetryingInputStream<>( + byteSource, + googleByteSourceOpenFunction, + GoogleUtils.GOOGLE_RETRY, + GoogleUtils.MAX_BYTESOURCE_RETRIES + ); + return CompressionUtils.decompress(retryingStream, uri.getPath()); } @Override @@ -64,4 +73,19 @@ public Predicate getFetchRetryCondition() { return GoogleUtils.GOOGLE_RETRY; } + + private static class GoogleByteSourceOpenFunction implements ObjectOpenFunction + { + @Override + public InputStream open(GoogleByteSource object) throws IOException + { + return open(object, 0L); + } + + @Override + public InputStream open(GoogleByteSource object, long start) throws IOException + { + return object.openStream(start); + } + } } diff --git a/extensions-core/google-extensions/src/main/java/org/apache/druid/storage/google/GoogleUtils.java b/extensions-core/google-extensions/src/main/java/org/apache/druid/storage/google/GoogleUtils.java index 1dd12ef2388e..e87a97dbe1f4 100644 --- a/extensions-core/google-extensions/src/main/java/org/apache/druid/storage/google/GoogleUtils.java +++ b/extensions-core/google-extensions/src/main/java/org/apache/druid/storage/google/GoogleUtils.java @@ -27,6 +27,9 @@ public class GoogleUtils { + public static final int MAX_BYTESOURCE_RETRIES = 10; + public static final Predicate GOOGLE_RETRY = GoogleUtils::isRetryable; + public static boolean isRetryable(Throwable t) { if (t instanceof HttpResponseException) { @@ -40,6 +43,4 @@ public static String extractGoogleCloudStorageObjectKey(URI uri) { return uri.getPath().startsWith("/") ? uri.getPath().substring(1) : uri.getPath(); } - - public static final Predicate GOOGLE_RETRY = e -> isRetryable(e); }