Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import com.google.cloud.storage.StorageException;
import com.google.cloud.storage.spi.v1.RpcBatch;
import com.google.cloud.storage.spi.v1.StorageRpc;
import com.google.common.io.CountingOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.math.BigInteger;
Expand Down Expand Up @@ -295,6 +296,12 @@ public Tuple<String, byte[]> read(
return Tuple.of("etag-goes-here", ret);
}

@Override
public boolean readToOutputStream(
StorageObject from, CountingOutputStream to, Map<Option, ?> options) {
throw new UnsupportedOperationException();
}

@Override
public String open(StorageObject object, Map<Option, ?> options) throws StorageException {
String key = fullname(object);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,14 +16,17 @@

package com.google.cloud.storage;

import static com.google.cloud.RetryHelper.runWithRetries;
import static com.google.cloud.storage.Blob.BlobSourceOption.toGetOptions;
import static com.google.cloud.storage.Blob.BlobSourceOption.toSourceOptions;
import static com.google.common.base.Preconditions.checkNotNull;
import static java.util.concurrent.Executors.callable;

import com.google.api.services.storage.model.StorageObject;
import com.google.auth.ServiceAccountSigner;
import com.google.auth.ServiceAccountSigner.SigningException;
import com.google.cloud.ReadChannel;
import com.google.cloud.RetryHelper;
import com.google.cloud.Tuple;
import com.google.cloud.WriteChannel;
import com.google.cloud.storage.Acl.Entity;
Expand All @@ -34,6 +37,7 @@
import com.google.cloud.storage.spi.v1.StorageRpc;
import com.google.common.base.Function;
import com.google.common.io.BaseEncoding;
import com.google.common.io.CountingOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.OutputStream;
Expand Down Expand Up @@ -225,6 +229,38 @@ public void downloadTo(Path path, BlobSourceOption... options) {
}
}

/**
* Downloads this blob to the given output stream path using specified blob read options.
*
* @param outputStream destination
* @param options blob read options
* @throws StorageException upon failure
*/
public void downloadTo(OutputStream outputStream, final BlobSourceOption... options)
throws StorageException {
try (CountingOutputStream countingOutputStream = new CountingOutputStream(outputStream)) {
final StorageRpc storageRpc = this.options.getStorageRpcV1();
runWithRetries(
callable(
new Runnable() {
@Override
public void run() {
storageRpc.readToOutputStream(
getBlobId().toPb(),
countingOutputStream,
StorageImpl.optionMap(getBlobId(), options));
}
}),
this.options.getRetrySettings(),
StorageImpl.EXCEPTION_HANDLER,
this.options.getClock());
} catch (RetryHelper.RetryHelperException e) {
throw StorageException.translateAndThrow(e);
} catch (IOException e) {
throw new StorageException(e);
}
}
Copy link
Contributor

@frankyn frankyn Jan 18, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@andrey-qlogic, I appreciate your patience.

I'd recommend passing through an additional option (USE_DIRECT_DOWNLOAD) to downloadTo() if using getMediaHttpDownloader is considered a breaking change:

Then in the underlying RPC class handle the request in
HttpStorageRpc.getCall() which called by HttpStorageRpc.get().

WDYT?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added USE_DIRECT_DOWNLOAD to set directDownloadEnabled for MediaHttpDownloader


/**
* Downloads this blob to the given file path.
*
Expand All @@ -238,6 +274,16 @@ public void downloadTo(Path path) {
downloadTo(path, new BlobSourceOption[0]);
}

/**
* Downloads this blob to the given output stream.
*
* @param outputStream destination
* @throws StorageException upon failure
*/
public void downloadTo(OutputStream outputStream) {
downloadTo(outputStream, new BlobSourceOption[0]);
}

