diff --git a/google-cloud-clients/google-cloud-contrib/google-cloud-nio/src/main/java/com/google/cloud/storage/contrib/nio/testing/FakeStorageRpc.java b/google-cloud-clients/google-cloud-contrib/google-cloud-nio/src/main/java/com/google/cloud/storage/contrib/nio/testing/FakeStorageRpc.java index 07cb169511a4..dff6b6ca8395 100644 --- a/google-cloud-clients/google-cloud-contrib/google-cloud-nio/src/main/java/com/google/cloud/storage/contrib/nio/testing/FakeStorageRpc.java +++ b/google-cloud-clients/google-cloud-contrib/google-cloud-nio/src/main/java/com/google/cloud/storage/contrib/nio/testing/FakeStorageRpc.java @@ -29,6 +29,7 @@ import com.google.cloud.storage.StorageException; import com.google.cloud.storage.spi.v1.RpcBatch; import com.google.cloud.storage.spi.v1.StorageRpc; +import com.google.common.io.CountingOutputStream; import java.io.IOException; import java.io.InputStream; import java.math.BigInteger; @@ -295,6 +296,12 @@ public Tuple read( return Tuple.of("etag-goes-here", ret); } + @Override + public boolean readToOutputStream( + StorageObject from, CountingOutputStream to, Map options) { + throw new UnsupportedOperationException(); + } + @Override public String open(StorageObject object, Map options) throws StorageException { String key = fullname(object); diff --git a/google-cloud-clients/google-cloud-storage/src/main/java/com/google/cloud/storage/Blob.java b/google-cloud-clients/google-cloud-storage/src/main/java/com/google/cloud/storage/Blob.java index da7e27a72851..003a52fc2379 100644 --- a/google-cloud-clients/google-cloud-storage/src/main/java/com/google/cloud/storage/Blob.java +++ b/google-cloud-clients/google-cloud-storage/src/main/java/com/google/cloud/storage/Blob.java @@ -16,14 +16,17 @@ package com.google.cloud.storage; +import static com.google.cloud.RetryHelper.runWithRetries; import static com.google.cloud.storage.Blob.BlobSourceOption.toGetOptions; import static com.google.cloud.storage.Blob.BlobSourceOption.toSourceOptions; import static com.google.common.base.Preconditions.checkNotNull; +import static java.util.concurrent.Executors.callable; import com.google.api.services.storage.model.StorageObject; import com.google.auth.ServiceAccountSigner; import com.google.auth.ServiceAccountSigner.SigningException; import com.google.cloud.ReadChannel; +import com.google.cloud.RetryHelper; import com.google.cloud.Tuple; import com.google.cloud.WriteChannel; import com.google.cloud.storage.Acl.Entity; @@ -34,6 +37,7 @@ import com.google.cloud.storage.spi.v1.StorageRpc; import com.google.common.base.Function; import com.google.common.io.BaseEncoding; +import com.google.common.io.CountingOutputStream; import java.io.IOException; import java.io.ObjectInputStream; import java.io.OutputStream; @@ -225,6 +229,38 @@ public void downloadTo(Path path, BlobSourceOption... options) { } } + /** + * Downloads this blob to the given output stream path using specified blob read options. + * + * @param outputStream destination + * @param options blob read options + * @throws StorageException upon failure + */ + public void downloadTo(OutputStream outputStream, final BlobSourceOption... options) + throws StorageException { + try (CountingOutputStream countingOutputStream = new CountingOutputStream(outputStream)) { + final StorageRpc storageRpc = this.options.getStorageRpcV1(); + runWithRetries( + callable( + new Runnable() { + @Override + public void run() { + storageRpc.readToOutputStream( + getBlobId().toPb(), + countingOutputStream, + StorageImpl.optionMap(getBlobId(), options)); + } + }), + this.options.getRetrySettings(), + StorageImpl.EXCEPTION_HANDLER, + this.options.getClock()); + } catch (RetryHelper.RetryHelperException e) { + throw StorageException.translateAndThrow(e); + } catch (IOException e) { + throw new StorageException(e); + } + } + /** * Downloads this blob to the given file path. * @@ -238,6 +274,16 @@ public void downloadTo(Path path) { downloadTo(path, new BlobSourceOption[0]); } + /** + * Downloads this blob to the given output stream. + * + * @param outputStream destination + * @throws StorageException upon failure + */ + public void downloadTo(OutputStream outputStream) { + downloadTo(outputStream, new BlobSourceOption[0]); + } + /** Builder for {@code Blob}. */ public static class Builder extends BlobInfo.Builder { diff --git a/google-cloud-clients/google-cloud-storage/src/main/java/com/google/cloud/storage/spi/v1/HttpStorageRpc.java b/google-cloud-clients/google-cloud-storage/src/main/java/com/google/cloud/storage/spi/v1/HttpStorageRpc.java index 072b65227be8..51790f102b56 100644 --- a/google-cloud-clients/google-cloud-storage/src/main/java/com/google/cloud/storage/spi/v1/HttpStorageRpc.java +++ b/google-cloud-clients/google-cloud-storage/src/main/java/com/google/cloud/storage/spi/v1/HttpStorageRpc.java @@ -66,6 +66,7 @@ import com.google.common.hash.HashFunction; import com.google.common.hash.Hashing; import com.google.common.io.BaseEncoding; +import com.google.common.io.CountingOutputStream; import io.opencensus.common.Scope; import io.opencensus.trace.AttributeValue; import io.opencensus.trace.Span; @@ -642,6 +643,38 @@ public RpcBatch createBatch() { return new DefaultRpcBatch(storage); } + @Override + public boolean readToOutputStream( + StorageObject from, CountingOutputStream to, Map options) { + Span span = startSpan(HttpStorageRpcSpans.SPAN_NAME_READ); + Scope scope = tracer.withSpan(span); + try { + Get req = + storage + .objects() + .get(from.getBucket(), from.getName()) + .setGeneration(from.getGeneration()) + .setIfMetagenerationMatch(Option.IF_METAGENERATION_MATCH.getLong(options)) + .setIfMetagenerationNotMatch(Option.IF_METAGENERATION_NOT_MATCH.getLong(options)) + .setIfGenerationMatch(Option.IF_GENERATION_MATCH.getLong(options)) + .setIfGenerationNotMatch(Option.IF_GENERATION_NOT_MATCH.getLong(options)) + .setUserProject(Option.USER_PROJECT.getString(options)); + req.getMediaHttpDownloader().setDirectDownloadEnabled(true); + long bytesDownloaded = to.getCount(); + if (bytesDownloaded > 0) { + req.getMediaHttpDownloader().setBytesDownloaded(bytesDownloaded); + } + req.executeMediaAndDownloadTo(to); + return true; + } catch (IOException ex) { + span.setStatus(Status.UNKNOWN.withDescription(ex.getMessage())); + throw translate(ex); + } finally { + scope.close(); + span.end(); + } + } + @Override public Tuple read( StorageObject from, Map options, long position, int bytes) { diff --git a/google-cloud-clients/google-cloud-storage/src/main/java/com/google/cloud/storage/spi/v1/StorageRpc.java b/google-cloud-clients/google-cloud-storage/src/main/java/com/google/cloud/storage/spi/v1/StorageRpc.java index eecf311a7033..c54cd336d59c 100644 --- a/google-cloud-clients/google-cloud-storage/src/main/java/com/google/cloud/storage/spi/v1/StorageRpc.java +++ b/google-cloud-clients/google-cloud-storage/src/main/java/com/google/cloud/storage/spi/v1/StorageRpc.java @@ -28,6 +28,7 @@ import com.google.cloud.ServiceRpc; import com.google.cloud.Tuple; import com.google.cloud.storage.StorageException; +import com.google.common.io.CountingOutputStream; import java.io.InputStream; import java.util.List; import java.util.Map; @@ -282,6 +283,13 @@ StorageObject compose( */ Tuple read(StorageObject from, Map options, long position, int bytes); + /** + * Reads from a storage object at the given position directly to output stream. + * + * @throws StorageException upon failure + */ + boolean readToOutputStream(StorageObject from, CountingOutputStream to, Map options); + /** * Opens a resumable upload channel for a given storage object. * diff --git a/google-cloud-clients/google-cloud-storage/src/test/java/com/google/cloud/storage/BlobTest.java b/google-cloud-clients/google-cloud-storage/src/test/java/com/google/cloud/storage/BlobTest.java index d68bf965f1f4..d7dd066d783b 100644 --- a/google-cloud-clients/google-cloud-storage/src/test/java/com/google/cloud/storage/BlobTest.java +++ b/google-cloud-clients/google-cloud-storage/src/test/java/com/google/cloud/storage/BlobTest.java @@ -32,7 +32,11 @@ import static org.junit.Assert.assertNull; import static org.junit.Assert.assertSame; import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; +import com.google.api.core.ApiClock; +import com.google.api.gax.retrying.RetrySettings; +import com.google.api.services.storage.model.StorageObject; import com.google.cloud.ReadChannel; import com.google.cloud.storage.Acl.Project; import com.google.cloud.storage.Acl.Project.ProjectRole; @@ -41,10 +45,14 @@ import com.google.cloud.storage.Blob.BlobSourceOption; import com.google.cloud.storage.Storage.BlobWriteOption; import com.google.cloud.storage.Storage.CopyRequest; +import com.google.cloud.storage.spi.v1.StorageRpc; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import com.google.common.io.BaseEncoding; +import com.google.common.io.CountingOutputStream; import java.io.File; +import java.io.IOException; +import java.io.OutputStream; import java.net.URL; import java.nio.ByteBuffer; import java.nio.file.Files; @@ -58,6 +66,7 @@ import org.junit.After; import org.junit.Before; import org.junit.Test; +import org.threeten.bp.Duration; public class BlobTest { @@ -130,6 +139,28 @@ public class BlobTest { private static final String BASE64_KEY = "JVzfVl8NLD9FjedFuStegjRfES5ll5zc59CIXw572OA="; private static final Key KEY = new SecretKeySpec(BaseEncoding.base64().decode(BASE64_KEY), "AES256"); + private static final RetrySettings RETRY_SETTINGS = + RetrySettings.newBuilder() + .setInitialRetryDelay(Duration.ofMillis(100L)) + .setRetryDelayMultiplier(1.3) + .setMaxRetryDelay(Duration.ofMillis(60000L)) + .setInitialRpcTimeout(Duration.ofMillis(20000L)) + .setRpcTimeoutMultiplier(1.0) + .setMaxRpcTimeout(Duration.ofMillis(20000L)) + .setTotalTimeout(Duration.ofMillis(600000L)) + .build(); + private static final ApiClock API_CLOCK = + new ApiClock() { + @Override + public long nanoTime() { + return 42_000_000_000L; + } + + @Override + public long millisTime() { + return 42_000L; + } + }; private Storage storage; private Blob blob; @@ -593,4 +624,98 @@ public Integer answer() throws Throwable { byte actual[] = Files.readAllBytes(file.toPath()); assertArrayEquals(expected, actual); } + + @Test + public void testDownloadToOutputStream() throws Exception { + final byte[] expected = {1, 2}; + File file = File.createTempFile("blob", ".tmp"); + StorageRpc mockStorageRpc = createNiceMock(StorageRpc.class); + expect(storage.getOptions()).andReturn(mockOptions).times(2); + replay(storage); + expect(mockOptions.getStorageRpcV1()).andReturn(mockStorageRpc); + expect(mockOptions.getRetrySettings()).andReturn(RETRY_SETTINGS); + expect(mockOptions.getClock()).andReturn(API_CLOCK); + replay(mockOptions); + storage.getOptions(); + blob = new Blob(storage, new BlobInfo.BuilderImpl(BLOB_INFO)); + expect( + mockStorageRpc.readToOutputStream( + anyObject(StorageObject.class), + anyObject(CountingOutputStream.class), + anyObject(Map.class))) + .andAnswer( + new IAnswer() { + @Override + public Boolean answer() throws Throwable { + ((CountingOutputStream) getCurrentArguments()[1]).write(expected); + return true; + } + }); + replay(mockStorageRpc); + OutputStream outputStream = Files.newOutputStream(file.toPath()); + blob.downloadTo(outputStream); + byte actual[] = Files.readAllBytes(file.toPath()); + assertArrayEquals(expected, actual); + } + + @Test + public void testDownloadToOutputStreamWithOptions() throws Exception { + final byte[] expected = {1, 2}; + File file = File.createTempFile("blob", ".tmp"); + StorageRpc mockStorageRpc = createNiceMock(StorageRpc.class); + expect(storage.getOptions()).andReturn(mockOptions).times(2); + replay(storage); + expect(mockOptions.getStorageRpcV1()).andReturn(mockStorageRpc); + expect(mockOptions.getRetrySettings()).andReturn(RETRY_SETTINGS); + expect(mockOptions.getClock()).andReturn(API_CLOCK); + replay(mockOptions); + storage.getOptions(); + blob = new Blob(storage, new BlobInfo.BuilderImpl(BLOB_INFO)); + expect( + mockStorageRpc.readToOutputStream( + anyObject(StorageObject.class), + anyObject(CountingOutputStream.class), + anyObject(Map.class))) + .andAnswer( + new IAnswer() { + @Override + public Boolean answer() throws Throwable { + ((CountingOutputStream) getCurrentArguments()[1]).write(expected); + return true; + } + }); + replay(mockStorageRpc); + OutputStream outputStream = Files.newOutputStream(file.toPath()); + blob.downloadTo(outputStream, new BlobSourceOption[0]); + byte actual[] = Files.readAllBytes(file.toPath()); + assertArrayEquals(expected, actual); + } + + @Test + public void testDownloadToOutputStreamStorageException() throws Exception { + File file = File.createTempFile("blob", ".tmp"); + StorageRpc mockStorageRpc = createNiceMock(StorageRpc.class); + expect(storage.getOptions()).andReturn(mockOptions).times(2); + replay(storage); + expect(mockOptions.getStorageRpcV1()).andReturn(mockStorageRpc); + expect(mockOptions.getRetrySettings()).andReturn(RETRY_SETTINGS); + expect(mockOptions.getClock()).andReturn(API_CLOCK); + replay(mockOptions); + storage.getOptions(); + blob = new Blob(storage, new BlobInfo.BuilderImpl(BLOB_INFO)); + expect( + mockStorageRpc.readToOutputStream( + anyObject(StorageObject.class), + anyObject(CountingOutputStream.class), + anyObject(Map.class))) + .andThrow(new StorageException(new IOException())); + replay(mockStorageRpc); + OutputStream outputStream = Files.newOutputStream(file.toPath()); + try { + blob.downloadTo(outputStream); + fail(); + } catch (StorageException e) { + // expected + } + } } diff --git a/google-cloud-clients/google-cloud-storage/src/test/java/com/google/cloud/storage/StorageImplTest.java b/google-cloud-clients/google-cloud-storage/src/test/java/com/google/cloud/storage/StorageImplTest.java index 0d77547e7605..03a11e4ab1fc 100644 --- a/google-cloud-clients/google-cloud-storage/src/test/java/com/google/cloud/storage/StorageImplTest.java +++ b/google-cloud-clients/google-cloud-storage/src/test/java/com/google/cloud/storage/StorageImplTest.java @@ -18,6 +18,7 @@ import static com.google.cloud.storage.testing.ApiPolicyMatcher.eqApiPolicy; import static java.nio.charset.StandardCharsets.UTF_8; +import static org.easymock.EasyMock.getCurrentArguments; import static org.junit.Assert.assertArrayEquals; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; @@ -25,6 +26,7 @@ import static org.junit.Assert.assertNull; import static org.junit.Assert.assertSame; import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; import com.google.api.client.googleapis.json.GoogleJsonError; import com.google.api.core.ApiClock; @@ -55,9 +57,12 @@ import com.google.common.collect.ImmutableMap; import com.google.common.collect.Iterables; import com.google.common.io.BaseEncoding; +import com.google.common.io.CountingOutputStream; import com.google.common.net.UrlEscapers; import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; import java.io.IOException; +import java.io.OutputStream; import java.io.UnsupportedEncodingException; import java.net.URL; import java.net.URLDecoder; @@ -81,6 +86,7 @@ import javax.crypto.spec.SecretKeySpec; import org.easymock.Capture; import org.easymock.EasyMock; +import org.easymock.IAnswer; import org.junit.After; import org.junit.Before; import org.junit.BeforeClass; @@ -1591,6 +1597,60 @@ public void testReaderWithOptionsFromBlobId() throws IOException { channel.read(ByteBuffer.allocate(42)); } + @Test + public void testReadToOutputStream() { + final byte[] expected = {0xD, 0xE, 0xA, 0xD}; + OutputStream outputStream = new ByteArrayOutputStream(); + CountingOutputStream countingOutputStream = new CountingOutputStream(outputStream); + EasyMock.expect( + storageRpcMock.readToOutputStream( + BlobId.of(BUCKET_NAME1, BLOB_NAME1).toPb(), + countingOutputStream, + EMPTY_RPC_OPTIONS)) + .andAnswer( + new IAnswer() { + @Override + public Boolean answer() throws Throwable { + ((CountingOutputStream) getCurrentArguments()[1]).write(expected); + return true; + } + }); + EasyMock.replay(storageRpcMock); + initializeService(); + boolean result = + options + .getStorageRpcV1() + .readToOutputStream( + BlobId.of(BUCKET_NAME1, BLOB_NAME1).toPb(), + countingOutputStream, + EMPTY_RPC_OPTIONS); + assertTrue(result); + assertArrayEquals(BLOB_CONTENT, ((ByteArrayOutputStream) outputStream).toByteArray()); + } + + @Test + public void testReadToOutputStreamStorageException() { + CountingOutputStream countingOutputStream = + new CountingOutputStream(new ByteArrayOutputStream()); + EasyMock.expect( + storageRpcMock.readToOutputStream( + BlobId.of(BUCKET_NAME1, BLOB_NAME1).toPb(), + countingOutputStream, + EMPTY_RPC_OPTIONS)) + .andThrow(new StorageException(new IOException())); + EasyMock.replay(storageRpcMock); + initializeService(); + try { + options + .getStorageRpcV1() + .readToOutputStream( + BlobId.of(BUCKET_NAME1, BLOB_NAME1).toPb(), countingOutputStream, EMPTY_RPC_OPTIONS); + fail(); + } catch (StorageException e) { + // expected + } + } + @Test public void testWriter() { BlobInfo.Builder infoBuilder = BLOB_INFO1.toBuilder();