From 8c8122218d5091272ea916d7f2fa477f17ced241 Mon Sep 17 00:00:00 2001 From: matej nedic Date: Thu, 2 Feb 2023 19:49:15 +0100 Subject: [PATCH 1/3] prepare before testing --- .../dapr/actors/runtime/DaprGrpcClient.java | 99 +++++++------ .../actors/runtime/DaprGrpcClientTest.java | 136 +++++++++--------- 2 files changed, 119 insertions(+), 116 deletions(-) diff --git a/sdk-actors/src/main/java/io/dapr/actors/runtime/DaprGrpcClient.java b/sdk-actors/src/main/java/io/dapr/actors/runtime/DaprGrpcClient.java index 6a95fb3f00..8d026d568b 100644 --- a/sdk-actors/src/main/java/io/dapr/actors/runtime/DaprGrpcClient.java +++ b/sdk-actors/src/main/java/io/dapr/actors/runtime/DaprGrpcClient.java @@ -14,21 +14,24 @@ package io.dapr.actors.runtime; import com.fasterxml.jackson.databind.ObjectMapper; -import com.google.common.util.concurrent.ListenableFuture; import com.google.protobuf.Any; import com.google.protobuf.ByteString; import com.google.protobuf.Empty; import io.dapr.config.Properties; +import io.dapr.exceptions.DaprException; import io.dapr.utils.DurationUtils; import io.dapr.v1.DaprGrpc; import io.dapr.v1.DaprProtos; import io.grpc.ManagedChannel; +import io.grpc.stub.StreamObserver; import reactor.core.publisher.Mono; +import reactor.core.publisher.MonoSink; import java.io.IOException; import java.nio.charset.Charset; import java.util.ArrayList; import java.util.List; +import java.util.concurrent.ExecutionException; /** * A DaprClient over HTTP for Actor's runtime. @@ -48,9 +51,9 @@ class DaprGrpcClient implements DaprClient { /** * The GRPC client to be used. * - * @see io.dapr.v1.DaprGrpc.DaprFutureStub + * @see io.dapr.v1.DaprGrpc.DaprStub */ - private DaprGrpc.DaprFutureStub client; + private DaprGrpc.DaprStub client; /** * Internal constructor. @@ -58,16 +61,16 @@ class DaprGrpcClient implements DaprClient { * @param channel channel (client needs to close channel after use). */ DaprGrpcClient(ManagedChannel channel) { - this(DaprGrpc.newFutureStub(channel)); + this(DaprGrpc.newStub(channel)); } /** * Internal constructor. * - * @param grpcClient Dapr's GRPC client. + * @param daprStubClient Dapr's GRPC client. */ - DaprGrpcClient(DaprGrpc.DaprFutureStub grpcClient) { - this.client = grpcClient; + DaprGrpcClient(DaprGrpc.DaprStub daprStubClient) { + this.client = daprStubClient; } /** @@ -75,17 +78,14 @@ class DaprGrpcClient implements DaprClient { */ @Override public Mono getState(String actorType, String actorId, String keyName) { - return Mono.fromCallable(() -> { DaprProtos.GetActorStateRequest req = - DaprProtos.GetActorStateRequest.newBuilder() - .setActorType(actorType) - .setActorId(actorId) - .setKey(keyName) - .build(); + DaprProtos.GetActorStateRequest.newBuilder() + .setActorType(actorType) + .setActorId(actorId) + .setKey(keyName) + .build(); - ListenableFuture futureResponse = client.getActorState(req); - return futureResponse.get(); - }).map(r -> r.getData().toByteArray()); + return Mono.create(it -> client.getActorState(req, createStreamObserver(it))).map(r -> r.getData().toByteArray()); } /** @@ -132,10 +132,7 @@ public Mono saveStateTransactionally( .addAllOperations(grpcOps) .build(); - return Mono.fromCallable(() -> { - ListenableFuture futureResponse = client.executeActorStateTransaction(req); - return futureResponse.get(); - }).then(); + return Mono.create(it -> client.executeActorStateTransaction(req, createStreamObserver(it))).then(); } /** @@ -147,21 +144,16 @@ public Mono registerReminder( String actorId, String reminderName, ActorReminderParams reminderParams) { - return Mono.fromCallable(() -> { - DaprProtos.RegisterActorReminderRequest req = - DaprProtos.RegisterActorReminderRequest.newBuilder() - .setActorType(actorType) - .setActorId(actorId) - .setName(reminderName) - .setData(ByteString.copyFrom(reminderParams.getData())) - .setDueTime(DurationUtils.convertDurationToDaprFormat(reminderParams.getDueTime())) - .setPeriod(DurationUtils.convertDurationToDaprFormat(reminderParams.getPeriod())) - .build(); - - ListenableFuture futureResponse = client.registerActorReminder(req); - futureResponse.get(); - return null; - }); + DaprProtos.RegisterActorReminderRequest req = + DaprProtos.RegisterActorReminderRequest.newBuilder() + .setActorType(actorType) + .setActorId(actorId) + .setName(reminderName) + .setData(ByteString.copyFrom(reminderParams.getData())) + .setDueTime(DurationUtils.convertDurationToDaprFormat(reminderParams.getDueTime())) + .setPeriod(DurationUtils.convertDurationToDaprFormat(reminderParams.getPeriod())) + .build(); + return Mono.create(it -> client.registerActorReminder(req, createStreamObserver(it))).then().then(); } /** @@ -169,7 +161,6 @@ public Mono registerReminder( */ @Override public Mono unregisterReminder(String actorType, String actorId, String reminderName) { - return Mono.fromCallable(() -> { DaprProtos.UnregisterActorReminderRequest req = DaprProtos.UnregisterActorReminderRequest.newBuilder() .setActorType(actorType) @@ -177,10 +168,7 @@ public Mono unregisterReminder(String actorType, String actorId, String re .setName(reminderName) .build(); - ListenableFuture futureResponse = client.unregisterActorReminder(req); - futureResponse.get(); - return null; - }); + return Mono.create(it -> client.unregisterActorReminder(req, createStreamObserver(it))).then().then(); } /** @@ -192,7 +180,6 @@ public Mono registerTimer( String actorId, String timerName, ActorTimerParams timerParams) { - return Mono.fromCallable(() -> { DaprProtos.RegisterActorTimerRequest req = DaprProtos.RegisterActorTimerRequest.newBuilder() .setActorType(actorType) @@ -204,10 +191,7 @@ public Mono registerTimer( .setPeriod(DurationUtils.convertDurationToDaprFormat(timerParams.getPeriod())) .build(); - ListenableFuture futureResponse = client.registerActorTimer(req); - futureResponse.get(); - return null; - }); + return Mono.create(it -> client.registerActorTimer(req, createStreamObserver(it))).then().then(); } /** @@ -215,7 +199,6 @@ public Mono registerTimer( */ @Override public Mono unregisterTimer(String actorType, String actorId, String timerName) { - return Mono.fromCallable(() -> { DaprProtos.UnregisterActorTimerRequest req = DaprProtos.UnregisterActorTimerRequest.newBuilder() .setActorType(actorType) @@ -223,10 +206,26 @@ public Mono unregisterTimer(String actorType, String actorId, String timer .setName(timerName) .build(); - ListenableFuture futureResponse = client.unregisterActorTimer(req); - futureResponse.get(); - return null; - }); + return Mono.create(it -> client.unregisterActorTimer(req, createStreamObserver(it))).then().then(); + } + + private StreamObserver createStreamObserver(MonoSink sink) { + return new StreamObserver() { + @Override + public void onNext(T value) { + sink.success(value); + } + + @Override + public void onError(Throwable t) { + sink.error(DaprException.propagate(new ExecutionException(t))); + } + + @Override + public void onCompleted() { + sink.success(); + } + }; } } diff --git a/sdk-actors/src/test/java/io/dapr/actors/runtime/DaprGrpcClientTest.java b/sdk-actors/src/test/java/io/dapr/actors/runtime/DaprGrpcClientTest.java index 6b362b9841..8d1dc3ade7 100644 --- a/sdk-actors/src/test/java/io/dapr/actors/runtime/DaprGrpcClientTest.java +++ b/sdk-actors/src/test/java/io/dapr/actors/runtime/DaprGrpcClientTest.java @@ -18,10 +18,17 @@ import com.google.protobuf.Any; import com.google.protobuf.ByteString; import com.google.protobuf.Empty; -import io.dapr.utils.DurationUtils; import io.dapr.v1.DaprGrpc; import io.dapr.v1.DaprProtos; +import io.grpc.ManagedChannel; +import io.grpc.Status; +import io.grpc.StatusException; +import io.grpc.inprocess.InProcessChannelBuilder; +import io.grpc.inprocess.InProcessServerBuilder; +import io.grpc.stub.StreamObserver; +import io.grpc.testing.GrpcCleanupRule; import org.junit.Before; +import org.junit.Rule; import org.junit.Test; import org.mockito.ArgumentMatcher; import reactor.core.publisher.Mono; @@ -31,9 +38,12 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.List; +import java.util.concurrent.ExecutionException; +import static io.dapr.actors.TestUtils.assertThrowsDaprException; import static org.junit.Assert.*; import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.mockito.AdditionalAnswers.delegatesTo; import static org.mockito.Mockito.*; public class DaprGrpcClientTest { @@ -44,14 +54,63 @@ public class DaprGrpcClientTest { private static final String ACTOR_ID = "1234567890"; - private DaprGrpc.DaprFutureStub grpcStub; + private static final String ACTOR_ID_OK = "123-Ok"; + + private static final String ACTOR_ID_NULL_INPUT = "123-Null"; + + private static final String ACTOR_EXCEPTION = "1_exception"; + + private static final String METHOD_NAME = "myMethod"; + + private static final byte[] REQUEST_PAYLOAD = "{ \"id\": 123 }".getBytes(); + + private static final byte[] RESPONSE_PAYLOAD = "\"OK\"".getBytes(); + + private final DaprGrpc.DaprImplBase serviceImpl = + mock(DaprGrpc.DaprImplBase.class, delegatesTo( + new DaprGrpc.DaprImplBase() { + @Override + public void getActorState(DaprProtos.GetActorStateRequest request, + StreamObserver responseObserver) { + switch (request.getActorId()) { + case ACTOR_ID_OK: + case ACTOR_ID_NULL_INPUT: + responseObserver.onNext( + DaprProtos.GetActorStateResponse.newBuilder().setData(ByteString.copyFrom(RESPONSE_PAYLOAD)) + .build()); + responseObserver.onCompleted(); + return; + + case ACTOR_EXCEPTION: + Throwable e = new ArithmeticException(); + StatusException se = new StatusException(Status.UNKNOWN.withCause(e)); + responseObserver.onError(se); + return; + } + super.getActorState(request, responseObserver); + } + })); private DaprGrpcClient client; + @Rule + public final GrpcCleanupRule grpcCleanup = new GrpcCleanupRule(); + @Before - public void setup() { - grpcStub = mock(DaprGrpc.DaprFutureStub.class); - client = new DaprGrpcClient(grpcStub); + public void setup() throws IOException { + // Generate a unique in-process server name. + String serverName = InProcessServerBuilder.generateName(); + + // Create a server, add service, start, and register for automatic graceful shutdown. + grpcCleanup.register(InProcessServerBuilder + .forName(serverName).directExecutor().addService(serviceImpl).build().start()); + + // Create a client channel and register for automatic graceful shutdown. + ManagedChannel channel = grpcCleanup.register( + InProcessChannelBuilder.forName(serverName).directExecutor().build()); + + // Create a HelloWorldClient using the in-process channel; + client = new DaprGrpcClient(DaprGrpc.newStub(channel)); } @Test @@ -59,14 +118,12 @@ public void getActorStateException() { SettableFuture settableFuture = SettableFuture.create(); settableFuture.setException(new ArithmeticException()); - when(grpcStub.getActorState(argThat(new GetActorStateRequestMatcher( - ACTOR_TYPE, - ACTOR_ID, - "MyKey" - )))).thenReturn(settableFuture); - Mono result = client.getState(ACTOR_TYPE, ACTOR_ID, "MyKey"); - Exception exception = assertThrows(Exception.class, () -> result.block()); - assertTrue(exception.getCause().getCause() instanceof ArithmeticException); + Mono result = client.getState(ACTOR_TYPE, ACTOR_EXCEPTION, "MyKey"); + assertThrowsDaprException( + ExecutionException.class, + "UNKNOWN", + "UNKNOWN: ", + () -> result.block()); } @Test @@ -74,12 +131,6 @@ public void getActorState() { byte[] data = "hello world".getBytes(); SettableFuture settableFuture = SettableFuture.create(); settableFuture.set(DaprProtos.GetActorStateResponse.newBuilder().setData(ByteString.copyFrom(data)).build()); - - when(grpcStub.getActorState(argThat(new GetActorStateRequestMatcher( - ACTOR_TYPE, - ACTOR_ID, - "MyKey" - )))).thenReturn(settableFuture); Mono result = client.getState(ACTOR_TYPE, ACTOR_ID, "MyKey"); assertArrayEquals(data, result.block()); } @@ -88,12 +139,6 @@ public void getActorState() { public void saveActorStateTransactionallyException() { SettableFuture settableFuture = SettableFuture.create(); settableFuture.setException(new ArithmeticException()); - - when(grpcStub.executeActorStateTransaction(argThat(new ExecuteActorStateTransactionRequestMatcher( - ACTOR_TYPE, - ACTOR_ID, - new ArrayList<>() - )))).thenReturn(settableFuture); Mono result = client.saveStateTransactionally(ACTOR_TYPE, ACTOR_ID, new ArrayList<>()); Exception exception = assertThrows(Exception.class, () -> result.block()); assertTrue(exception.getCause().getCause() instanceof ArithmeticException); @@ -108,12 +153,6 @@ public void saveActorStateTransactionally() { new ActorStateOperation("upsert", "mykey", "hello world"), new ActorStateOperation("delete", "mykey", null), }; - - when(grpcStub.executeActorStateTransaction(argThat(new ExecuteActorStateTransactionRequestMatcher( - ACTOR_TYPE, - ACTOR_ID, - Arrays.asList(operations) - )))).thenReturn(settableFuture); Mono result = client.saveStateTransactionally(ACTOR_TYPE, ACTOR_ID, Arrays.asList(operations)); result.block(); } @@ -128,11 +167,6 @@ public void saveActorStateTransactionallyByteArray() { new ActorStateOperation("delete", "mykey", null), }; - when(grpcStub.executeActorStateTransaction(argThat(new ExecuteActorStateTransactionRequestMatcher( - ACTOR_TYPE, - ACTOR_ID, - Arrays.asList(operations) - )))).thenReturn(settableFuture); Mono result = client.saveStateTransactionally(ACTOR_TYPE, ACTOR_ID, Arrays.asList(operations)); result.block(); } @@ -161,14 +195,6 @@ public void registerActorReminder() { Duration.ofSeconds(2) ); - when(grpcStub.registerActorReminder(argThat(argument -> { - assertEquals(ACTOR_TYPE, argument.getActorType()); - assertEquals(ACTOR_ID, argument.getActorId()); - assertEquals(reminderName, argument.getName()); - assertEquals(DurationUtils.convertDurationToDaprFormat(params.getDueTime()), argument.getDueTime()); - assertEquals(DurationUtils.convertDurationToDaprFormat(params.getPeriod()), argument.getPeriod()); - return true; - }))).thenReturn(settableFuture); Mono result = client.registerReminder(ACTOR_TYPE, ACTOR_ID, reminderName, params); result.block(); } @@ -180,12 +206,6 @@ public void unregisterActorReminder() { String reminderName = "myreminder"; - when(grpcStub.unregisterActorReminder(argThat(argument -> { - assertEquals(ACTOR_TYPE, argument.getActorType()); - assertEquals(ACTOR_ID, argument.getActorId()); - assertEquals(reminderName, argument.getName()); - return true; - }))).thenReturn(settableFuture); Mono result = client.unregisterReminder(ACTOR_TYPE, ACTOR_ID, reminderName); result.block(); } @@ -204,15 +224,6 @@ public void registerActorTimer() { Duration.ofSeconds(2) ); - when(grpcStub.registerActorTimer(argThat(argument -> { - assertEquals(ACTOR_TYPE, argument.getActorType()); - assertEquals(ACTOR_ID, argument.getActorId()); - assertEquals(timerName, argument.getName()); - assertEquals(callback, argument.getCallback()); - assertEquals(DurationUtils.convertDurationToDaprFormat(params.getDueTime()), argument.getDueTime()); - assertEquals(DurationUtils.convertDurationToDaprFormat(params.getPeriod()), argument.getPeriod()); - return true; - }))).thenReturn(settableFuture); Mono result = client.registerTimer(ACTOR_TYPE, ACTOR_ID, timerName, params); result.block(); } @@ -223,13 +234,6 @@ public void unregisterActorTimer() { settableFuture.set(Empty.newBuilder().build()); String timerName = "mytimer"; - - when(grpcStub.unregisterActorTimer(argThat(argument -> { - assertEquals(ACTOR_TYPE, argument.getActorType()); - assertEquals(ACTOR_ID, argument.getActorId()); - assertEquals(timerName, argument.getName()); - return true; - }))).thenReturn(settableFuture); Mono result = client.unregisterTimer(ACTOR_TYPE, ACTOR_ID, timerName); result.block(); } From 69c5a043b5e4fa491d8731e646d6f31f32dd7a67 Mon Sep 17 00:00:00 2001 From: matej nedic Date: Sun, 5 Feb 2023 08:52:53 +0100 Subject: [PATCH 2/3] Update tests --- .../actors/runtime/DaprGrpcClientTest.java | 558 +++++++++--------- 1 file changed, 293 insertions(+), 265 deletions(-) diff --git a/sdk-actors/src/test/java/io/dapr/actors/runtime/DaprGrpcClientTest.java b/sdk-actors/src/test/java/io/dapr/actors/runtime/DaprGrpcClientTest.java index 8d1dc3ade7..5d67c610a6 100644 --- a/sdk-actors/src/test/java/io/dapr/actors/runtime/DaprGrpcClientTest.java +++ b/sdk-actors/src/test/java/io/dapr/actors/runtime/DaprGrpcClientTest.java @@ -14,10 +14,10 @@ package io.dapr.actors.runtime; import com.fasterxml.jackson.databind.ObjectMapper; -import com.google.common.util.concurrent.SettableFuture; import com.google.protobuf.Any; import com.google.protobuf.ByteString; import com.google.protobuf.Empty; +import com.google.protobuf.GeneratedMessageV3; import io.dapr.v1.DaprGrpc; import io.dapr.v1.DaprProtos; import io.grpc.ManagedChannel; @@ -30,12 +30,10 @@ import org.junit.Before; import org.junit.Rule; import org.junit.Test; -import org.mockito.ArgumentMatcher; import reactor.core.publisher.Mono; import java.io.IOException; import java.time.Duration; -import java.util.ArrayList; import java.util.Arrays; import java.util.List; import java.util.concurrent.ExecutionException; @@ -43,303 +41,333 @@ import static io.dapr.actors.TestUtils.assertThrowsDaprException; import static org.junit.Assert.*; import static org.junit.jupiter.api.Assertions.assertThrows; -import static org.mockito.AdditionalAnswers.delegatesTo; -import static org.mockito.Mockito.*; public class DaprGrpcClientTest { - private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); + private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); - private static final String ACTOR_TYPE = "MyActorType"; + private static final String ACTOR_TYPE = "MyActorType"; - private static final String ACTOR_ID = "1234567890"; + private static final String ACTOR_ID = "1234567890"; - private static final String ACTOR_ID_OK = "123-Ok"; + private static final String KEY = "MyKey"; - private static final String ACTOR_ID_NULL_INPUT = "123-Null"; + private static final String ACTOR_EXCEPTION = "1_exception"; - private static final String ACTOR_EXCEPTION = "1_exception"; + private static final String REMINDER_NAME = "myreminder"; - private static final String METHOD_NAME = "myMethod"; + private static final String TIMER_NAME = "timerName"; - private static final byte[] REQUEST_PAYLOAD = "{ \"id\": 123 }".getBytes(); + private static final byte[] RESPONSE_PAYLOAD = "\"hello world\"".getBytes(); - private static final byte[] RESPONSE_PAYLOAD = "\"OK\"".getBytes(); + private static final List OPERATIONS = Arrays.asList( + new ActorStateOperation("upsert", "mykey", "hello world".getBytes()), + new ActorStateOperation("delete", "mykey", null)); - private final DaprGrpc.DaprImplBase serviceImpl = - mock(DaprGrpc.DaprImplBase.class, delegatesTo( - new DaprGrpc.DaprImplBase() { - @Override - public void getActorState(DaprProtos.GetActorStateRequest request, - StreamObserver responseObserver) { - switch (request.getActorId()) { - case ACTOR_ID_OK: - case ACTOR_ID_NULL_INPUT: - responseObserver.onNext( - DaprProtos.GetActorStateResponse.newBuilder().setData(ByteString.copyFrom(RESPONSE_PAYLOAD)) - .build()); - responseObserver.onCompleted(); - return; + private final DaprGrpc.DaprImplBase serviceImpl = new CustomDaprClient(); - case ACTOR_EXCEPTION: - Throwable e = new ArithmeticException(); - StatusException se = new StatusException(Status.UNKNOWN.withCause(e)); - responseObserver.onError(se); - return; - } - super.getActorState(request, responseObserver); - } - })); - - private DaprGrpcClient client; - - @Rule - public final GrpcCleanupRule grpcCleanup = new GrpcCleanupRule(); - - @Before - public void setup() throws IOException { - // Generate a unique in-process server name. - String serverName = InProcessServerBuilder.generateName(); - - // Create a server, add service, start, and register for automatic graceful shutdown. - grpcCleanup.register(InProcessServerBuilder - .forName(serverName).directExecutor().addService(serviceImpl).build().start()); - - // Create a client channel and register for automatic graceful shutdown. - ManagedChannel channel = grpcCleanup.register( - InProcessChannelBuilder.forName(serverName).directExecutor().build()); - - // Create a HelloWorldClient using the in-process channel; - client = new DaprGrpcClient(DaprGrpc.newStub(channel)); - } - - @Test - public void getActorStateException() { - SettableFuture settableFuture = SettableFuture.create(); - settableFuture.setException(new ArithmeticException()); - - Mono result = client.getState(ACTOR_TYPE, ACTOR_EXCEPTION, "MyKey"); - assertThrowsDaprException( - ExecutionException.class, - "UNKNOWN", - "UNKNOWN: ", - () -> result.block()); - } - - @Test - public void getActorState() { - byte[] data = "hello world".getBytes(); - SettableFuture settableFuture = SettableFuture.create(); - settableFuture.set(DaprProtos.GetActorStateResponse.newBuilder().setData(ByteString.copyFrom(data)).build()); - Mono result = client.getState(ACTOR_TYPE, ACTOR_ID, "MyKey"); - assertArrayEquals(data, result.block()); - } - - @Test - public void saveActorStateTransactionallyException() { - SettableFuture settableFuture = SettableFuture.create(); - settableFuture.setException(new ArithmeticException()); - Mono result = client.saveStateTransactionally(ACTOR_TYPE, ACTOR_ID, new ArrayList<>()); - Exception exception = assertThrows(Exception.class, () -> result.block()); - assertTrue(exception.getCause().getCause() instanceof ArithmeticException); - } - - @Test - public void saveActorStateTransactionally() { - SettableFuture settableFuture = SettableFuture.create(); - settableFuture.set(Empty.newBuilder().build()); - - ActorStateOperation[] operations = new ActorStateOperation[] { - new ActorStateOperation("upsert", "mykey", "hello world"), - new ActorStateOperation("delete", "mykey", null), - }; - Mono result = client.saveStateTransactionally(ACTOR_TYPE, ACTOR_ID, Arrays.asList(operations)); - result.block(); - } - - @Test - public void saveActorStateTransactionallyByteArray() { - SettableFuture settableFuture = SettableFuture.create(); - settableFuture.set(Empty.newBuilder().build()); - - ActorStateOperation[] operations = new ActorStateOperation[] { - new ActorStateOperation("upsert", "mykey", "hello world".getBytes()), - new ActorStateOperation("delete", "mykey", null), - }; - - Mono result = client.saveStateTransactionally(ACTOR_TYPE, ACTOR_ID, Arrays.asList(operations)); - result.block(); - } - - @Test - public void saveActorStateTransactionallyInvalidValueType() { - ActorStateOperation[] operations = new ActorStateOperation[] { - new ActorStateOperation("upsert", "mykey", 123), - new ActorStateOperation("delete", "mykey", null), - }; - - Mono result = client.saveStateTransactionally(ACTOR_TYPE, ACTOR_ID, Arrays.asList(operations)); - assertThrows(IllegalArgumentException.class, () -> result.block()); - } - - - @Test - public void registerActorReminder() { - SettableFuture settableFuture = SettableFuture.create(); - settableFuture.set(Empty.newBuilder().build()); - - String reminderName = "myreminder"; - ActorReminderParams params = new ActorReminderParams( - "hello world".getBytes(), - Duration.ofSeconds(1), - Duration.ofSeconds(2) - ); - - Mono result = client.registerReminder(ACTOR_TYPE, ACTOR_ID, reminderName, params); - result.block(); - } - - @Test - public void unregisterActorReminder() { - SettableFuture settableFuture = SettableFuture.create(); - settableFuture.set(Empty.newBuilder().build()); - - String reminderName = "myreminder"; - - Mono result = client.unregisterReminder(ACTOR_TYPE, ACTOR_ID, reminderName); - result.block(); - } - - @Test - public void registerActorTimer() { - SettableFuture settableFuture = SettableFuture.create(); - settableFuture.set(Empty.newBuilder().build()); - - String timerName = "mytimer"; - String callback = "mymethod"; - ActorTimerParams params = new ActorTimerParams( - callback, - "hello world".getBytes(), - Duration.ofSeconds(1), - Duration.ofSeconds(2) - ); - - Mono result = client.registerTimer(ACTOR_TYPE, ACTOR_ID, timerName, params); - result.block(); - } - - @Test - public void unregisterActorTimer() { - SettableFuture settableFuture = SettableFuture.create(); - settableFuture.set(Empty.newBuilder().build()); - - String timerName = "mytimer"; - Mono result = client.unregisterTimer(ACTOR_TYPE, ACTOR_ID, timerName); - result.block(); - } - - private static Any getAny(Object value) throws IOException { - if (value instanceof byte[]) { - String base64 = OBJECT_MAPPER.writeValueAsString(value); - return Any.newBuilder().setValue(ByteString.copyFrom(base64.getBytes())).build(); - } else if (value instanceof String) { - return Any.newBuilder().setValue(ByteString.copyFrom(((String)value).getBytes())).build(); - } + private DaprGrpcClient client; + + @Rule + public final GrpcCleanupRule grpcCleanup = new GrpcCleanupRule(); + + @Before + public void setup() throws IOException { + // Generate a unique in-process server name. + String serverName = InProcessServerBuilder.generateName(); + + // Create a server, add service, start, and register for automatic graceful shutdown. + grpcCleanup.register(InProcessServerBuilder + .forName(serverName).directExecutor().addService(serviceImpl).build().start()); - throw new IllegalArgumentException("Must be byte[] or String"); - } + // Create a client channel and register for automatic graceful shutdown. + ManagedChannel channel = grpcCleanup.register( + InProcessChannelBuilder.forName(serverName).directExecutor().build()); - private static class GetActorStateRequestMatcher implements ArgumentMatcher { + // Create a HelloWorldClient using the in-process channel; + client = new DaprGrpcClient(DaprGrpc.newStub(channel)); + } - private final String actorType; + @Test + public void getActorStateException() { + Mono result = client.getState(ACTOR_TYPE, ACTOR_EXCEPTION, KEY); + assertThrowsDaprException( + ExecutionException.class, + "UNKNOWN", + "UNKNOWN: ", + result::block); + } - private final String actorId; + @Test + public void getActorState() { + Mono result = client.getState(ACTOR_TYPE, ACTOR_ID, KEY); + assertArrayEquals(RESPONSE_PAYLOAD, result.block()); + } - private final String key; + @Test + public void saveActorStateTransactionallyException() { + Mono result = client.saveStateTransactionally(ACTOR_TYPE, ACTOR_EXCEPTION, OPERATIONS); + assertThrowsDaprException( + ExecutionException.class, + "UNKNOWN", + "UNKNOWN: ", + result::block); + } + @Test + public void saveActorStateTransactionally() { + Mono result = client.saveStateTransactionally(ACTOR_TYPE, ACTOR_ID, OPERATIONS); + result.block(); + } - GetActorStateRequestMatcher(String actorType, String actorId, String key) { - this.actorType = actorType; - this.actorId = actorId; - this.key = key; + @Test + public void saveActorStateTransactionallyByteArray() { + Mono result = client.saveStateTransactionally(ACTOR_TYPE, ACTOR_ID, OPERATIONS); + result.block(); } - @Override - public boolean matches(DaprProtos.GetActorStateRequest argument) { - if (argument == null) { - return false; - } + @Test + public void saveActorStateTransactionallyInvalidValueType() { + ActorStateOperation[] operations = new ActorStateOperation[]{ + new ActorStateOperation("upsert", "mykey", 123), + new ActorStateOperation("delete", "mykey", null), + }; - return actorType.equals(argument.getActorType()) - && actorId.equals(argument.getActorId()) - && key.equals(argument.getKey()); + Mono result = client.saveStateTransactionally(ACTOR_TYPE, ACTOR_ID, Arrays.asList(operations)); + assertThrows(IllegalArgumentException.class, result::block); } - } - private static class ExecuteActorStateTransactionRequestMatcher - implements ArgumentMatcher { - private final String actorType; + @Test + public void registerActorReminder() { + ActorReminderParams params = new ActorReminderParams( + "hello world".getBytes(), + Duration.ofSeconds(1), + Duration.ofSeconds(2) + ); + Mono result = client.registerReminder(ACTOR_TYPE, ACTOR_ID, REMINDER_NAME, params); + result.block(); + } - private final String actorId; + @Test + public void unregisterActorReminder() { - private final List operations; + Mono result = client.unregisterReminder(ACTOR_TYPE, ACTOR_ID, REMINDER_NAME); + result.block(); + } - ExecuteActorStateTransactionRequestMatcher(String actorType, String actorId, List operations) { - this.actorType = actorType; - this.actorId = actorId; - this.operations = operations; + @Test + public void registerActorTimer() { + String callback = "mymethod"; + ActorTimerParams params = new ActorTimerParams( + callback, + "hello world".getBytes(), + Duration.ofSeconds(1), + Duration.ofSeconds(2) + ); + + Mono result = client.registerTimer(ACTOR_TYPE, ACTOR_ID, TIMER_NAME, params); + result.block(); } - @Override - public boolean matches(DaprProtos.ExecuteActorStateTransactionRequest argument) { - if (argument == null) { - return false; - } - - if (operations.size() != argument.getOperationsCount()) { - return false; - } - - if (!actorType.equals(argument.getActorType()) - || !actorId.equals(argument.getActorId())) { - return false; - } - - for(ActorStateOperation operation : operations) { - boolean found = false; - for (DaprProtos.TransactionalActorStateOperation grpcOperation : argument.getOperationsList()) { - if (operation.getKey().equals(grpcOperation.getKey()) - && operation.getOperationType().equals(grpcOperation.getOperationType()) - && nullableEquals(operation.getValue(), grpcOperation.getValue())) { - found = true; - break; - } + @Test + public void unregisterActorTimer() { + Mono result = client.unregisterTimer(ACTOR_TYPE, ACTOR_ID, TIMER_NAME); + result.block(); + } + + + private class CustomDaprClient extends DaprGrpc.DaprImplBase { + + @Override + public void getActorState(DaprProtos.GetActorStateRequest request, + StreamObserver responseObserver) { + assertEquals(ACTOR_TYPE, request.getActorType()); + assertEquals(KEY, request.getKey()); + assertEquals(ACTOR_ID, request.getActorId()); + switch (request.getActorId()) { + case ACTOR_ID: + populateObserver(responseObserver, DaprProtos.GetActorStateResponse.newBuilder().setData(ByteString.copyFrom(RESPONSE_PAYLOAD)) + .build()); + return; + + case ACTOR_EXCEPTION: + throwException(responseObserver); + return; + } + super.getActorState(request, responseObserver); } - if (!found) { - return false; + public void executeActorStateTransaction(io.dapr.v1.DaprProtos.ExecuteActorStateTransactionRequest request, + io.grpc.stub.StreamObserver responseObserver) { + assertEquals(ACTOR_TYPE, request.getActorType()); + assertEquals(ACTOR_ID, request.getActorId()); + assertTrue(new OperationsMatcher(OPERATIONS).matches(request)); + switch (request.getActorId()) { + case ACTOR_ID: + populateObserver(responseObserver, Empty.newBuilder().build()); + return; + + case ACTOR_EXCEPTION: + throwException(responseObserver); + return; + } + super.executeActorStateTransaction(request, responseObserver); } - } - return true; + @Override + public void registerActorReminder(io.dapr.v1.DaprProtos.RegisterActorReminderRequest request, + io.grpc.stub.StreamObserver responseObserver) { + assertEquals(REMINDER_NAME, request.getName()); + assertEquals("0h0m1s0ms", request.getDueTime()); + assertEquals("0h0m2s0ms", request.getPeriod()); + assertEquals(ACTOR_TYPE, request.getActorType()); + assertEquals(ACTOR_ID, request.getActorId()); + switch (request.getActorId()) { + case ACTOR_ID: + populateObserver(responseObserver, Empty.newBuilder().build()); + return; + + case ACTOR_EXCEPTION: + throwException(responseObserver); + return; + } + super.registerActorReminder(request, responseObserver); + } + + public void registerActorTimer(io.dapr.v1.DaprProtos.RegisterActorTimerRequest request, + io.grpc.stub.StreamObserver responseObserver) { + assertEquals(ACTOR_TYPE, request.getActorType()); + assertEquals(ACTOR_ID, request.getActorId()); + assertEquals(TIMER_NAME, request.getName()); + assertEquals("mymethod", request.getCallback()); + assertEquals("0h0m1s0ms", request.getDueTime()); + assertEquals("0h0m2s0ms", request.getPeriod()); + switch (request.getActorId()) { + case ACTOR_ID: + populateObserver(responseObserver, Empty.newBuilder().build()); + return; + + case ACTOR_EXCEPTION: + throwException(responseObserver); + return; + } + super.registerActorTimer(request, responseObserver); + } + + /** + *
+         * Unregister an actor timer.
+         * 
+ */ + public void unregisterActorTimer(io.dapr.v1.DaprProtos.UnregisterActorTimerRequest request, + io.grpc.stub.StreamObserver responseObserver) { + assertEquals(ACTOR_TYPE, request.getActorType()); + assertEquals(ACTOR_ID, request.getActorId()); + assertEquals(TIMER_NAME, request.getName()); + switch (request.getActorId()) { + case ACTOR_ID: + populateObserver(responseObserver, Empty.newBuilder().build()); + return; + + case ACTOR_EXCEPTION: + throwException(responseObserver); + return; + } + super.unregisterActorTimer(request, responseObserver); + } + + public void unregisterActorReminder(io.dapr.v1.DaprProtos.UnregisterActorReminderRequest request, + io.grpc.stub.StreamObserver responseObserver) { + assertEquals(ACTOR_TYPE, request.getActorType()); + assertEquals(ACTOR_ID, request.getActorId()); + assertEquals(REMINDER_NAME, request.getName()); + switch (request.getActorId()) { + case ACTOR_ID: + populateObserver(responseObserver, Empty.newBuilder().build()); + return; + + case ACTOR_EXCEPTION: + throwException(responseObserver); + return; + } + super.unregisterActorReminder(request, responseObserver); + } + + private void throwException(StreamObserver responseObserver) { + Throwable e = new ArithmeticException(); + StatusException se = new StatusException(Status.UNKNOWN.withCause(e)); + responseObserver.onError(se); + } + + private void populateObserver(StreamObserver responseObserver, GeneratedMessageV3 generatedMessageV3) { + responseObserver.onNext((T) generatedMessageV3); + responseObserver.onCompleted(); + } } - private static boolean nullableEquals(Object one, Any another) { - if (one == null) { - return another.getValue().isEmpty(); - } - - if ((one == null) ^ (another == null)) { - return false; - } - - try { - Any oneAny = getAny(one); - return oneAny.getValue().equals(another.getValue()); - } catch (IOException e) { - e.printStackTrace(); - return false; - } + private static class OperationsMatcher { + + private final List operations; + + OperationsMatcher(List operations) { + this.operations = operations; + } + + public boolean matches(DaprProtos.ExecuteActorStateTransactionRequest argument) { + if (argument == null) { + return false; + } + + if (operations.size() != argument.getOperationsCount()) { + return false; + } + + for (ActorStateOperation operation : operations) { + boolean found = false; + for (DaprProtos.TransactionalActorStateOperation grpcOperation : argument.getOperationsList()) { + if (operation.getKey().equals(grpcOperation.getKey()) + && operation.getOperationType().equals(grpcOperation.getOperationType()) + && nullableEquals(operation.getValue(), grpcOperation.getValue())) { + found = true; + break; + } + } + + if (!found) { + return false; + } + } + + return true; + } + + private static boolean nullableEquals(Object one, Any another) { + if (one == null) { + return another.getValue().isEmpty(); + } + + if ((one == null) ^ (another == null)) { + return false; + } + + try { + Any oneAny = getAny(one); + return oneAny.getValue().equals(another.getValue()); + } catch (IOException e) { + e.printStackTrace(); + return false; + } + } + + private static Any getAny(Object value) throws IOException { + if (value instanceof byte[]) { + String base64 = OBJECT_MAPPER.writeValueAsString(value); + return Any.newBuilder().setValue(ByteString.copyFrom(base64.getBytes())).build(); + } else if (value instanceof String) { + return Any.newBuilder().setValue(ByteString.copyFrom(((String)value).getBytes())).build(); + } + + throw new IllegalArgumentException("Must be byte[] or String"); + } } - } } From 6a7bef248b0532768abbeaad833ac9e01e66477e Mon Sep 17 00:00:00 2001 From: matej nedic Date: Thu, 16 Mar 2023 20:36:53 +0100 Subject: [PATCH 3/3] fix checkstyle --- .../dapr/actors/runtime/DaprGrpcClient.java | 75 ++++++++++--------- 1 file changed, 38 insertions(+), 37 deletions(-) diff --git a/sdk-actors/src/main/java/io/dapr/actors/runtime/DaprGrpcClient.java b/sdk-actors/src/main/java/io/dapr/actors/runtime/DaprGrpcClient.java index 8d026d568b..3fb190b286 100644 --- a/sdk-actors/src/main/java/io/dapr/actors/runtime/DaprGrpcClient.java +++ b/sdk-actors/src/main/java/io/dapr/actors/runtime/DaprGrpcClient.java @@ -78,14 +78,15 @@ class DaprGrpcClient implements DaprClient { */ @Override public Mono getState(String actorType, String actorId, String keyName) { - DaprProtos.GetActorStateRequest req = - DaprProtos.GetActorStateRequest.newBuilder() - .setActorType(actorType) - .setActorId(actorId) - .setKey(keyName) - .build(); - - return Mono.create(it -> client.getActorState(req, createStreamObserver(it))).map(r -> r.getData().toByteArray()); + DaprProtos.GetActorStateRequest req = + DaprProtos.GetActorStateRequest.newBuilder() + .setActorType(actorType) + .setActorId(actorId) + .setKey(keyName) + .build(); + + return Mono.create(it -> + client.getActorState(req, createStreamObserver(it))).map(r -> r.getData().toByteArray()); } /** @@ -153,7 +154,7 @@ public Mono registerReminder( .setDueTime(DurationUtils.convertDurationToDaprFormat(reminderParams.getDueTime())) .setPeriod(DurationUtils.convertDurationToDaprFormat(reminderParams.getPeriod())) .build(); - return Mono.create(it -> client.registerActorReminder(req, createStreamObserver(it))).then().then(); + return Mono.create(it -> client.registerActorReminder(req, createStreamObserver(it))).then().then(); } /** @@ -161,14 +162,14 @@ public Mono registerReminder( */ @Override public Mono unregisterReminder(String actorType, String actorId, String reminderName) { - DaprProtos.UnregisterActorReminderRequest req = - DaprProtos.UnregisterActorReminderRequest.newBuilder() - .setActorType(actorType) - .setActorId(actorId) - .setName(reminderName) - .build(); - - return Mono.create(it -> client.unregisterActorReminder(req, createStreamObserver(it))).then().then(); + DaprProtos.UnregisterActorReminderRequest req = + DaprProtos.UnregisterActorReminderRequest.newBuilder() + .setActorType(actorType) + .setActorId(actorId) + .setName(reminderName) + .build(); + + return Mono.create(it -> client.unregisterActorReminder(req, createStreamObserver(it))).then().then(); } /** @@ -180,18 +181,18 @@ public Mono registerTimer( String actorId, String timerName, ActorTimerParams timerParams) { - DaprProtos.RegisterActorTimerRequest req = - DaprProtos.RegisterActorTimerRequest.newBuilder() - .setActorType(actorType) - .setActorId(actorId) - .setName(timerName) - .setCallback(timerParams.getCallback()) - .setData(ByteString.copyFrom(timerParams.getData())) - .setDueTime(DurationUtils.convertDurationToDaprFormat(timerParams.getDueTime())) - .setPeriod(DurationUtils.convertDurationToDaprFormat(timerParams.getPeriod())) - .build(); - - return Mono.create(it -> client.registerActorTimer(req, createStreamObserver(it))).then().then(); + DaprProtos.RegisterActorTimerRequest req = + DaprProtos.RegisterActorTimerRequest.newBuilder() + .setActorType(actorType) + .setActorId(actorId) + .setName(timerName) + .setCallback(timerParams.getCallback()) + .setData(ByteString.copyFrom(timerParams.getData())) + .setDueTime(DurationUtils.convertDurationToDaprFormat(timerParams.getDueTime())) + .setPeriod(DurationUtils.convertDurationToDaprFormat(timerParams.getPeriod())) + .build(); + + return Mono.create(it -> client.registerActorTimer(req, createStreamObserver(it))).then().then(); } /** @@ -199,14 +200,14 @@ public Mono registerTimer( */ @Override public Mono unregisterTimer(String actorType, String actorId, String timerName) { - DaprProtos.UnregisterActorTimerRequest req = - DaprProtos.UnregisterActorTimerRequest.newBuilder() - .setActorType(actorType) - .setActorId(actorId) - .setName(timerName) - .build(); - - return Mono.create(it -> client.unregisterActorTimer(req, createStreamObserver(it))).then().then(); + DaprProtos.UnregisterActorTimerRequest req = + DaprProtos.UnregisterActorTimerRequest.newBuilder() + .setActorType(actorType) + .setActorId(actorId) + .setName(timerName) + .build(); + + return Mono.create(it -> client.unregisterActorTimer(req, createStreamObserver(it))).then().then(); } private StreamObserver createStreamObserver(MonoSink sink) {