/** Builder for {@code Blob}. */
public static class Builder extends BlobInfo.Builder {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@
import com.google.common.hash.HashFunction;
import com.google.common.hash.Hashing;
import com.google.common.io.BaseEncoding;
import com.google.common.io.CountingOutputStream;
import io.opencensus.common.Scope;
import io.opencensus.trace.AttributeValue;
import io.opencensus.trace.Span;
Expand Down Expand Up @@ -642,6 +643,38 @@ public RpcBatch createBatch() {
return new DefaultRpcBatch(storage);
}

@Override
public boolean readToOutputStream(
StorageObject from, CountingOutputStream to, Map<Option, ?> options) {
Span span = startSpan(HttpStorageRpcSpans.SPAN_NAME_READ);
Scope scope = tracer.withSpan(span);
try {
Get req =
storage
.objects()
.get(from.getBucket(), from.getName())
.setGeneration(from.getGeneration())
.setIfMetagenerationMatch(Option.IF_METAGENERATION_MATCH.getLong(options))
.setIfMetagenerationNotMatch(Option.IF_METAGENERATION_NOT_MATCH.getLong(options))
.setIfGenerationMatch(Option.IF_GENERATION_MATCH.getLong(options))
.setIfGenerationNotMatch(Option.IF_GENERATION_NOT_MATCH.getLong(options))
.setUserProject(Option.USER_PROJECT.getString(options));
req.getMediaHttpDownloader().setDirectDownloadEnabled(true);
long bytesDownloaded = to.getCount();
if (bytesDownloaded > 0) {
req.getMediaHttpDownloader().setBytesDownloaded(bytesDownloaded);
}
req.executeMediaAndDownloadTo(to);
return true;
} catch (IOException ex) {
span.setStatus(Status.UNKNOWN.withDescription(ex.getMessage()));
throw translate(ex);
} finally {
scope.close();
span.end();
}
}

@Override
public Tuple<String, byte[]> read(
StorageObject from, Map<Option, ?> options, long position, int bytes) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import com.google.cloud.ServiceRpc;
import com.google.cloud.Tuple;
import com.google.cloud.storage.StorageException;
import com.google.common.io.CountingOutputStream;
import java.io.InputStream;
import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -282,6 +283,13 @@ StorageObject compose(
*/
Tuple<String, byte[]> read(StorageObject from, Map<Option, ?> options, long position, int bytes);

/**
* Reads from a storage object at the given position directly to output stream.
*
* @throws StorageException upon failure
*/
boolean readToOutputStream(StorageObject from, CountingOutputStream to, Map<Option, ?> options);

/**
* Opens a resumable upload channel for a given storage object.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,11 @@
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertSame;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;

import com.google.api.core.ApiClock;
import com.google.api.gax.retrying.RetrySettings;
import com.google.api.services.storage.model.StorageObject;
import com.google.cloud.ReadChannel;
import com.google.cloud.storage.Acl.Project;
import com.google.cloud.storage.Acl.Project.ProjectRole;
Expand All @@ -41,10 +45,14 @@
import com.google.cloud.storage.Blob.BlobSourceOption;
import com.google.cloud.storage.Storage.BlobWriteOption;
import com.google.cloud.storage.Storage.CopyRequest;
import com.google.cloud.storage.spi.v1.StorageRpc;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.io.BaseEncoding;
import com.google.common.io.CountingOutputStream;
import java.io.File;
import java.io.IOException;
import java.io.OutputStream;
import java.net.URL;
import java.nio.ByteBuffer;
import java.nio.file.Files;
Expand All @@ -58,6 +66,7 @@
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.threeten.bp.Duration;

public class BlobTest {

Expand Down Expand Up @@ -130,6 +139,28 @@ public class BlobTest {
private static final String BASE64_KEY = "JVzfVl8NLD9FjedFuStegjRfES5ll5zc59CIXw572OA=";
private static final Key KEY =
new SecretKeySpec(BaseEncoding.base64().decode(BASE64_KEY), "AES256");
private static final RetrySettings RETRY_SETTINGS =
RetrySettings.newBuilder()
.setInitialRetryDelay(Duration.ofMillis(100L))
.setRetryDelayMultiplier(1.3)
.setMaxRetryDelay(Duration.ofMillis(60000L))
.setInitialRpcTimeout(Duration.ofMillis(20000L))
.setRpcTimeoutMultiplier(1.0)
.setMaxRpcTimeout(Duration.ofMillis(20000L))
.setTotalTimeout(Duration.ofMillis(600000L))
.build();
private static final ApiClock API_CLOCK =
new ApiClock() {
@Override
public long nanoTime() {
return 42_000_000_000L;
}

@Override
public long millisTime() {
return 42_000L;
}
};

private Storage storage;
private Blob blob;
Expand Down Expand Up @@ -593,4 +624,98 @@ public Integer answer() throws Throwable {
byte actual[] = Files.readAllBytes(file.toPath());
assertArrayEquals(expected, actual);
}

@Test
public void testDownloadToOutputStream() throws Exception {
final byte[] expected = {1, 2};
File file = File.createTempFile("blob", ".tmp");
StorageRpc mockStorageRpc = createNiceMock(StorageRpc.class);
expect(storage.getOptions()).andReturn(mockOptions).times(2);
replay(storage);
expect(mockOptions.getStorageRpcV1()).andReturn(mockStorageRpc);
expect(mockOptions.getRetrySettings()).andReturn(RETRY_SETTINGS);
expect(mockOptions.getClock()).andReturn(API_CLOCK);
replay(mockOptions);
storage.getOptions();
blob = new Blob(storage, new BlobInfo.BuilderImpl(BLOB_INFO));
expect(
mockStorageRpc.readToOutputStream(
anyObject(StorageObject.class),
anyObject(CountingOutputStream.class),
anyObject(Map.class)))
.andAnswer(
new IAnswer<Boolean>() {
@Override
public Boolean answer() throws Throwable {
((CountingOutputStream) getCurrentArguments()[1]).write(expected);
return true;
}
});
replay(mockStorageRpc);
OutputStream outputStream = Files.newOutputStream(file.toPath());
blob.downloadTo(outputStream);
byte actual[] = Files.readAllBytes(file.toPath());
assertArrayEquals(expected, actual);
}

@Test
public void testDownloadToOutputStreamWithOptions() throws Exception {
final byte[] expected = {1, 2};
File file = File.createTempFile("blob", ".tmp");
StorageRpc mockStorageRpc = createNiceMock(StorageRpc.class);
expect(storage.getOptions()).andReturn(mockOptions).times(2);
replay(storage);
expect(mockOptions.getStorageRpcV1()).andReturn(mockStorageRpc);
expect(mockOptions.getRetrySettings()).andReturn(RETRY_SETTINGS);
expect(mockOptions.getClock()).andReturn(API_CLOCK);
replay(mockOptions);
storage.getOptions();
blob = new Blob(storage, new BlobInfo.BuilderImpl(BLOB_INFO));
expect(
mockStorageRpc.readToOutputStream(
anyObject(StorageObject.class),
anyObject(CountingOutputStream.class),
anyObject(Map.class)))
.andAnswer(
new IAnswer<Boolean>() {
@Override
public Boolean answer() throws Throwable {
((CountingOutputStream) getCurrentArguments()[1]).write(expected);
return true;
}
});
replay(mockStorageRpc);
OutputStream outputStream = Files.newOutputStream(file.toPath());
blob.downloadTo(outputStream, new BlobSourceOption[0]);
byte actual[] = Files.readAllBytes(file.toPath());
assertArrayEquals(expected, actual);
}

@Test
public void testDownloadToOutputStreamStorageException() throws Exception {
File file = File.createTempFile("blob", ".tmp");
StorageRpc mockStorageRpc = createNiceMock(StorageRpc.class);
expect(storage.getOptions()).andReturn(mockOptions).times(2);
replay(storage);
expect(mockOptions.getStorageRpcV1()).andReturn(mockStorageRpc);
expect(mockOptions.getRetrySettings()).andReturn(RETRY_SETTINGS);
expect(mockOptions.getClock()).andReturn(API_CLOCK);
replay(mockOptions);
storage.getOptions();
blob = new Blob(storage, new BlobInfo.BuilderImpl(BLOB_INFO));
expect(
mockStorageRpc.readToOutputStream(
anyObject(StorageObject.class),
anyObject(CountingOutputStream.class),
anyObject(Map.class)))
.andThrow(new StorageException(new IOException()));
replay(mockStorageRpc);
OutputStream outputStream = Files.newOutputStream(file.toPath());
try {
blob.downloadTo(outputStream);
fail();
} catch (StorageException e) {
// expected
}
}
}
Loading