Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@
*
* @param <T> object type
*/
class RetryingInputStream<T> extends InputStream
public class RetryingInputStream<T> extends InputStream
{
private static final Logger log = new Logger(RetryingInputStream.class);

Expand All @@ -47,7 +47,7 @@ class RetryingInputStream<T> extends InputStream
private CountingInputStream delegate;
private long startOffset;

RetryingInputStream(
public RetryingInputStream(
T object,
ObjectOpenFunction<T> objectOpenFunction,
Predicate<Throwable> retryCondition,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,11 @@

package org.apache.druid.data.input.google;

import com.google.common.base.Preconditions;
import com.google.common.base.Predicate;
import org.apache.druid.data.input.InputEntity;
import org.apache.druid.data.input.impl.prefetch.ObjectOpenFunction;
import org.apache.druid.data.input.impl.prefetch.RetryingInputStream;
import org.apache.druid.storage.google.GoogleByteSource;
import org.apache.druid.storage.google.GoogleStorage;
import org.apache.druid.storage.google.GoogleUtils;
Expand All @@ -33,13 +36,17 @@

public class GoogleCloudStorageEntity implements InputEntity
{
private final GoogleStorage storage;
private final GoogleByteSourceOpenFunction googleByteSourceOpenFunction;
private final GoogleByteSource byteSource;
private final URI uri;

GoogleCloudStorageEntity(GoogleStorage storage, URI uri)
{
this.storage = storage;
this.uri = uri;
this.googleByteSourceOpenFunction = new GoogleByteSourceOpenFunction();
this.uri = Preconditions.checkNotNull(uri);
final String bucket = uri.getAuthority();
final String key = GoogleUtils.extractGoogleCloudStorageObjectKey(uri);
this.byteSource = new GoogleByteSource(storage, bucket, key);
}

@Nullable
Expand All @@ -52,16 +59,33 @@ public URI getUri()
@Override
public InputStream open() throws IOException
{
// Get data of the given object and open an input stream
final String bucket = uri.getAuthority();
final String key = GoogleUtils.extractGoogleCloudStorageObjectKey(uri);
final GoogleByteSource byteSource = new GoogleByteSource(storage, bucket, key);
return CompressionUtils.decompress(byteSource.openStream(), uri.getPath());
RetryingInputStream<GoogleByteSource> retryingStream = new RetryingInputStream<>(
byteSource,
googleByteSourceOpenFunction,
GoogleUtils.GOOGLE_RETRY,
GoogleUtils.MAX_BYTESOURCE_RETRIES
);
return CompressionUtils.decompress(retryingStream, uri.getPath());
}

@Override
public Predicate<Throwable> getFetchRetryCondition()
{
return GoogleUtils.GOOGLE_RETRY;
}

private static class GoogleByteSourceOpenFunction implements ObjectOpenFunction<GoogleByteSource>
{
@Override
public InputStream open(GoogleByteSource object) throws IOException
{
return open(object, 0L);
}

@Override
public InputStream open(GoogleByteSource object, long start) throws IOException
{
return object.openStream(start);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,9 @@

public class GoogleUtils
{
public static final int MAX_BYTESOURCE_RETRIES = 10;
public static final Predicate<Throwable> GOOGLE_RETRY = GoogleUtils::isRetryable;

public static boolean isRetryable(Throwable t)
{
if (t instanceof HttpResponseException) {
Expand All @@ -40,6 +43,4 @@ public static String extractGoogleCloudStorageObjectKey(URI uri)
{
return uri.getPath().startsWith("/") ? uri.getPath().substring(1) : uri.getPath();
}

public static final Predicate<Throwable> GOOGLE_RETRY = e -> isRetryable(e);
}