From 67cd16cd5685cdfdcca0d2352b775a8f43798215 Mon Sep 17 00:00:00 2001 From: Joe Wang Date: Tue, 28 Mar 2023 15:47:39 -0400 Subject: [PATCH 1/8] feat: add stream method for ServerStream --- .../com/google/api/gax/rpc/ServerStream.java | 6 +++++ .../google/api/gax/rpc/ServerStreamTest.java | 26 +++++++++++++++++++ 2 files changed, 32 insertions(+) diff --git a/gax-java/gax/src/main/java/com/google/api/gax/rpc/ServerStream.java b/gax-java/gax/src/main/java/com/google/api/gax/rpc/ServerStream.java index 518c68523c..883a0ed130 100644 --- a/gax-java/gax/src/main/java/com/google/api/gax/rpc/ServerStream.java +++ b/gax-java/gax/src/main/java/com/google/api/gax/rpc/ServerStream.java @@ -31,6 +31,8 @@ import com.google.api.core.InternalApi; import java.util.Iterator; +import java.util.stream.Stream; +import java.util.stream.StreamSupport; import javax.annotation.Nonnull; /** @@ -89,6 +91,10 @@ public Iterator iterator() { return iterator; } + public Stream stream() { + return StreamSupport.stream(this.spliterator(), false); + } + /** * Returns true if the next call to the iterator's hasNext() or next() is guaranteed to be * nonblocking. diff --git a/gax-java/gax/src/test/java/com/google/api/gax/rpc/ServerStreamTest.java b/gax-java/gax/src/test/java/com/google/api/gax/rpc/ServerStreamTest.java index b0cdc30ce1..087c64ca48 100644 --- a/gax-java/gax/src/test/java/com/google/api/gax/rpc/ServerStreamTest.java +++ b/gax-java/gax/src/test/java/com/google/api/gax/rpc/ServerStreamTest.java @@ -41,6 +41,7 @@ import java.util.concurrent.Executors; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; import org.junit.After; import org.junit.Before; import org.junit.Test; @@ -109,6 +110,31 @@ public List call() { Truth.assertThat(results).containsExactly(0, 1, 2, 3, 4); } + @Test + public void testMultipleItemStreamMethod() throws Exception { + Future producerFuture = + executor.submit( + () -> { + for (int i = 0; i < 5; i++) { + int requestCount = controller.popLastPull(); + + Truth.assertWithMessage("ServerStream should request one item at a time") + .that(requestCount) + .isEqualTo(1); + + stream.observer().onResponse(i); + } + stream.observer().onComplete(); + return null; + }); + Future> consumerFuture = + executor.submit(() -> stream.stream().collect(Collectors.toList())); + + producerFuture.get(60, TimeUnit.SECONDS); + List results = consumerFuture.get(); + Truth.assertThat(results).containsExactly(0, 1, 2, 3, 4); + } + @Test public void testEarlyTermination() throws Exception { Future taskFuture = From 96fdde88f1c2c4d0610ad39d27d8eee18eb832d2 Mon Sep 17 00:00:00 2001 From: Joe Wang Date: Wed, 29 Mar 2023 18:34:31 -0400 Subject: [PATCH 2/8] add integration test --- .../showcase/v1beta1/it/ITServerSideStreaming.java | 13 +++++++++++++ 1 file changed, 13 insertions(+) diff --git a/showcase/gapic-showcase/src/test/java/com/google/showcase/v1beta1/it/ITServerSideStreaming.java b/showcase/gapic-showcase/src/test/java/com/google/showcase/v1beta1/it/ITServerSideStreaming.java index 107f4f9380..88e8709b69 100644 --- a/showcase/gapic-showcase/src/test/java/com/google/showcase/v1beta1/it/ITServerSideStreaming.java +++ b/showcase/gapic-showcase/src/test/java/com/google/showcase/v1beta1/it/ITServerSideStreaming.java @@ -35,6 +35,7 @@ import java.security.GeneralSecurityException; import java.util.ArrayList; import java.util.Iterator; +import java.util.stream.Collectors; import org.junit.After; import org.junit.Before; import org.junit.Test; @@ -79,6 +80,18 @@ public void testGrpc_receiveStreamedContent() { .inOrder(); } + @Test + public void testGrpc_receiveStreamedContentStreamAPI() { + String content = "The rain in Spain stays mainly on the plain!"; + ServerStream responseStream = + grpcClient.expandCallable().call(ExpandRequest.newBuilder().setContent(content).build()); + assertThat(responseStream.stream().collect(Collectors.toList())) + .containsExactlyElementsIn( + ImmutableList.of( + "The", "rain", "in", "Spain", "stays", "mainly", "on", "the", "plain!")) + .inOrder(); + } + @Test public void testGrpc_serverError_receiveErrorAfterLastWordInStream() { String content = "The rain in Spain"; From 387931a56ad15e5fb7a7b543d70e72315b4338bc Mon Sep 17 00:00:00 2001 From: Joe Wang Date: Wed, 29 Mar 2023 18:45:06 -0400 Subject: [PATCH 3/8] fix integration test --- .../google/showcase/v1beta1/it/ITServerSideStreaming.java | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/showcase/gapic-showcase/src/test/java/com/google/showcase/v1beta1/it/ITServerSideStreaming.java b/showcase/gapic-showcase/src/test/java/com/google/showcase/v1beta1/it/ITServerSideStreaming.java index 88e8709b69..05703a25ab 100644 --- a/showcase/gapic-showcase/src/test/java/com/google/showcase/v1beta1/it/ITServerSideStreaming.java +++ b/showcase/gapic-showcase/src/test/java/com/google/showcase/v1beta1/it/ITServerSideStreaming.java @@ -85,7 +85,10 @@ public void testGrpc_receiveStreamedContentStreamAPI() { String content = "The rain in Spain stays mainly on the plain!"; ServerStream responseStream = grpcClient.expandCallable().call(ExpandRequest.newBuilder().setContent(content).build()); - assertThat(responseStream.stream().collect(Collectors.toList())) + assertThat(responseStream + .stream() + .map(EchoResponse::getContent) + .collect(Collectors.toList())) .containsExactlyElementsIn( ImmutableList.of( "The", "rain", "in", "Spain", "stays", "mainly", "on", "the", "plain!")) From beeccdf4b9e80bf4d36e77e6aaaac1d2d460742c Mon Sep 17 00:00:00 2001 From: Joe Wang Date: Wed, 29 Mar 2023 18:57:35 -0400 Subject: [PATCH 4/8] fix format --- .../google/showcase/v1beta1/it/ITServerSideStreaming.java | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/showcase/gapic-showcase/src/test/java/com/google/showcase/v1beta1/it/ITServerSideStreaming.java b/showcase/gapic-showcase/src/test/java/com/google/showcase/v1beta1/it/ITServerSideStreaming.java index 05703a25ab..6208724248 100644 --- a/showcase/gapic-showcase/src/test/java/com/google/showcase/v1beta1/it/ITServerSideStreaming.java +++ b/showcase/gapic-showcase/src/test/java/com/google/showcase/v1beta1/it/ITServerSideStreaming.java @@ -85,10 +85,7 @@ public void testGrpc_receiveStreamedContentStreamAPI() { String content = "The rain in Spain stays mainly on the plain!"; ServerStream responseStream = grpcClient.expandCallable().call(ExpandRequest.newBuilder().setContent(content).build()); - assertThat(responseStream - .stream() - .map(EchoResponse::getContent) - .collect(Collectors.toList())) + assertThat(responseStream.stream().map(EchoResponse::getContent).collect(Collectors.toList())) .containsExactlyElementsIn( ImmutableList.of( "The", "rain", "in", "Spain", "stays", "mainly", "on", "the", "plain!")) From e456efc945ec71db5c514aa5e60121e3c46d7e06 Mon Sep 17 00:00:00 2001 From: Joe Wang Date: Fri, 31 Mar 2023 19:46:57 -0400 Subject: [PATCH 5/8] add javadoc --- .../src/main/java/com/google/api/gax/rpc/ServerStream.java | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/gax-java/gax/src/main/java/com/google/api/gax/rpc/ServerStream.java b/gax-java/gax/src/main/java/com/google/api/gax/rpc/ServerStream.java index 883a0ed130..0ad63331f1 100644 --- a/gax-java/gax/src/main/java/com/google/api/gax/rpc/ServerStream.java +++ b/gax-java/gax/src/main/java/com/google/api/gax/rpc/ServerStream.java @@ -91,6 +91,10 @@ public Iterator iterator() { return iterator; } + /** + * Returns a sequential {@code Stream} with server responses as its source. + * @return a sequential {@code Stream} over the elements in server responses + */ public Stream stream() { return StreamSupport.stream(this.spliterator(), false); } From 680b1c5b6c402565da537021afdd50314186b7be Mon Sep 17 00:00:00 2001 From: Joe Wang Date: Fri, 31 Mar 2023 20:45:55 -0400 Subject: [PATCH 6/8] fix format --- .../gax/src/main/java/com/google/api/gax/rpc/ServerStream.java | 1 + 1 file changed, 1 insertion(+) diff --git a/gax-java/gax/src/main/java/com/google/api/gax/rpc/ServerStream.java b/gax-java/gax/src/main/java/com/google/api/gax/rpc/ServerStream.java index 0ad63331f1..d1f9848631 100644 --- a/gax-java/gax/src/main/java/com/google/api/gax/rpc/ServerStream.java +++ b/gax-java/gax/src/main/java/com/google/api/gax/rpc/ServerStream.java @@ -93,6 +93,7 @@ public Iterator iterator() { /** * Returns a sequential {@code Stream} with server responses as its source. + * * @return a sequential {@code Stream} over the elements in server responses */ public Stream stream() { From f3ce0643310fd87b0153d073a171f9facee246a7 Mon Sep 17 00:00:00 2001 From: Joe Wang Date: Fri, 31 Mar 2023 20:56:38 -0400 Subject: [PATCH 7/8] add exception --- .../google/showcase/v1beta1/it/ITServerSideStreaming.java | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/showcase/gapic-showcase/src/test/java/com/google/showcase/v1beta1/it/ITServerSideStreaming.java b/showcase/gapic-showcase/src/test/java/com/google/showcase/v1beta1/it/ITServerSideStreaming.java index 6208724248..ef2b6ee433 100644 --- a/showcase/gapic-showcase/src/test/java/com/google/showcase/v1beta1/it/ITServerSideStreaming.java +++ b/showcase/gapic-showcase/src/test/java/com/google/showcase/v1beta1/it/ITServerSideStreaming.java @@ -64,7 +64,7 @@ public void destroyClient() { } @Test - public void testGrpc_receiveStreamedContent() { + public void testGrpc_receiveStreamedContent() throws GeneralSecurityException { String content = "The rain in Spain stays mainly on the plain!"; ServerStream responseStream = grpcClient.expandCallable().call(ExpandRequest.newBuilder().setContent(content).build()); @@ -81,7 +81,7 @@ public void testGrpc_receiveStreamedContent() { } @Test - public void testGrpc_receiveStreamedContentStreamAPI() { + public void testGrpc_receiveStreamedContentStreamAPI() throws GeneralSecurityException { String content = "The rain in Spain stays mainly on the plain!"; ServerStream responseStream = grpcClient.expandCallable().call(ExpandRequest.newBuilder().setContent(content).build()); @@ -93,7 +93,7 @@ public void testGrpc_receiveStreamedContentStreamAPI() { } @Test - public void testGrpc_serverError_receiveErrorAfterLastWordInStream() { + public void testGrpc_serverError_receiveErrorAfterLastWordInStream() throws GeneralSecurityException { String content = "The rain in Spain"; Status cancelledStatus = Status.newBuilder().setCode(StatusCode.Code.CANCELLED.ordinal()).build(); From c3fcd389e43777d0d6051d75a9d1faa9efdcffb4 Mon Sep 17 00:00:00 2001 From: Joe Wang Date: Mon, 3 Apr 2023 10:36:07 -0400 Subject: [PATCH 8/8] remove unused exception in tests --- .../showcase/v1beta1/it/ITServerSideStreaming.java | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/showcase/gapic-showcase/src/test/java/com/google/showcase/v1beta1/it/ITServerSideStreaming.java b/showcase/gapic-showcase/src/test/java/com/google/showcase/v1beta1/it/ITServerSideStreaming.java index a2bd14dbe5..9c13f3f8b3 100644 --- a/showcase/gapic-showcase/src/test/java/com/google/showcase/v1beta1/it/ITServerSideStreaming.java +++ b/showcase/gapic-showcase/src/test/java/com/google/showcase/v1beta1/it/ITServerSideStreaming.java @@ -32,6 +32,7 @@ import com.google.showcase.v1beta1.ExpandRequest; import io.grpc.ManagedChannelBuilder; import java.io.IOException; +import java.security.GeneralSecurityException; import java.util.ArrayList; import java.util.Iterator; import java.util.stream.Collectors; @@ -46,7 +47,7 @@ public class ITServerSideStreaming { private EchoClient httpjsonClient; @Before - public void createClients() throws IOException { + public void createClients() throws IOException, GeneralSecurityException { // Create gRPC Echo Client EchoSettings grpcEchoSettings = EchoSettings.newBuilder() @@ -78,7 +79,7 @@ public void destroyClient() { } @Test - public void testGrpc_receiveStreamedContent() throws GeneralSecurityException { + public void testGrpc_receiveStreamedContent() { String content = "The rain in Spain stays mainly on the plain!"; ServerStream responseStream = grpcClient.expandCallable().call(ExpandRequest.newBuilder().setContent(content).build()); @@ -95,7 +96,7 @@ public void testGrpc_receiveStreamedContent() throws GeneralSecurityException { } @Test - public void testGrpc_receiveStreamedContentStreamAPI() throws GeneralSecurityException { + public void testGrpc_receiveStreamedContentStreamAPI() { String content = "The rain in Spain stays mainly on the plain!"; ServerStream responseStream = grpcClient.expandCallable().call(ExpandRequest.newBuilder().setContent(content).build()); @@ -107,7 +108,7 @@ public void testGrpc_receiveStreamedContentStreamAPI() throws GeneralSecurityExc } @Test - public void testGrpc_serverError_receiveErrorAfterLastWordInStream() throws GeneralSecurityException { + public void testGrpc_serverError_receiveErrorAfterLastWordInStream() { String content = "The rain in Spain"; Status cancelledStatus = Status.newBuilder().setCode(StatusCode.Code.CANCELLED.ordinal()).build();