diff --git a/.circleci/config.yml b/.circleci/config.yml index 3a6f6dcb3..e479b18f8 100644 --- a/.circleci/config.yml +++ b/.circleci/config.yml @@ -1,6 +1,21 @@ version: 2.1 # CircleCi Build Config for CloudState commands: + install-java-11: + description: install openjdk-11 + steps: + - run: + name: Install java 11 + command: | + wget https://github.com/AdoptOpenJDK/openjdk11-binaries/releases/download/jdk-11.0.8%2B10/OpenJDK11U-jdk_x64_linux_hotspot_11.0.8_10.tar.gz -O /tmp/openjdk-11.tar.gz + + sudo mkdir -p /usr/lib/jvm + sudo tar xfvz /tmp/openjdk-11.tar.gz --directory /usr/lib/jvm + rm -f /tmp/openjdk-11.tar.gz + + sudo sh -c 'for bin in /usr/lib/jvm/jdk-11.0.8+10/bin/*; do update-alternatives --install /usr/bin/$(basename $bin) $(basename $bin) $bin 100; done' + sudo sh -c 'for bin in /usr/lib/jvm/jdk-11.0.8+10/bin/*; do update-alternatives --set $(basename $bin) $bin; done' + setup_sbt: description: "Set up sbt" parameters: @@ -63,6 +78,7 @@ jobs: description: "Build core native images" steps: - checkout + - install-java-11 - setup_sbt - restore_sbt_cache - run: @@ -83,6 +99,7 @@ jobs: description: "Build postgres native images" steps: - checkout + - install-java-11 - setup_sbt - restore_sbt_cache - run: @@ -103,6 +120,7 @@ jobs: description: "Build cassandra native images" steps: - checkout + - install-java-11 - setup_sbt - restore_sbt_cache - run: @@ -119,7 +137,7 @@ jobs: build-sbt-images: docker: - - image: circleci/openjdk:8 + - image: cimg/openjdk:11.0 description: "Build sbt non-native images" steps: - checkout @@ -179,6 +197,7 @@ jobs: description: "native image tck tests" steps: - checkout + - install-java-11 - setup_sbt - restore_sbt_cache - attach_workspace: diff --git a/build.sbt b/build.sbt index b7daaa7d5..ae68a2a1a 100644 --- a/build.sbt +++ b/build.sbt @@ -601,7 +601,7 @@ lazy val `java-support` = (project in file("java-support")) "com.fasterxml.jackson.core" % "jackson-databind" % JacksonDatabindVersion ), javacOptions in Compile ++= Seq("-encoding", "UTF-8"), - javacOptions in (Compile, compile) ++= Seq("-source", "1.8", "-target", "1.8"), + javacOptions in (Compile, compile) ++= Seq("-source", "11", "-target", "11"), akkaGrpcGeneratedSources in Compile := Seq(AkkaGrpc.Server), akkaGrpcGeneratedLanguages in Compile := Seq(AkkaGrpc.Scala), // FIXME should be Java, but here be dragons PB.protoSources in Compile ++= { @@ -630,7 +630,7 @@ lazy val `java-support-docs` = (project in file("java-support/docs")) Test / PB.protoSources += (baseDirectory in ThisBuild).value / "protocols" / "frontend", Test / PB.protoSources += sourceDirectory.value / "modules" / "java" / "examples" / "proto", Test / PB.targets := Seq(PB.gens.java -> (Test / sourceManaged).value), - Compile / javacOptions ++= Seq("-encoding", "UTF-8", "-source", "1.8", "-target", "1.8") + Compile / javacOptions ++= Seq("-encoding", "UTF-8", "-source", "11", "-target", "11") ) lazy val `java-support-tck` = (project in file("java-support/tck")) @@ -643,7 +643,7 @@ lazy val `java-support-tck` = (project in file("java-support/tck")) akkaGrpcGeneratedLanguages := Seq(AkkaGrpc.Java), PB.protoSources in Compile += (baseDirectory in ThisBuild).value / "protocols" / "tck", PB.targets in Compile := Seq(PB.gens.java -> (sourceManaged in Compile).value), - javacOptions in Compile ++= Seq("-encoding", "UTF-8", "-source", "1.8", "-target", "1.8"), + javacOptions in Compile ++= Seq("-encoding", "UTF-8", "-source", "11", "-target", "11"), assemblySettings("cloudstate-java-tck.jar") ) @@ -663,7 +663,7 @@ lazy val `java-shopping-cart` = (project in file("samples/java-shopping-cart")) PB.targets in Compile := Seq( PB.gens.java -> (sourceManaged in Compile).value ), - javacOptions in Compile ++= Seq("-encoding", "UTF-8", "-source", "1.8", "-target", "1.8"), + javacOptions in Compile ++= Seq("-encoding", "UTF-8", "-source", "11", "-target", "11"), assemblySettings("java-shopping-cart.jar") ) @@ -683,7 +683,7 @@ lazy val `java-pingpong` = (project in file("samples/java-pingpong")) PB.targets in Compile := Seq( PB.gens.java -> (sourceManaged in Compile).value ), - javacOptions in Compile ++= Seq("-encoding", "UTF-8", "-source", "1.8", "-target", "1.8"), + javacOptions in Compile ++= Seq("-encoding", "UTF-8", "-source", "11", "-target", "11"), assemblySettings("java-pingpong.jar") ) diff --git a/java-support/docs/src/modules/java/pages/getting-started.adoc b/java-support/docs/src/modules/java/pages/getting-started.adoc index ce977fcac..624efc3c2 100644 --- a/java-support/docs/src/modules/java/pages/getting-started.adoc +++ b/java-support/docs/src/modules/java/pages/getting-started.adoc @@ -211,4 +211,12 @@ Exactly which context parameters are available depend on the type of entity and | link:{attachmentsdir}/api/io/cloudstate/javasupport/EntityId.html[`@EntityId`] | The ID of the entity. +| link:{attachmentsdir}/api/io/cloudstate/javasupport/Metadata.html[`Metadata`] +| +| The metadata associated with the command. + +| link:{attachmentsdir}/api/io/cloudstate/javasupport/CloudEvent.html[`CloudEvent`] +| +| The CloudEvent metadata associated with the command. May be wrapped in `java.util.Optional`. + |==== diff --git a/java-support/src/main/java/io/cloudstate/javasupport/CloudEvent.java b/java-support/src/main/java/io/cloudstate/javasupport/CloudEvent.java new file mode 100644 index 000000000..5e0c5b836 --- /dev/null +++ b/java-support/src/main/java/io/cloudstate/javasupport/CloudEvent.java @@ -0,0 +1,187 @@ +/* + * Copyright 2019 Lightbend Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.cloudstate.javasupport; + +import java.net.URI; +import java.time.ZonedDateTime; +import java.util.Optional; + +/** CloudEvent representation of Metadata. */ +public interface CloudEvent { + + /** + * The CloudEvent spec version. + * + * @return The spec version. + */ + String specversion(); + + /** + * The id of this CloudEvent. + * + * @return The id. + */ + String id(); + + /** + * Return a new CloudEvent with the given id. + * + * @param id The id to set. + * @return A copy of this CloudEvent. + */ + CloudEvent withId(String id); + + /** + * The source of this CloudEvent. + * + * @return The source. + */ + URI source(); + + /** + * Return a new CloudEvent with the given source. + * + * @param source The source to set. + * @return A copy of this CloudEvent. + */ + CloudEvent withSource(URI source); + + /** + * The type of this CloudEvent. + * + * @return The type. + */ + String type(); + + /** + * Return a new CloudEvent with the given type. + * + * @param type The type to set. + * @return A copy of this CloudEvent. + */ + CloudEvent withType(String type); + + /** + * The data content type of this CloudEvent. + * + * @return The data content type, if set. + */ + Optional datacontenttype(); + + /** + * Return a new CloudEvent with the given data content type. + * + * @param datacontenttype The data content type to set. + * @return A copy of this CloudEvent. + */ + CloudEvent withDatacontenttype(String datacontenttype); + + /** + * Clear the data content type of this CloudEvent, if set. + * + * @return A copy of this CloudEvent. + */ + CloudEvent clearDatacontenttype(); + + /** + * The data schema of this CloudEvent. + * + * @return The data schema, if set. + */ + Optional dataschema(); + + /** + * Return a new CloudEvent with the given data schema. + * + * @param dataschema The data schema to set. + * @return A copy of this CloudEvent. + */ + CloudEvent withDataschema(URI dataschema); + + /** + * Clear the data schema of this CloudEvent, if set. + * + * @return A copy of this CloudEvent. + */ + CloudEvent clearDataschema(); + + /** + * The subject of this CloudEvent. + * + * @return The subject, if set. + */ + Optional subject(); + + /** + * Return a new CloudEvent with the given subject. + * + * @param subject The subject to set. + * @return A copy of this CloudEvent. + */ + CloudEvent withSubject(String subject); + + /** + * Clear the subject of this CloudEvent, if set. + * + * @return A copy of this CloudEvent. + */ + CloudEvent clearSubject(); + + /** + * The time of this CloudEvent. + * + * @return The time, if set. + */ + Optional time(); + + /** + * Return a new CloudEvent with the given time. + * + * @param time The time to set. + * @return A copy of this CloudEvent. + */ + CloudEvent withTime(ZonedDateTime time); + + /** + * Clear the time of this CloudEvent, if set. + * + * @return A copy of this CloudEvent. + */ + CloudEvent clearTime(); + + /** + * Return this CloudEvent represented as Metadata. + * + *

If this CloudEvent was created by {{@link Metadata#asCloudEvent()}}, then any non CloudEvent + * metadata that was present will still be present. + * + * @return This CloudEvent expressed as Cloudstate metadata. + */ + Metadata asMetadata(); + + /** + * Create a CloudEvent. + * + * @param id The id of the CloudEvent. + * @param source The source of the CloudEvent. + * @param type The type of the CloudEvent. + * @return The newly created CloudEvent. + */ + static CloudEvent of(String id, URI source, String type) { + return Metadata.EMPTY.asCloudEvent(id, source, type); + } +} diff --git a/java-support/src/main/java/io/cloudstate/javasupport/CloudState.java b/java-support/src/main/java/io/cloudstate/javasupport/CloudState.java index 98de79740..edb6a5501 100644 --- a/java-support/src/main/java/io/cloudstate/javasupport/CloudState.java +++ b/java-support/src/main/java/io/cloudstate/javasupport/CloudState.java @@ -16,13 +16,19 @@ package io.cloudstate.javasupport; +import akka.actor.ActorSystem; +import akka.stream.Materializer; import com.typesafe.config.Config; import com.google.protobuf.Descriptors; +import io.cloudstate.javasupport.action.Action; +import io.cloudstate.javasupport.action.ActionHandler; import io.cloudstate.javasupport.crdt.CrdtEntity; import io.cloudstate.javasupport.crdt.CrdtEntityFactory; import io.cloudstate.javasupport.eventsourced.EventSourcedEntity; import io.cloudstate.javasupport.eventsourced.EventSourcedEntityFactory; import io.cloudstate.javasupport.impl.AnySupport; +import io.cloudstate.javasupport.impl.action.AnnotationBasedActionSupport; +import io.cloudstate.javasupport.impl.action.ActionService; import io.cloudstate.javasupport.impl.crdt.AnnotationBasedCrdtSupport; import io.cloudstate.javasupport.impl.crdt.CrdtStatefulService; import io.cloudstate.javasupport.impl.eventsourced.AnnotationBasedEventSourcedSupport; @@ -32,13 +38,14 @@ import java.util.concurrent.CompletionStage; import java.util.HashMap; import java.util.Map; +import java.util.function.Function; /** * The CloudState class is the main interface to configuring entities to deploy, and subsequently * starting a local server which will expose these entities to the CloudState Proxy Sidecar. */ public final class CloudState { - private final Map services = new HashMap<>(); + private final Map> services = new HashMap<>(); private ClassLoader classLoader = getClass().getClassLoader(); private String typeUrlPrefix = AnySupport.DefaultTypeUrlPrefix(); private AnySupport.Prefer prefer = AnySupport.PREFER_JAVA(); @@ -124,14 +131,15 @@ public CloudState registerEventSourcedEntity( final AnySupport anySupport = newAnySupport(additionalDescriptors); - services.put( - descriptor.getFullName(), + EventSourcedStatefulService service = new EventSourcedStatefulService( new AnnotationBasedEventSourcedSupport(entityClass, anySupport, descriptor), descriptor, anySupport, persistenceId, - snapshotEvery)); + snapshotEvery); + + services.put(descriptor.getFullName(), system -> service); return this; } @@ -160,12 +168,13 @@ public CloudState registerEventSourcedEntity( Descriptors.FileDescriptor... additionalDescriptors) { services.put( descriptor.getFullName(), - new EventSourcedStatefulService( - factory, - descriptor, - newAnySupport(additionalDescriptors), - persistenceId, - snapshotEvery)); + system -> + new EventSourcedStatefulService( + factory, + descriptor, + newAnySupport(additionalDescriptors), + persistenceId, + snapshotEvery)); return this; } @@ -194,18 +203,19 @@ public CloudState registerCrdtEntity( final AnySupport anySupport = newAnySupport(additionalDescriptors); - services.put( - descriptor.getFullName(), + CrdtStatefulService service = new CrdtStatefulService( new AnnotationBasedCrdtSupport(entityClass, anySupport, descriptor), descriptor, - anySupport)); + anySupport); + + services.put(descriptor.getFullName(), system -> service); return this; } /** - * Register an CRDt entity factory. + * Register a CRDT entity factory. * *

This is a low level API intended for custom (eg, non reflection based) mechanisms for * implementing the entity. @@ -222,7 +232,76 @@ public CloudState registerCrdtEntity( Descriptors.FileDescriptor... additionalDescriptors) { services.put( descriptor.getFullName(), - new CrdtStatefulService(factory, descriptor, newAnySupport(additionalDescriptors))); + system -> + new CrdtStatefulService(factory, descriptor, newAnySupport(additionalDescriptors))); + + return this; + } + + /** + * Register an annotated Action service. + * + *

The action class must be annotated with {@link Action}. + * + * @param action The action object. + * @param descriptor The descriptor for the service that this action implements. + * @param additionalDescriptors Any additional descriptors that should be used to look up protobuf + * types when needed. + * @return This Cloudstate builder. + */ + public CloudState registerAction( + Object action, + Descriptors.ServiceDescriptor descriptor, + Descriptors.FileDescriptor... additionalDescriptors) { + + Action actionAnnotation = action.getClass().getAnnotation(Action.class); + if (actionAnnotation == null) { + throw new IllegalArgumentException( + action.getClass() + " does not declare an " + Action.class + " annotation!"); + } + + final AnySupport anySupport = newAnySupport(additionalDescriptors); + + services.put( + descriptor.getFullName(), + system -> + new ActionService( + new AnnotationBasedActionSupport( + action, anySupport, descriptor, Materializer.createMaterializer(system)), + descriptor, + anySupport)); + + return this; + } + + /** + * Register an Action handler. + * + *

This is a low level API intended for custom (eg, non reflection based) mechanisms for + * implementing the action. + * + * @param action The action handler. + * @param descriptor The descriptor for the service that this action implements. + * @param additionalDescriptors Any additional descriptors that should be used to look up protobuf + * types when needed. + * @return This Cloudstate builder. + */ + public CloudState registerAction( + ActionHandler action, + Descriptors.ServiceDescriptor descriptor, + Descriptors.FileDescriptor... additionalDescriptors) { + + Action actionAnnotation = action.getClass().getAnnotation(Action.class); + if (actionAnnotation == null) { + throw new IllegalArgumentException( + action.getClass() + " does not declare an " + Action.class + " annotation!"); + } + + final AnySupport anySupport = newAnySupport(additionalDescriptors); + + ActionService service = new ActionService(action, descriptor, anySupport); + + services.put(descriptor.getFullName(), system -> service); return this; } diff --git a/java-support/src/main/java/io/cloudstate/javasupport/Metadata.java b/java-support/src/main/java/io/cloudstate/javasupport/Metadata.java new file mode 100644 index 000000000..46f19d785 --- /dev/null +++ b/java-support/src/main/java/io/cloudstate/javasupport/Metadata.java @@ -0,0 +1,270 @@ +/* + * Copyright 2019 Lightbend Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.cloudstate.javasupport; + +import io.cloudstate.javasupport.impl.MetadataImpl; + +import java.net.URI; +import java.nio.ByteBuffer; +import java.util.List; +import java.util.Optional; + +/** + * Transport specific metadata. + * + *

The exact semantics of how metadata is handled depends on the underlying transport. This API + * exposes case insensitive lookups on metadata, but maintains the original case of the keys as + * received or inserted. If case matters, the iterator should be used to access elements. + * + *

Multiple values are also supported per key, if the underlying transport does not support + * multiple values per key, which value will be used is undefined. + * + *

Metadata can either have a string or a binary value. If the underlying transport doesn't + * support one or the other, how those values are handled is undefined - eg, text values may be + * UTF-8 encoded in binary, or binary values may be Base64 encoded, it depends on the transport. + * + *

This API maintains the order of entries, but the underlying transport may not. + * + *

Implementations of this class should be immutable, all update operations should return a copy + * of the metadata. + */ +public interface Metadata extends Iterable { + + /** + * Get the string value for the given key, if found. + * + *

If the entry is a binary entry, nothing will be returned. + * + *

The key lookup is case insensitive. If multiple entries with the same key are present, the + * first string entry will be returned. + * + * @param key The key to lookup. + * @return The value, if found. + */ + Optional get(String key); + + /** + * Get all the string values for a given key. + * + *

Binary values will be ignored. The key lookup is case insensitive. + * + * @param key The key to lookup. + * @return A list of all the string values for the given key. + */ + List getAll(String key); + + /** + * Get the binary value for the given key, if found. + * + *

If the entry is a string entry, nothing will be returned. + * + *

The key lookup is case insensitive. If multiple entries with the same key are present, the + * first binary entry will be returned. + * + * @param key The key to lookup. + * @return The value, if found. + */ + Optional getBinary(String key); + + /** + * Get all the binary values for a given key. + * + *

String values will be ignored. The key lookup is case insensitive. + * + * @param key The key to lookup. + * @return A list of all the binary values for the given key. + */ + List getBinaryAll(String key); + + /** + * Check whether this metadata has a entry for the given key. + * + *

The key lookup will be case insensitive. + * + * @param key The key to lookup. + * @return True if an entry for the given key exists, otherwise false. + */ + boolean has(String key); + + /** + * Get all the keys for all the entries. + * + *

This list may contain duplicate keys if there are multiple entries with the same key. + * + *

The case of the keys will be the case as passed from the proxy or from other APIs. + * + * @return A list of all the keys in this metadata. + */ + List getAllKeys(); + + /** + * Set the string value for the given key. + * + *

This will replace any existing entries that case insensitively match the given key. + * + *

This method does not modify this Metadata object, it returns a copy of this Metadata object + * with the entry set. + * + * @param key The key to set. + * @param value The value to set. + * @return A copy of this Metadata object with the entry set. + */ + Metadata set(String key, String value); + + /** + * Set the binary value for the given key. + * + *

This will replace any existing entries that case insensitively match the given key. + * + *

This method does not modify this Metadata object, it returns a copy of this Metadata object + * with the entry set. + * + * @param key The key to set. + * @param value The value to set. + * @return A copy of this Metadata object with the entry set. + */ + Metadata setBinary(String key, ByteBuffer value); + + /** + * Add the string value for the given key. + * + *

This will not replace any existing entries, it will simply append the entry to the end of + * the list. + * + *

This method does not modify this Metadata object, it returns a copy of this Metadata object + * with the entry added. + * + * @param key The key to add. + * @param value The value to add. + * @return A copy of this Metadata object with the entry added. + */ + Metadata add(String key, String value); + + /** + * Add the binary value for the given key. + * + *

This will not replace any existing entries, it will simply append the entry to the end of + * the list. + * + *

This method does not modify this Metadata object, it returns a copy of this Metadata object + * with the entry added. + * + * @param key The key to add. + * @param value The value to add. + * @return A copy of this Metadata object with the entry added. + */ + Metadata addBinary(String key, ByteBuffer value); + + /** + * Remove all metadata entries with the given key. + * + *

The key will be matched against entries case insensitively. + * + *

This method does not modify this Metadata object, it returns a copy of this Metadata object + * with the entries removed. + * + * @param key The key to remove. + * @return A copy of this Metadata object with the entries removed. + */ + Metadata remove(String key); + + /** + * Clear all metadata entries. + * + *

This method does not modify this Metadata object, it returns an empty Metadata object. + * + * @return An empty metadata object. + */ + Metadata clear(); + + /** + * Whether this metadata is also a CloudEvent. + * + *

This will return true if all of the required CloudEvent fields are set, that is, the + * specversion, id, source and type fields. + * + * @return True if the CloudEvent required attributes are set in this Metadata. + */ + boolean isCloudEvent(); + + /** + * Return a CloudEvent representation of this Metadata. + * + *

Note that the CloudEvent representation will retain any non CloudEvent metadata when + * converted back to Metadata. + * + * @return This Metadata expressed as CloudEvent metadata. + * @throws IllegalStateException If this metadata is not a CloudEvent, that is, if it doesn't have + * any of specversion, id, source or type CloudEvent fields defined. + */ + CloudEvent asCloudEvent(); + + /** + * Convert this metadata to a CloudEvent, adding the given required CloudEvent fields. + * + *

Any metadata in this Metadata object will be left intact when asMetadata is called + * + * @param id The id of the CloudEvent. + * @param source The source of the CloudEvent. + * @param type The type of the CloudEvent. + * @return This metadata, represented as a CloudEvent with the specified fields. + */ + CloudEvent asCloudEvent(String id, URI source, String type); + + /** A metadata entry. */ + interface MetadataEntry { + /** + * The key for the metadata entry. + * + *

The key will be in the original case it was inserted or sent as. + * + * @return The key. + */ + String getKey(); + + /** + * The string value for the metadata entry. + * + * @return The string value, or null if this entry is not a string Metadata entry. + */ + String getValue(); + + /** + * The binary value for the metadata entry. + * + * @return The binary value, or null if this entry is not a string Metadata entry. + */ + ByteBuffer getBinaryValue(); + + /** + * Whether this entry is a text entry. + * + * @return True if this entry is a text entry. + */ + boolean isText(); + + /** + * Whether this entry is a binary entry. + * + * @return True if this entry is a binary entry. + */ + boolean isBinary(); + } + + /** An empty Metadata object. */ + Metadata EMPTY = MetadataImpl.Empty(); +} diff --git a/java-support/src/main/java/io/cloudstate/javasupport/MetadataContext.java b/java-support/src/main/java/io/cloudstate/javasupport/MetadataContext.java new file mode 100644 index 000000000..8af8a9e0c --- /dev/null +++ b/java-support/src/main/java/io/cloudstate/javasupport/MetadataContext.java @@ -0,0 +1,23 @@ +/* + * Copyright 2019 Lightbend Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.cloudstate.javasupport; + +/** Context that provides access to metadata. */ +public interface MetadataContext extends Context { + /** Get the metadata associated with this context. */ + Metadata metadata(); +} diff --git a/java-support/src/main/java/io/cloudstate/javasupport/ServiceCall.java b/java-support/src/main/java/io/cloudstate/javasupport/ServiceCall.java index 7f2cbd2e6..43988b0d8 100644 --- a/java-support/src/main/java/io/cloudstate/javasupport/ServiceCall.java +++ b/java-support/src/main/java/io/cloudstate/javasupport/ServiceCall.java @@ -35,4 +35,11 @@ public interface ServiceCall { * @return The message to pass to the call, serialized as an {@link Any}. */ Any message(); + + /** + * The metadata to pass with the message when the call is invoked. + * + * @return The metadata. + */ + Metadata metadata(); } diff --git a/java-support/src/main/java/io/cloudstate/javasupport/ServiceCallRef.java b/java-support/src/main/java/io/cloudstate/javasupport/ServiceCallRef.java index da270b2bf..94ccdee81 100644 --- a/java-support/src/main/java/io/cloudstate/javasupport/ServiceCallRef.java +++ b/java-support/src/main/java/io/cloudstate/javasupport/ServiceCallRef.java @@ -38,5 +38,17 @@ public interface ServiceCallRef { * @param message The message to pass to the method. * @return A service call that can be used as a forward or effect. */ - ServiceCall createCall(T message); + default ServiceCall createCall(T message) { + return createCall(message, Metadata.EMPTY); + } + + /** + * Create a call from this reference, using the given message as the message to pass to it when + * it's invoked. + * + * @param message The message to pass to the method. + * @param metadata The Metadata to send. + * @return A service call that can be used as a forward or effect. + */ + ServiceCall createCall(T message, Metadata metadata); } diff --git a/java-support/src/main/java/io/cloudstate/javasupport/action/Action.java b/java-support/src/main/java/io/cloudstate/javasupport/action/Action.java new file mode 100644 index 000000000..16f4fd57a --- /dev/null +++ b/java-support/src/main/java/io/cloudstate/javasupport/action/Action.java @@ -0,0 +1,30 @@ +/* + * Copyright 2019 Lightbend Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.cloudstate.javasupport.action; + +import io.cloudstate.javasupport.impl.CloudStateAnnotation; + +import java.lang.annotation.ElementType; +import java.lang.annotation.Retention; +import java.lang.annotation.RetentionPolicy; +import java.lang.annotation.Target; + +/** An action. */ +@CloudStateAnnotation +@Target(ElementType.TYPE) +@Retention(RetentionPolicy.RUNTIME) +public @interface Action {} diff --git a/java-support/src/main/java/io/cloudstate/javasupport/action/ActionContext.java b/java-support/src/main/java/io/cloudstate/javasupport/action/ActionContext.java new file mode 100644 index 000000000..8eec40dd4 --- /dev/null +++ b/java-support/src/main/java/io/cloudstate/javasupport/action/ActionContext.java @@ -0,0 +1,35 @@ +/* + * Copyright 2019 Lightbend Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.cloudstate.javasupport.action; + +import io.cloudstate.javasupport.Metadata; +import io.cloudstate.javasupport.MetadataContext; + +/** Context for action calls. */ +public interface ActionContext extends MetadataContext { + /** + * Get the metadata associated with this call. + * + *

Note, this only returns call level associated metadata. For unary in calls, this will be the + * same as the message metadata, but for streamed calls, it will contain metadata associated with + * the whole stream, so for example if this was a gRPC call, it will contain the HTTP headers for + * that gRPC call. + * + * @return The call level metadata. + */ + Metadata metadata(); +} diff --git a/java-support/src/main/java/io/cloudstate/javasupport/action/ActionHandler.java b/java-support/src/main/java/io/cloudstate/javasupport/action/ActionHandler.java new file mode 100644 index 000000000..75bf9ae04 --- /dev/null +++ b/java-support/src/main/java/io/cloudstate/javasupport/action/ActionHandler.java @@ -0,0 +1,71 @@ +/* + * Copyright 2019 Lightbend Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.cloudstate.javasupport.action; + +import akka.NotUsed; +import akka.stream.javadsl.Source; +import com.google.protobuf.Any; + +import java.util.concurrent.CompletionStage; + +/** Low level interface for handling for action calls. */ +public interface ActionHandler { + + /** + * Handle a unary call. + * + * @param commandName The name of the command this call is for. + * @param message The message envelope of the message. + * @param context The action context. + * @return A future of the message to return. + */ + CompletionStage> handleUnary( + String commandName, MessageEnvelope message, ActionContext context); + + /** + * Handle a streamed out call call. + * + * @param commandName The name of the command this call is for. + * @param message The message envelope of the message. + * @param context The action context. + * @return The stream of messages to return. + */ + Source, NotUsed> handleStreamedOut( + String commandName, MessageEnvelope message, ActionContext context); + + /** + * Handle a streamed in call. + * + * @param commandName The name of the command this call is for. + * @param stream The stream of messages to handle. + * @param context The action context. + * @return A future of the message to return. + */ + CompletionStage> handleStreamedIn( + String commandName, Source, NotUsed> stream, ActionContext context); + + /** + * Handle a full duplex streamed in call. + * + * @param commandName The name of the command this call is for. + * @param stream The stream of messages to handle. + * @param context The action context. + * @return The stream of messages to return. + */ + Source, NotUsed> handleStreamed( + String commandName, Source, NotUsed> stream, ActionContext context); +} diff --git a/java-support/src/main/java/io/cloudstate/javasupport/action/ActionReply.java b/java-support/src/main/java/io/cloudstate/javasupport/action/ActionReply.java new file mode 100644 index 000000000..30c06eb76 --- /dev/null +++ b/java-support/src/main/java/io/cloudstate/javasupport/action/ActionReply.java @@ -0,0 +1,91 @@ +/* + * Copyright 2019 Lightbend Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.cloudstate.javasupport.action; + +import io.cloudstate.javasupport.Metadata; +import io.cloudstate.javasupport.ServiceCall; +import io.cloudstate.javasupport.impl.action.ForwardReplyImpl; +import io.cloudstate.javasupport.impl.action.MessageReplyImpl; +import io.cloudstate.javasupport.impl.action.NoReply; + +import java.util.Collection; + +/** + * An action reply. + * + *

Action replies allow returning forwards and attaching effects to messages. + * + * @param The type of the message that must be returned by this action call. + */ +public interface ActionReply { + /** + * The effects attached to this reply. + * + * @return The effects. + */ + Collection effects(); + + /** + * Attach the given effects to this reply. + * + * @param effects The effects to attach. + * @return A new reply with the attached effects. + */ + ActionReply withEffects(Effect... effects); + + /** + * Create a message reply. + * + * @param payload The payload of the reply. + * @return A message reply. + */ + static MessageReply message(T payload) { + return message(payload, Metadata.EMPTY); + } + + /** + * Create a message reply. + * + * @param payload The payload of the reply. + * @param metadata The metadata for the message. + * @return A message reply. + */ + static MessageReply message(T payload, Metadata metadata) { + return new MessageReplyImpl<>(payload, metadata); + } + + /** + * Create a forward reply. + * + * @param serviceCall The service call representing the forward. + * @return A forward reply. + */ + static ForwardReply forward(ServiceCall serviceCall) { + return new ForwardReplyImpl<>(serviceCall); + } + + /** + * Create a reply that contains neither a message nor a forward. + * + *

This may be useful for emitting effects without sending a message. + * + * @return The reply. + */ + static ActionReply noReply() { + return NoReply.apply(); + } +} diff --git a/java-support/src/main/java/io/cloudstate/javasupport/action/CallHandler.java b/java-support/src/main/java/io/cloudstate/javasupport/action/CallHandler.java new file mode 100644 index 000000000..13d86339f --- /dev/null +++ b/java-support/src/main/java/io/cloudstate/javasupport/action/CallHandler.java @@ -0,0 +1,65 @@ +/* + * Copyright 2019 Lightbend Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.cloudstate.javasupport.action; + +import io.cloudstate.javasupport.impl.CloudStateAnnotation; + +import java.lang.annotation.ElementType; +import java.lang.annotation.Retention; +import java.lang.annotation.RetentionPolicy; +import java.lang.annotation.Target; + +/** + * An action call handler. + * + *

This annotation should be placed on methods that handle Action service calls. + * + *

The types of the input and output parameters for these methods depend on whether the call is a + * unary or streamed call. + * + *

Calls with a unary in argument may accept the protobuf type of the call, either bare, or + * wrapped in {@link MessageEnvelope}. + * + *

Calls with a streamed in argument may accept either a {@link akka.stream.javadsl.Source}, + * {@link org.reactivestreams.Publisher} or a {@link java.util.concurrent.Flow.Publisher}. The + * element type may either be the bare protobuf type of the call, or that type wrapped in {@link + * MessageEnvelope}. + * + *

Calls with a unary out argument may either return synchronously, or return a {@link + * java.util.concurrent.CompletionStage}. The argument return type may either be the raw protobuf + * output type of the call, or wrapped in {@link MessageEnvelope} or {@link ActionReply}. + * + *

Calls with a streamed out argument may either return a {@link akka.stream.javadsl.Source}, + * {@link org.reactivestreams.Publisher} or a {@link java.util.concurrent.Flow.Publisher}. The + * element type of these may either be the raw protobuf output type of the call, or wrapped in + * {@link MessageEnvelope} or {@link ActionReply}. + */ +@CloudStateAnnotation +@Target(ElementType.METHOD) +@Retention(RetentionPolicy.RUNTIME) +public @interface CallHandler { + + /** + * The name of the command to handle. + * + *

If not specified, the name of the method will be used as the command name, with the first + * letter capitalized to match the gRPC convention of capitalizing rpc method names. + * + * @return The command name. + */ + String name() default ""; +} diff --git a/java-support/src/main/java/io/cloudstate/javasupport/action/Effect.java b/java-support/src/main/java/io/cloudstate/javasupport/action/Effect.java new file mode 100644 index 000000000..ee00b9720 --- /dev/null +++ b/java-support/src/main/java/io/cloudstate/javasupport/action/Effect.java @@ -0,0 +1,51 @@ +/* + * Copyright 2019 Lightbend Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.cloudstate.javasupport.action; + +import io.cloudstate.javasupport.ServiceCall; +import io.cloudstate.javasupport.impl.action.EffectImpl; + +/** An effect. */ +public interface Effect { + + /** The service call that is executed as this effect. */ + ServiceCall serviceCall(); + + /** Whether this effect should be executed synchronously or not. */ + boolean synchronous(); + + /** + * Create an effect of the given service call. + * + * @param serviceCall The service call to effect. + * @param synchronous Whether this effect should be executed synchronously. + * @return The effect. + */ + static Effect of(ServiceCall serviceCall, boolean synchronous) { + return new EffectImpl(serviceCall, synchronous); + } + + /** + * Create an effect of the given service call. + * + * @param serviceCall The service call to effect. + * @return The effect. + */ + static Effect of(ServiceCall serviceCall) { + return new EffectImpl(serviceCall, false); + } +} diff --git a/java-support/src/main/java/io/cloudstate/javasupport/action/ForwardReply.java b/java-support/src/main/java/io/cloudstate/javasupport/action/ForwardReply.java new file mode 100644 index 000000000..2aea4deca --- /dev/null +++ b/java-support/src/main/java/io/cloudstate/javasupport/action/ForwardReply.java @@ -0,0 +1,32 @@ +/* + * Copyright 2019 Lightbend Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.cloudstate.javasupport.action; + +import io.cloudstate.javasupport.ServiceCall; + +/** A forward reply. */ +public interface ForwardReply extends ActionReply { + + /** + * The service call that is being forwarded to. + * + * @return The service call. + */ + ServiceCall serviceCall(); + + ForwardReply withEffects(Effect... effect); +} diff --git a/java-support/src/main/java/io/cloudstate/javasupport/action/MessageEnvelope.java b/java-support/src/main/java/io/cloudstate/javasupport/action/MessageEnvelope.java new file mode 100644 index 000000000..7229f10ac --- /dev/null +++ b/java-support/src/main/java/io/cloudstate/javasupport/action/MessageEnvelope.java @@ -0,0 +1,58 @@ +/* + * Copyright 2019 Lightbend Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.cloudstate.javasupport.action; + +import io.cloudstate.javasupport.Metadata; +import io.cloudstate.javasupport.impl.action.MessageEnvelopeImpl; + +/** A message envelope. */ +public interface MessageEnvelope { + /** + * The metadata associated with the message. + * + * @return The metadata. + */ + Metadata metadata(); + + /** + * The payload of the message. + * + * @return The payload. + */ + T payload(); + + /** + * Create a message. + * + * @param payload The payload of the message. + * @return The message. + */ + static MessageEnvelope of(T payload) { + return new MessageEnvelopeImpl<>(payload, Metadata.EMPTY); + } + + /** + * Create a message. + * + * @param payload The payload of the message. + * @param metadata The metadata associated with the message. + * @return The message. + */ + static MessageEnvelope of(T payload, Metadata metadata) { + return new MessageEnvelopeImpl<>(payload, metadata); + } +} diff --git a/java-support/src/main/java/io/cloudstate/javasupport/action/MessageReply.java b/java-support/src/main/java/io/cloudstate/javasupport/action/MessageReply.java new file mode 100644 index 000000000..04a23163d --- /dev/null +++ b/java-support/src/main/java/io/cloudstate/javasupport/action/MessageReply.java @@ -0,0 +1,39 @@ +/* + * Copyright 2019 Lightbend Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.cloudstate.javasupport.action; + +import io.cloudstate.javasupport.Metadata; + +/** A message reply. */ +public interface MessageReply extends ActionReply { + + /** + * The payload of the message reply. + * + * @return The payload. + */ + T payload(); + + /** + * The metadata associated with the message. + * + * @return The metadata. + */ + Metadata metadata(); + + MessageReply withEffects(Effect... effect); +} diff --git a/java-support/src/main/java/io/cloudstate/javasupport/crdt/CommandContext.java b/java-support/src/main/java/io/cloudstate/javasupport/crdt/CommandContext.java index 5eb1c41f9..d47515ab2 100644 --- a/java-support/src/main/java/io/cloudstate/javasupport/crdt/CommandContext.java +++ b/java-support/src/main/java/io/cloudstate/javasupport/crdt/CommandContext.java @@ -18,8 +18,7 @@ import io.cloudstate.javasupport.ClientActionContext; import io.cloudstate.javasupport.EffectContext; - -import java.util.Optional; +import io.cloudstate.javasupport.MetadataContext; /** * Context for handling a command. @@ -27,7 +26,7 @@ *

This may be passed to any {@link CommandHandler} annotated element. */ public interface CommandContext - extends CrdtContext, CrdtFactory, EffectContext, ClientActionContext { + extends CrdtContext, CrdtFactory, EffectContext, ClientActionContext, MetadataContext { /** * The id of the command. This is an internal ID generated by the proxy, and is unique to a given * entity stream. It may be used for debugging purposes. diff --git a/java-support/src/main/java/io/cloudstate/javasupport/crdt/StreamCancelledContext.java b/java-support/src/main/java/io/cloudstate/javasupport/crdt/StreamCancelledContext.java index 7b5aa3f05..be860a168 100644 --- a/java-support/src/main/java/io/cloudstate/javasupport/crdt/StreamCancelledContext.java +++ b/java-support/src/main/java/io/cloudstate/javasupport/crdt/StreamCancelledContext.java @@ -17,6 +17,7 @@ package io.cloudstate.javasupport.crdt; import io.cloudstate.javasupport.EffectContext; +import io.cloudstate.javasupport.MetadataContext; import java.util.function.Consumer; @@ -25,7 +26,7 @@ * *

This is sent to callbacks registered by {@link StreamedCommandContext#onCancel(Consumer)}. */ -public interface StreamCancelledContext extends CrdtContext, EffectContext { +public interface StreamCancelledContext extends CrdtContext, EffectContext, MetadataContext { /** * The id of the command that the stream was for. * diff --git a/java-support/src/main/java/io/cloudstate/javasupport/crdt/StreamedCommandContext.java b/java-support/src/main/java/io/cloudstate/javasupport/crdt/StreamedCommandContext.java index f212f2bc9..8b731119c 100644 --- a/java-support/src/main/java/io/cloudstate/javasupport/crdt/StreamedCommandContext.java +++ b/java-support/src/main/java/io/cloudstate/javasupport/crdt/StreamedCommandContext.java @@ -16,6 +16,8 @@ package io.cloudstate.javasupport.crdt; +import io.cloudstate.javasupport.MetadataContext; + import java.util.Optional; import java.util.function.Consumer; import java.util.function.Function; @@ -26,7 +28,7 @@ *

This may be passed to any {@link CommandHandler} annotated element that corresponds to a * command whose output is streamed. */ -public interface StreamedCommandContext extends CommandContext { +public interface StreamedCommandContext extends CommandContext, MetadataContext { /** * Whether the call is actually streamed. * diff --git a/java-support/src/main/java/io/cloudstate/javasupport/eventsourced/CommandContext.java b/java-support/src/main/java/io/cloudstate/javasupport/eventsourced/CommandContext.java index 7c1b1b44a..da35ce0eb 100644 --- a/java-support/src/main/java/io/cloudstate/javasupport/eventsourced/CommandContext.java +++ b/java-support/src/main/java/io/cloudstate/javasupport/eventsourced/CommandContext.java @@ -18,6 +18,7 @@ import io.cloudstate.javasupport.ClientActionContext; import io.cloudstate.javasupport.EffectContext; +import io.cloudstate.javasupport.MetadataContext; /** * An event sourced command context. @@ -26,7 +27,8 @@ * new events in response to a command, along with forwarding the result to other entities, and * performing side effects on other entities. */ -public interface CommandContext extends EventSourcedContext, ClientActionContext, EffectContext { +public interface CommandContext + extends EventSourcedContext, ClientActionContext, EffectContext, MetadataContext { /** * The current sequence number of events in this entity. * diff --git a/java-support/src/main/scala/io/cloudstate/javasupport/CloudStateRunner.scala b/java-support/src/main/scala/io/cloudstate/javasupport/CloudStateRunner.scala index 7705e2d73..3ceb3fb2d 100644 --- a/java-support/src/main/scala/io/cloudstate/javasupport/CloudStateRunner.scala +++ b/java-support/src/main/scala/io/cloudstate/javasupport/CloudStateRunner.scala @@ -25,12 +25,14 @@ import akka.http.scaladsl._ import akka.http.scaladsl.model._ import akka.stream.{ActorMaterializer, Materializer} import com.google.protobuf.Descriptors +import io.cloudstate.javasupport.impl.action.{ActionService, StatelessFunctionImpl} import io.cloudstate.javasupport.impl.eventsourced.{EventSourcedImpl, EventSourcedStatefulService} import io.cloudstate.javasupport.impl.{EntityDiscoveryImpl, ResolvedServiceCallFactory, ResolvedServiceMethod} import io.cloudstate.javasupport.impl.crdt.{CrdtImpl, CrdtStatefulService} import io.cloudstate.protocol.crdt.CrdtHandler import io.cloudstate.protocol.entity.EntityDiscoveryHandler import io.cloudstate.protocol.event_sourced.EventSourcedHandler +import io.cloudstate.protocol.function.StatelessFunctionHandler import scala.compat.java8.FutureConverters import scala.concurrent.Future @@ -56,20 +58,27 @@ object CloudStateRunner { /** * The CloudStateRunner is responsible for handle the bootstrap of entities, - * and is used by [[io.cloudstate.javasupport.CloudState.start()]] to set up the local + * and is used by [[io.cloudstate.javasupport.CloudState#start()]] to set up the local * server with the given configuration. * - * CloudStateRunner can be seen as a low-level API for cases where [[io.cloudstate.javasupport.CloudState.start()]] isn't enough. + * CloudStateRunner can be seen as a low-level API for cases where [[io.cloudstate.javasupport.CloudState#start()]] isn't enough. */ -final class CloudStateRunner private[this] (_system: ActorSystem, services: Map[String, StatefulService]) { +final class CloudStateRunner private[this] ( + _system: ActorSystem, + serviceFactories: Map[String, java.util.function.Function[ActorSystem, Service]] +) { private[javasupport] implicit final val system = _system private[this] implicit final val materializer: Materializer = ActorMaterializer() private[this] final val configuration = new CloudStateRunner.Configuration(system.settings.config.getConfig("cloudstate")) + private val services = serviceFactories.toSeq.map { + case (serviceName, factory) => serviceName -> factory(system) + }.toMap + // TODO JavaDoc - def this(services: java.util.Map[String, StatefulService]) { + def this(services: java.util.Map[String, java.util.function.Function[ActorSystem, Service]]) { this(ActorSystem("StatefulService", { val conf = ConfigFactory.load() conf.getConfig("cloudstate.system").withFallback(conf) @@ -77,7 +86,7 @@ final class CloudStateRunner private[this] (_system: ActorSystem, services: Map[ } // TODO JavaDoc - def this(services: java.util.Map[String, StatefulService], config: Config) { + def this(services: java.util.Map[String, java.util.function.Function[ActorSystem, Service]], config: Config) { this(ActorSystem("StatefulService", config), services.asScala.toMap) } @@ -100,6 +109,11 @@ final class CloudStateRunner private[this] (_system: ActorSystem, services: Map[ val crdtImpl = new CrdtImpl(system, crdtServices, rootContext) route orElse CrdtHandler.partial(crdtImpl) + case (route, (serviceClass, actionServices: Map[String, ActionService] @unchecked)) + if serviceClass == classOf[ActionService] => + val actionImpl = new StatelessFunctionImpl(system, actionServices, rootContext) + route orElse StatelessFunctionHandler.partial(actionImpl) + case (_, (serviceClass, _)) => sys.error(s"Unknown StatefulService: $serviceClass") } @@ -144,7 +158,7 @@ final class CloudStateRunner private[this] (_system: ActorSystem, services: Map[ * StatefulService describes an entitiy type in a way which makes it possible * to deploy. */ -trait StatefulService { +trait Service { /** * @return a Protobuf ServiceDescriptor of its externally accessible gRPC API diff --git a/java-support/src/main/scala/io/cloudstate/javasupport/impl/EntityDiscoveryImpl.scala b/java-support/src/main/scala/io/cloudstate/javasupport/impl/EntityDiscoveryImpl.scala index a5b84c739..a9437623f 100644 --- a/java-support/src/main/scala/io/cloudstate/javasupport/impl/EntityDiscoveryImpl.scala +++ b/java-support/src/main/scala/io/cloudstate/javasupport/impl/EntityDiscoveryImpl.scala @@ -21,9 +21,9 @@ import io.cloudstate.protocol.entity._ import scala.concurrent.Future import akka.actor.ActorSystem import com.google.protobuf.DescriptorProtos -import io.cloudstate.javasupport.{BuildInfo, StatefulService} +import io.cloudstate.javasupport.{BuildInfo, Service} -class EntityDiscoveryImpl(system: ActorSystem, services: Map[String, StatefulService]) extends EntityDiscovery { +class EntityDiscoveryImpl(system: ActorSystem, services: Map[String, Service]) extends EntityDiscovery { private def configuredOrElse(key: String, default: String): String = if (system.settings.config.hasPath(key)) system.settings.config.getString(key) else default diff --git a/java-support/src/main/scala/io/cloudstate/javasupport/impl/MetadataImpl.scala b/java-support/src/main/scala/io/cloudstate/javasupport/impl/MetadataImpl.scala new file mode 100644 index 000000000..d55a50091 --- /dev/null +++ b/java-support/src/main/scala/io/cloudstate/javasupport/impl/MetadataImpl.scala @@ -0,0 +1,190 @@ +/* + * Copyright 2019 Lightbend Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.cloudstate.javasupport.impl + +import java.net.URI +import java.nio.ByteBuffer +import java.time.ZonedDateTime +import java.time.format.DateTimeFormatter +import java.util +import java.util.{Objects, Optional} + +import com.google.protobuf.ByteString +import io.cloudstate.javasupport.{CloudEvent, Metadata} +import io.cloudstate.protocol.entity.MetadataEntry + +import scala.collection.immutable +import scala.compat.java8.OptionConverters._ +import scala.collection.JavaConverters._ + +private[impl] class MetadataImpl(val entries: immutable.Seq[MetadataEntry]) extends Metadata with CloudEvent { + + override def has(key: String): Boolean = entries.exists(_.key.equalsIgnoreCase(key)) + + override def get(key: String): Optional[String] = + entries.collectFirst { + case MetadataEntry(k, MetadataEntry.Value.StringValue(value), _) if key.equalsIgnoreCase(k) => + value + }.asJava + + override def getAll(key: String): util.List[String] = + entries.collect { + case MetadataEntry(k, MetadataEntry.Value.StringValue(value), _) if key.equalsIgnoreCase(k) => + value + }.asJava + + override def getBinary(key: String): Optional[ByteBuffer] = + entries.collectFirst { + case MetadataEntry(k, MetadataEntry.Value.BytesValue(value), _) if key.equalsIgnoreCase(k) => + value.asReadOnlyByteBuffer() + }.asJava + + override def getBinaryAll(key: String): util.List[ByteBuffer] = + entries.collect { + case MetadataEntry(k, MetadataEntry.Value.BytesValue(value), _) if key.equalsIgnoreCase(k) => + value.asReadOnlyByteBuffer() + }.asJava + + override def getAllKeys: util.List[String] = entries.map(_.key).asJava + + override def set(key: String, value: String): MetadataImpl = { + Objects.requireNonNull(key, "Key must not be null") + Objects.requireNonNull(value, "Value must not be null") + new MetadataImpl(removeKey(key) :+ MetadataEntry(key, MetadataEntry.Value.StringValue(value))) + } + + override def setBinary(key: String, value: ByteBuffer): Metadata = { + Objects.requireNonNull(key, "Key must not be null") + Objects.requireNonNull(value, "Value must not be null") + new MetadataImpl(removeKey(key) :+ MetadataEntry(key, MetadataEntry.Value.BytesValue(ByteString.copyFrom(value)))) + } + + override def add(key: String, value: String): Metadata = { + Objects.requireNonNull(key, "Key must not be null") + Objects.requireNonNull(value, "Value must not be null") + new MetadataImpl(entries :+ MetadataEntry(key, MetadataEntry.Value.StringValue(value))) + } + + override def addBinary(key: String, value: ByteBuffer): Metadata = { + Objects.requireNonNull(key, "Key must not be null") + Objects.requireNonNull(value, "Value must not be null") + new MetadataImpl(entries :+ MetadataEntry(key, MetadataEntry.Value.BytesValue(ByteString.copyFrom(value)))) + } + + override def remove(key: String): MetadataImpl = new MetadataImpl(removeKey(key)) + + override def clear(): Metadata = MetadataImpl.Empty + + override def iterator(): util.Iterator[Metadata.MetadataEntry] = + entries.iterator.map { entry => + new Metadata.MetadataEntry { + override def getKey: String = entry.key + override def getValue: String = entry.value.stringValue.orNull + override def getBinaryValue: ByteBuffer = entry.value.bytesValue.map(_.asReadOnlyByteBuffer()).orNull + override def isText: Boolean = entry.value.isStringValue + override def isBinary: Boolean = entry.value.isBytesValue + } + }.asJava + + private def removeKey(key: String) = entries.filterNot(_.key.equalsIgnoreCase(key)) + + def isCloudEvent: Boolean = MetadataImpl.CeRequired.forall(h => has(h)) + + override def asCloudEvent(): CloudEvent = + if (!isCloudEvent) { + throw new IllegalStateException("Metadata is not a CloudEvent!") + } else this + + override def asCloudEvent(id: String, source: URI, `type`: String): CloudEvent = + new MetadataImpl( + entries.filterNot(e => MetadataImpl.CeRequired(e.key)) ++ + Seq( + MetadataEntry(MetadataImpl.CeSpecversion, MetadataEntry.Value.StringValue(MetadataImpl.CeSpecversionValue)), + MetadataEntry(MetadataImpl.CeId, MetadataEntry.Value.StringValue(id)), + MetadataEntry(MetadataImpl.CeSource, MetadataEntry.Value.StringValue(source.toString)), + MetadataEntry(MetadataImpl.CeType, MetadataEntry.Value.StringValue(`type`)) + ) + ) + + private def getRequiredCloudEventField(key: String) = + entries + .collectFirst { + case MetadataEntry(k, MetadataEntry.Value.StringValue(value), _) if key.equalsIgnoreCase(k) => + value + } + .getOrElse { + throw new IllegalStateException(s"Metadata is not a CloudEvent because it does not have required field $key") + } + + override def specversion(): String = getRequiredCloudEventField(MetadataImpl.CeSpecversion) + + override def id(): String = getRequiredCloudEventField(MetadataImpl.CeId) + + override def withId(id: String): CloudEvent = set(MetadataImpl.CeId, id) + + override def source(): URI = URI.create(getRequiredCloudEventField(MetadataImpl.CeSource)) + + override def withSource(source: URI): CloudEvent = set(MetadataImpl.CeSource, source.toString) + + override def `type`(): String = getRequiredCloudEventField(MetadataImpl.CeType) + + override def withType(`type`: String): CloudEvent = set(MetadataImpl.CeType, `type`) + + override def datacontenttype(): Optional[String] = get(MetadataImpl.CeDatacontenttype) + + override def withDatacontenttype(datacontenttype: String): CloudEvent = + set(MetadataImpl.CeDatacontenttype, datacontenttype) + + override def clearDatacontenttype(): CloudEvent = remove(MetadataImpl.CeDatacontenttype) + + override def dataschema(): Optional[URI] = get(MetadataImpl.CeDataschema).map(URI.create(_)) + + override def withDataschema(dataschema: URI): CloudEvent = set(MetadataImpl.CeDataschema, dataschema.toString) + + override def clearDataschema(): CloudEvent = remove(MetadataImpl.CeDataschema) + + override def subject(): Optional[String] = get(MetadataImpl.CeSubject) + + override def withSubject(subject: String): CloudEvent = set(MetadataImpl.CeSubject, subject) + + override def clearSubject(): CloudEvent = remove(MetadataImpl.CeSubject) + + override def time(): Optional[ZonedDateTime] = get(MetadataImpl.CeTime).map(ZonedDateTime.parse(_)) + + override def withTime(time: ZonedDateTime): CloudEvent = + set(MetadataImpl.CeTime, DateTimeFormatter.ISO_OFFSET_DATE_TIME.format(time)) + + override def clearTime(): CloudEvent = remove(MetadataImpl.CeTime) + + override def asMetadata(): Metadata = this +} + +object MetadataImpl { + val CeSpecversion = "ce-specversion" + val CeSpecversionValue = "1.0" + val CeId = "ce-id" + val CeSource = "ce-source" + val CeType = "ce-type" + // As per CloudEvent HTTP encoding spec, we use Content-Type to encode this. + val CeDatacontenttype = "Content-Type" + val CeDataschema = "ce-dataschema" + val CeSubject = "ce-subject" + val CeTime = "ce-time" + val CeRequired: Set[String] = Set(CeSpecversion, CeId, CeSource, CeType) + + val Empty = new MetadataImpl(Vector.empty) +} diff --git a/java-support/src/main/scala/io/cloudstate/javasupport/impl/ReflectionHelper.scala b/java-support/src/main/scala/io/cloudstate/javasupport/impl/ReflectionHelper.scala index 633a72906..e2e7851f7 100644 --- a/java-support/src/main/scala/io/cloudstate/javasupport/impl/ReflectionHelper.scala +++ b/java-support/src/main/scala/io/cloudstate/javasupport/impl/ReflectionHelper.scala @@ -20,11 +20,53 @@ import java.lang.annotation.Annotation import java.lang.reflect.{AccessibleObject, Executable, Member, Method, ParameterizedType, Type, WildcardType} import java.util.Optional -import io.cloudstate.javasupport.{Context, EntityContext, EntityId, ServiceCallFactory} +import io.cloudstate.javasupport.{ + CloudEvent, + Context, + EntityContext, + EntityId, + Metadata, + MetadataContext, + ServiceCallFactory +} import com.google.protobuf.{Any => JavaPbAny} import scala.reflect.ClassTag +/** + * How we do reflection: + * + * Where possible, all reflection should be done up front, parameter handlers should be calculated, return type + * mappers should be calculated, and everything stored in maps for fast lookup in request hot paths. + * + * Where this isn't possible, eg because some things may be routed based on type, and supertypes may be supported, + * and the full type hierarchy isn't known up front, then the results of reflection should be cached. + * + * The general approach to reflective invocations is that each type of method (eg, command handler, event handler, + * etc) should have an invoker defined for it. This invoker is responsible for working out how to invoke the method, + * given a set of input parameters, and what to do with its result. + * + * Each invoker should store an array of parameter handlers. A parameter handler takes an input context, and converts + * it to the thing that needs to be passed to the method. When invoking the method, this array of handlers is mapped + * to the array of parameters, to be used in the reflective invocation. Determining the right parameter handler for + * a given parameter type is done by partial functions, the case statements check if the parameter type is of a + * particular type or has a particular annotation, and if it does, returns the handler for that. If nothing matches, + * the fall back is to treat that parameter as the "main argument", this is the command message or event message that + * is being handled by the method. If possible, validation is done on the main argument to ensure it is of the + * expected type. + * + * An invoker may also need to do some processing on the return type. It should, up front, define a mapping function + * up front that converts the type returned by the method to the type that the invoker needs to return. + * + * Invokers themselves are stored in a map - the key of the map depends on the type of invoker, so for example event + * handlers are looked up by type, so the key of the map will be the type of event that the invoker handles. Command + * handlers though are looked up by command name, so the key of the map will be the name of the command that the + * the handler handles. + * + * This helper class provides shared functionality for achieving the above, including some shared parameter handlers, + * and the common logic for command invokers. The helper methods in here are used by the various service types + * annotation support classes. + */ private[impl] object ReflectionHelper { def getAllDeclaredMethods(clazz: Class[_]): Seq[Method] = @@ -51,19 +93,36 @@ private[impl] object ReflectionHelper { member.getName.charAt(0).toUpper + member.getName.drop(1) } else member.getName - final case class InvocationContext[+C <: Context](mainArgument: AnyRef, context: C) - trait ParameterHandler[-C <: Context] extends (InvocationContext[C] => AnyRef) - case object ContextParameterHandler extends ParameterHandler[Context] { - override def apply(ctx: InvocationContext[Context]): AnyRef = ctx.context.asInstanceOf[AnyRef] + final case class InvocationContext[M, +C <: Context](mainArgument: M, context: C) + trait ParameterHandler[M, -C <: Context] extends (InvocationContext[M, C] => AnyRef) + case object ContextParameterHandler extends ParameterHandler[Nothing, Context] { + override def apply(ctx: InvocationContext[Nothing, Context]): AnyRef = ctx.context.asInstanceOf[AnyRef] + } + final case class MainArgumentParameterHandler[M <: AnyRef, C <: Context](argumentType: Class[M]) + extends ParameterHandler[M, C] { + override def apply(ctx: InvocationContext[M, C]): AnyRef = ctx.mainArgument } - final case class MainArgumentParameterHandler[C <: Context](argumentType: Class[_]) extends ParameterHandler[C] { - override def apply(ctx: InvocationContext[C]): AnyRef = ctx.mainArgument + final case object EntityIdParameterHandler extends ParameterHandler[Nothing, EntityContext] { + override def apply(ctx: InvocationContext[Nothing, EntityContext]): AnyRef = ctx.context.entityId() } - final case object EntityIdParameterHandler extends ParameterHandler[EntityContext] { - override def apply(ctx: InvocationContext[EntityContext]): AnyRef = ctx.context.entityId() + final case object ServiceCallFactoryParameterHandler extends ParameterHandler[Nothing, Context] { + override def apply(ctx: InvocationContext[Nothing, Context]): AnyRef = ctx.context.serviceCallFactory() } - final case object ServiceCallFactoryParameterHandler extends ParameterHandler[Context] { - override def apply(ctx: InvocationContext[Context]): AnyRef = ctx.context.serviceCallFactory() + final case object MetadataParameterHandler extends ParameterHandler[Nothing, MetadataContext] { + override def apply(ctx: InvocationContext[Nothing, MetadataContext]): AnyRef = + ctx.context.metadata + } + final case object CloudEventParameterHandler extends ParameterHandler[Nothing, MetadataContext] { + override def apply(ctx: InvocationContext[Nothing, MetadataContext]): AnyRef = + ctx.context.metadata.asCloudEvent + } + final case object OptionalCloudEventParameterHandler extends ParameterHandler[Nothing, MetadataContext] { + override def apply(ctx: InvocationContext[Nothing, MetadataContext]): AnyRef = + if (ctx.context.metadata.isCloudEvent) { + Optional.of(ctx.context.metadata.asCloudEvent) + } else { + Optional.empty() + } } final case class MethodParameter(method: Executable, param: Int) { @@ -75,14 +134,24 @@ private[impl] object ReflectionHelper { .find(a => implicitly[ClassTag[A]].runtimeClass.isInstance(a)) } - def getParameterHandlers[C <: Context: ClassTag](method: Executable)( - extras: PartialFunction[MethodParameter, ParameterHandler[C]] = PartialFunction.empty - ): Array[ParameterHandler[C]] = { - val handlers = Array.ofDim[ParameterHandler[_]](method.getParameterCount) + /** + * Determine the parameter handler for the given method. + * + * @param method The method (or constructor). + * @param extras A partial function for any additional argument handlers other than the default one. + * @tparam M The type of the main argument. + * @tparam C The context type for this method. + * @return An array of parameter handlers the same length as the number of parameters accepted by this method. + */ + def getParameterHandlers[M <: AnyRef, C <: Context: ClassTag](method: Executable)( + extras: PartialFunction[MethodParameter, ParameterHandler[M, C]] = PartialFunction.empty + ): Array[ParameterHandler[M, C]] = { + val handlers = Array.ofDim[ParameterHandler[_, _]](method.getParameterCount) + val contextClass = implicitly[ClassTag[C]].runtimeClass + val metadataContext = classOf[MetadataContext].isAssignableFrom(contextClass) for (i <- 0 until method.getParameterCount) { val parameter = MethodParameter(method, i) // First match things that we can be specific about - val contextClass = implicitly[ClassTag[C]].runtimeClass handlers(i) = if (isWithinBounds(parameter.parameterType, classOf[Context], contextClass)) ContextParameterHandler @@ -93,35 +162,52 @@ private[impl] object ReflectionHelper { ) else if (parameter.parameterType == classOf[ServiceCallFactory]) ServiceCallFactoryParameterHandler - else if (parameter.annotation[EntityId].isDefined) { + else if (parameter.annotation[EntityId].isDefined && classOf[EntityContext].isAssignableFrom(contextClass)) { if (parameter.parameterType != classOf[String]) { throw new RuntimeException( s"@EntityId annotated parameter on method ${method.getName} has type ${parameter.parameterType}, must be String." ) } EntityIdParameterHandler - } else - extras.applyOrElse(parameter, (p: MethodParameter) => MainArgumentParameterHandler(p.parameterType)) + } else if (metadataContext && parameter.parameterType == classOf[Metadata]) + MetadataParameterHandler + else if (metadataContext && parameter.parameterType == classOf[CloudEvent]) + CloudEventParameterHandler + else if (metadataContext && parameter.parameterType == classOf[Optional[_]] && + getFirstParameter(parameter.genericParameterType) == classOf[CloudEvent]) + OptionalCloudEventParameterHandler + else + extras.applyOrElse( + parameter, + (p: MethodParameter) => MainArgumentParameterHandler(p.parameterType.asInstanceOf[Class[M]]) + ) } - handlers.asInstanceOf[Array[ParameterHandler[C]]] + handlers.asInstanceOf[Array[ParameterHandler[M, C]]] } + def verifyAtMostOneMainArgument[M, C <: Context](name: String, + method: Method, + parameters: Array[ParameterHandler[M, C]]) = + if (parameters.count(_.isInstanceOf[MainArgumentParameterHandler[_, _]]) > 1) { + throw new RuntimeException( + s"$name method $method must defined at most one non context parameter to handle commands, the parameters defined were: ${parameters + .collect { case MainArgumentParameterHandler(clazz) => clazz.getName } + .mkString(",")}" + ) + } + final class CommandHandlerInvoker[CommandContext <: Context: ClassTag]( val method: Method, val serviceMethod: ResolvedServiceMethod[_, _], - extraParameters: PartialFunction[MethodParameter, ParameterHandler[CommandContext]] = PartialFunction.empty + extraParameters: PartialFunction[MethodParameter, ParameterHandler[AnyRef, CommandContext]] = + PartialFunction.empty ) { private val name = serviceMethod.descriptor.getFullName - private val parameters = ReflectionHelper.getParameterHandlers[CommandContext](method)(extraParameters) + private val parameters = ReflectionHelper.getParameterHandlers[AnyRef, CommandContext](method)(extraParameters) + + verifyAtMostOneMainArgument("CommandHandler", method, parameters) - if (parameters.count(_.isInstanceOf[MainArgumentParameterHandler[_]]) > 1) { - throw new RuntimeException( - s"CommandHandler method $method must defined at most one non context parameter to handle commands, the parameters defined were: ${parameters - .collect { case MainArgumentParameterHandler(clazz) => clazz.getName } - .mkString(",")}" - ) - } parameters.foreach { case MainArgumentParameterHandler(inClass) if !inClass.isAssignableFrom(serviceMethod.inputType.typeClass) => throw new RuntimeException( @@ -170,21 +256,36 @@ private[impl] object ReflectionHelper { } } - private def getRawType(t: Type): Class[_] = t match { + def getRawType(t: Type): Class[_] = t match { case clazz: Class[_] => clazz case pt: ParameterizedType => getRawType(pt.getRawType) case wct: WildcardType => getRawType(wct.getUpperBounds.headOption.getOrElse(classOf[Object])) case _ => classOf[Object] } - def getFirstParameter(t: Type): Class[_] = + /** + * Get the type of the first parameter of this parameterized type. + * + * If it's not a parameterized type, AnyRef is returned. + */ + def getGenericFirstParameter(t: Type): Type = t match { case pt: ParameterizedType => - getRawType(pt.getActualTypeArguments()(0)) + pt.getActualTypeArguments()(0) case _ => classOf[AnyRef] } + /** + * Get the class of the first parameter of this parameterized type. + * + * If it's not a parameterized type, AnyRef is returned. + * + * This is useful if, for example, you have a type who's raw type equals say java.util.Optional, + * and you want to find out what it's an optional of. + */ + def getFirstParameter(t: Type): Class[_] = getRawType(getGenericFirstParameter(t)) + /** * Verifies that none of the given methods have CloudState annotations that are not allowed. * @@ -201,7 +302,7 @@ private[impl] object ReflectionHelper { val maybeAlternative = allowed.find(_.getSimpleName == annotation.annotationType().getSimpleName) throw new RuntimeException( s"Annotation @${annotation.annotationType().getName} on method ${method.getDeclaringClass.getName}." + - s"${method.getName} not allowed in @${entity.getName} annotated entity." + + s"${method.getName} not allowed in @${entity.getName} annotated service." + maybeAlternative.fold("")(alterative => s" Did you mean to use @${alterative.getName}?") ) } diff --git a/java-support/src/main/scala/io/cloudstate/javasupport/impl/ResolvedServiceCallFactory.scala b/java-support/src/main/scala/io/cloudstate/javasupport/impl/ResolvedServiceCallFactory.scala index 99bd8598b..8eb7cabcb 100644 --- a/java-support/src/main/scala/io/cloudstate/javasupport/impl/ResolvedServiceCallFactory.scala +++ b/java-support/src/main/scala/io/cloudstate/javasupport/impl/ResolvedServiceCallFactory.scala @@ -16,9 +16,9 @@ package io.cloudstate.javasupport.impl -import io.cloudstate.javasupport.{ServiceCallFactory, ServiceCallRef, StatefulService} +import io.cloudstate.javasupport.{Service, ServiceCallFactory, ServiceCallRef} -class ResolvedServiceCallFactory(services: Map[String, StatefulService]) extends ServiceCallFactory { +class ResolvedServiceCallFactory(services: Map[String, Service]) extends ServiceCallFactory { override def lookup[T](serviceName: String, methodName: String, methodType: Class[T]): ServiceCallRef[T] = services.get(serviceName) match { case Some(service) => diff --git a/java-support/src/main/scala/io/cloudstate/javasupport/impl/ResolvedServiceMethod.scala b/java-support/src/main/scala/io/cloudstate/javasupport/impl/ResolvedServiceMethod.scala index 26ed9fe30..8eb86e883 100644 --- a/java-support/src/main/scala/io/cloudstate/javasupport/impl/ResolvedServiceMethod.scala +++ b/java-support/src/main/scala/io/cloudstate/javasupport/impl/ResolvedServiceMethod.scala @@ -22,10 +22,10 @@ import com.google.protobuf.{ Descriptors, Parser, UnsafeByteOperations, - Message => JavaMessage, - Any => JavaPbAny + Any => JavaPbAny, + Message => JavaMessage } -import io.cloudstate.javasupport.{ServiceCall, ServiceCallRef} +import io.cloudstate.javasupport.{Metadata, ServiceCall, ServiceCallRef} /** * A resolved service method. @@ -40,16 +40,17 @@ final case class ResolvedServiceMethod[I, O](descriptor: Descriptors.MethodDescr override def method(): Descriptors.MethodDescriptor = descriptor - override def createCall(message: I): ServiceCall = + override def createCall(message: I, metadata: Metadata): ServiceCall = ResolvedServiceCall(this, JavaPbAny .newBuilder() .setTypeUrl(inputType.typeUrl) .setValue(inputType.toByteString(message)) - .build()) + .build(), + metadata) } -final case class ResolvedServiceCall(ref: ServiceCallRef[_], message: JavaPbAny) extends ServiceCall +final case class ResolvedServiceCall(ref: ServiceCallRef[_], message: JavaPbAny, metadata: Metadata) extends ServiceCall /** * A resolved type diff --git a/java-support/src/main/scala/io/cloudstate/javasupport/impl/action/ActionImpl.scala b/java-support/src/main/scala/io/cloudstate/javasupport/impl/action/ActionImpl.scala new file mode 100644 index 000000000..3e0b51465 --- /dev/null +++ b/java-support/src/main/scala/io/cloudstate/javasupport/impl/action/ActionImpl.scala @@ -0,0 +1,285 @@ +/* + * Copyright 2019 Lightbend Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.cloudstate.javasupport.impl.action + +import java.util + +import akka.NotUsed +import akka.actor.ActorSystem +import akka.stream.scaladsl.{Sink, Source} +import com.google.protobuf.any.{Any => ScalaPbAny} +import com.google.protobuf.{Descriptors, Any => JavaPbAny} +import io.cloudstate.javasupport.action._ +import io.cloudstate.javasupport.impl._ +import io.cloudstate.javasupport.{Context, Metadata, Service, ServiceCall, ServiceCallFactory} +import io.cloudstate.protocol.entity.{Failure, Forward, Reply, SideEffect, Metadata => PbMetadata} +import io.cloudstate.protocol.function.{FunctionCommand, FunctionReply, StatelessFunction} + +import scala.concurrent.Future +import scala.compat.java8.FutureConverters._ +import scala.collection.JavaConverters._ + +final class ActionService(val actionHandler: ActionHandler, + override val descriptor: Descriptors.ServiceDescriptor, + val anySupport: AnySupport) + extends Service { + + override def resolvedMethods: Option[Map[String, ResolvedServiceMethod[_, _]]] = + actionHandler match { + case resolved: ResolvedEntityFactory => Some(resolved.resolvedMethods) + case _ => None + } + + override final val entityType = StatelessFunction.name +} + +final class StatelessFunctionImpl(_system: ActorSystem, services: Map[String, ActionService], rootContext: Context) + extends StatelessFunction { + + import _system.dispatcher + implicit val system: ActorSystem = _system + + private def toJavaPbAny(any: Option[ScalaPbAny]) = + any.fold(JavaPbAny.getDefaultInstance)(ScalaPbAny.toJavaProto) + + private def toOptionPbMetadata(metadata: Metadata) = + metadata match { + case impl: MetadataImpl if impl.entries.nonEmpty => + Some(PbMetadata(impl.entries)) + case _: MetadataImpl => None + case other => throw new RuntimeException(s"Unknown metadata implementation: ${other.getClass}, cannot send") + } + + private def actionMessageToReply(msg: ActionReply[JavaPbAny]) = { + val response = msg match { + case message: MessageReply[JavaPbAny] => + FunctionReply.Response.Reply( + Reply( + Some(ScalaPbAny.fromJavaProto(message.payload())), + toOptionPbMetadata(message.metadata()) + ) + ) + case forward: ForwardReply[JavaPbAny] => + FunctionReply.Response.Forward( + Forward( + forward.serviceCall().ref().method().getService.getFullName, + forward.serviceCall().ref().method().getName, + Some(ScalaPbAny.fromJavaProto(forward.serviceCall().message())), + toOptionPbMetadata(forward.serviceCall().metadata()) + ) + ) + // ie, NoReply + case _ => FunctionReply.Response.Empty + } + + val effects = msg match { + case impl: ActionReplyImpl[_] => + impl._effects + case other => + other.effects().asScala.toList + } + val encodedEffects = effects.map { effect => + SideEffect( + effect.serviceCall().ref().method().getService.getFullName, + effect.serviceCall().ref().method().getName, + Some(ScalaPbAny.fromJavaProto(effect.serviceCall().message())), + effect.synchronous(), + toOptionPbMetadata(effect.serviceCall().metadata()) + ) + } + + FunctionReply(response, encodedEffects) + } + + /** + * Handle a unary command. + * The input command will contain the service name, command name, request metadata and the command + * payload. The reply may contain a direct reply, a forward or a failure, and it may contain many + * side effects. + */ + override def handleUnary(in: FunctionCommand): Future[FunctionReply] = + services.get(in.serviceName) match { + case Some(service) => + val context = createContext(in) + service.actionHandler + .handleUnary(in.name, MessageEnvelope.of(toJavaPbAny(in.payload), context.metadata()), context) + .toScala + .map(actionMessageToReply) + case None => + Future.successful( + FunctionReply(FunctionReply.Response.Failure(Failure(0, "Unknown service: " + in.serviceName))) + ) + } + + /** + * Handle a streamed in command. + * The first message in will contain the request metadata, including the service name and command + * name. It will not have an associated payload set. This will be followed by zero to many messages + * in with a payload, but no service name or command name set. + * The semantics of stream closure in this protocol map 1:1 with the semantics of gRPC stream closure, + * that is, when the client closes the stream, the stream is considered half closed, and the server + * should eventually, but not necessarily immediately, send a response message with a status code and + * trailers. + * If however the server sends a response message before the client closes the stream, the stream is + * completely closed, and the client should handle this and stop sending more messages. + * Either the client or the server may cancel the stream at any time, cancellation is indicated + * through an HTTP2 stream RST message. + */ + override def handleStreamedIn(in: Source[FunctionCommand, NotUsed]): Future[FunctionReply] = + in.prefixAndTail(1) + .runWith(Sink.head) + .flatMap { + case (Nil, _) => + Future.successful( + FunctionReply( + FunctionReply.Response.Failure( + Failure( + 0, + "Cloudstate protocol failure: expected command message with service name and command name, but got empty stream" + ) + ) + ) + ) + case (Seq(call), messages) => + services.get(call.serviceName) match { + case Some(service) => + service.actionHandler + .handleStreamedIn( + call.name, + messages.map { message => + val metadata = new MetadataImpl(message.metadata.map(_.entries.toVector).getOrElse(Nil)) + MessageEnvelope.of(toJavaPbAny(message.payload), metadata) + }.asJava, + createContext(call) + ) + .toScala + .map(actionMessageToReply) + case None => + Future.successful( + FunctionReply(FunctionReply.Response.Failure(Failure(0, "Unknown service: " + call.serviceName))) + ) + } + } + + /** + * Handle a streamed out command. + * The input command will contain the service name, command name, request metadata and the command + * payload. Zero or more replies may be sent, each containing either a direct reply, a forward or a + * failure, and each may contain many side effects. The stream to the client will be closed when the + * this stream is closed, with the same status as this stream is closed with. + * Either the client or the server may cancel the stream at any time, cancellation is indicated + * through an HTTP2 stream RST message. + */ + override def handleStreamedOut(in: FunctionCommand): Source[FunctionReply, NotUsed] = + services.get(in.serviceName) match { + case Some(service) => + val context = createContext(in) + service.actionHandler + .handleStreamedOut(in.name, MessageEnvelope.of(toJavaPbAny(in.payload), context.metadata()), context) + .asScala + .map(actionMessageToReply) + case None => + Source.single(FunctionReply(FunctionReply.Response.Failure(Failure(0, "Unknown service: " + in.serviceName)))) + } + + /** + * Handle a full duplex streamed command. + * The first message in will contain the request metadata, including the service name and command + * name. It will not have an associated payload set. This will be followed by zero to many messages + * in with a payload, but no service name or command name set. + * Zero or more replies may be sent, each containing either a direct reply, a forward or a failure, + * and each may contain many side effects. + * The semantics of stream closure in this protocol map 1:1 with the semantics of gRPC stream closure, + * that is, when the client closes the stream, the stream is considered half closed, and the server + * should eventually, but not necessarily immediately, close the streamage with a status code and + * trailers. + * If however the server closes the stream with a status code and trailers, the stream is immediately + * considered completely closed, and no further messages sent by the client will be handled by the + * server. + * Either the client or the server may cancel the stream at any time, cancellation is indicated + * through an HTTP2 stream RST message. + */ + override def handleStreamed(in: Source[FunctionCommand, NotUsed]): Source[FunctionReply, NotUsed] = + in.prefixAndTail(1) + .flatMapConcat { + case (Nil, _) => + Source.single( + FunctionReply( + FunctionReply.Response.Failure( + Failure( + 0, + "Cloudstate protocol failure: expected command message with service name and command name, but got empty stream" + ) + ) + ) + ) + case (Seq(call), messages) => + services.get(call.serviceName) match { + case Some(service) => + service.actionHandler + .handleStreamed( + call.name, + messages.map { message => + val metadata = new MetadataImpl(message.metadata.map(_.entries.toVector).getOrElse(Nil)) + MessageEnvelope.of(toJavaPbAny(message.payload), metadata) + }.asJava, + createContext(call) + ) + .asScala + .map(actionMessageToReply) + case None => + Source.single( + FunctionReply(FunctionReply.Response.Failure(Failure(0, "Unknown service: " + call.serviceName))) + ) + } + } + + private def createContext(in: FunctionCommand): ActionContext = { + val metadata = new MetadataImpl(in.metadata.map(_.entries.toVector).getOrElse(Nil)) + new ActionContextImpl(metadata) + } + + class ActionContextImpl(override val metadata: Metadata) extends ActionContext { + override val serviceCallFactory: ServiceCallFactory = rootContext.serviceCallFactory() + } +} + +trait ActionReplyImpl[T] extends ActionReply[T] { + def _effects: List[Effect] + override def effects(): util.Collection[Effect] = _effects.asJava +} +case class MessageEnvelopeImpl[T](payload: T, metadata: Metadata) extends MessageEnvelope[T] +case class MessageReplyImpl[T](payload: T, metadata: Metadata, _effects: List[Effect]) + extends MessageReply[T] + with ActionReplyImpl[T] { + def this(payload: T, metadata: Metadata) = this(payload, metadata, Nil) + override def withEffects(effect: Effect*): MessageReply[T] = MessageReplyImpl(payload, metadata, _effects ++ effect) +} +case class ForwardReplyImpl[T](serviceCall: ServiceCall, _effects: List[Effect]) + extends ForwardReply[T] + with ActionReplyImpl[T] { + def this(serviceCall: ServiceCall) = this(serviceCall, Nil) + override def withEffects(effect: Effect*): ForwardReply[T] = ForwardReplyImpl(serviceCall, _effects ++ effect) +} +case class NoReply[T](_effects: List[Effect]) extends ActionReplyImpl[T] { + override def withEffects(effect: Effect*): ActionReply[T] = NoReply(_effects ++ effect) +} +object NoReply { + private val instance = NoReply[Any](Nil) + def apply[T]: ActionReply[T] = instance.asInstanceOf[NoReply[T]] +} +case class EffectImpl(serviceCall: ServiceCall, synchronous: Boolean) extends Effect diff --git a/java-support/src/main/scala/io/cloudstate/javasupport/impl/action/AnnotationBasedActionSupport.scala b/java-support/src/main/scala/io/cloudstate/javasupport/impl/action/AnnotationBasedActionSupport.scala new file mode 100644 index 000000000..cad8b6352 --- /dev/null +++ b/java-support/src/main/scala/io/cloudstate/javasupport/impl/action/AnnotationBasedActionSupport.scala @@ -0,0 +1,478 @@ +/* + * Copyright 2019 Lightbend Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.cloudstate.javasupport.impl.action + +import java.lang.reflect.{InvocationTargetException, Method, Type} +import java.util.concurrent.{CompletableFuture, CompletionStage} + +import akka.NotUsed +import akka.stream.{javadsl, Materializer} +import akka.stream.javadsl.{AsPublisher, Source} +import akka.stream.scaladsl.{JavaFlowSupport, Sink} +import com.google.protobuf.{Descriptors, Any => JavaPbAny} +import io.cloudstate.javasupport.action._ +import io.cloudstate.javasupport.impl.ReflectionHelper.{InvocationContext, ParameterHandler} +import io.cloudstate.javasupport.impl.{ + AnySupport, + ReflectionHelper, + ResolvedEntityFactory, + ResolvedServiceMethod, + ResolvedType +} +import io.cloudstate.javasupport.Metadata + +/** + * Annotation based implementation of the [[ActionHandler]]. + */ +private[impl] class AnnotationBasedActionSupport( + action: AnyRef, + anySupport: AnySupport, + override val resolvedMethods: Map[String, ResolvedServiceMethod[_, _]] +)(implicit mat: Materializer) + extends ActionHandler + with ResolvedEntityFactory { + + def this(action: AnyRef, anySupport: AnySupport, serviceDescriptor: Descriptors.ServiceDescriptor)( + implicit mat: Materializer + ) = + this(action, anySupport, anySupport.resolveServiceDescriptor(serviceDescriptor)) + + private val behavior = ActionReflection(action.getClass, resolvedMethods) + + override def handleUnary(commandName: String, + message: MessageEnvelope[JavaPbAny], + context: ActionContext): CompletionStage[ActionReply[JavaPbAny]] = unwrap { + behavior.unaryHandlers.get(commandName) match { + case Some(handler) => + handler.invoke(action, message, context) + case None => + throw new RuntimeException( + s"No call handler found for call $commandName on ${action.getClass.getName}" + ) + } + } + + override def handleStreamedOut(commandName: String, + message: MessageEnvelope[JavaPbAny], + context: ActionContext): Source[ActionReply[JavaPbAny], NotUsed] = unwrap { + behavior.serverStreamedHandlers.get(commandName) match { + case Some(handler) => + handler.invoke(action, message, context) + case None => + throw new RuntimeException( + s"No call handler found for call $commandName on ${action.getClass.getName}" + ) + } + } + + override def handleStreamedIn(commandName: String, + stream: Source[MessageEnvelope[JavaPbAny], NotUsed], + context: ActionContext): CompletionStage[ActionReply[JavaPbAny]] = + behavior.clientStreamedHandlers.get(commandName) match { + case Some(handler) => + handler.invoke(action, stream, context) + case None => + throw new RuntimeException( + s"No call handler found for call $commandName on ${action.getClass.getName}" + ) + } + + override def handleStreamed(commandName: String, + stream: Source[MessageEnvelope[JavaPbAny], NotUsed], + context: ActionContext): Source[ActionReply[JavaPbAny], NotUsed] = + behavior.streamedHandlers.get(commandName) match { + case Some(handler) => + handler.invoke(action, stream, context) + case None => + throw new RuntimeException( + s"No call handler found for call $commandName on ${action.getClass.getName}" + ) + } + + private def unwrap[T](block: => T): T = + try { + block + } catch { + case ite: InvocationTargetException if ite.getCause != null => + throw ite.getCause + } +} + +private class ActionReflection( + val unaryHandlers: Map[String, UnaryCallInvoker], + val serverStreamedHandlers: Map[String, ServerStreamedCallInvoker], + val clientStreamedHandlers: Map[String, ClientStreamedCallInvoker], + val streamedHandlers: Map[String, StreamedCallInvoker] +) + +private object ActionReflection { + def apply(behaviorClass: Class[_], serviceMethods: Map[String, ResolvedServiceMethod[_, _]])( + implicit mat: Materializer + ): ActionReflection = { + + val allMethods = ReflectionHelper.getAllDeclaredMethods(behaviorClass) + + // First, find all the call handler methods, and match them with corresponding service methods + val allCallHandlers = allMethods + .filter(_.getAnnotation(classOf[CallHandler]) != null) + .map { method => + method.setAccessible(true) + val annotation = method.getAnnotation(classOf[CallHandler]) + val name: String = if (annotation.name().isEmpty) { + ReflectionHelper.getCapitalizedName(method) + } else annotation.name() + + val serviceMethod = serviceMethods.getOrElse(name, { + throw new RuntimeException( + s"Command handler method ${method.getName} for command $name found, but the service has no command by that name." + ) + }) + + (method, serviceMethod) + } + .groupBy(_._2.name) + .map { + case (commandName, Seq((method, serviceMethod))) => (commandName, method, serviceMethod) + case (commandName, many) => + throw new RuntimeException( + s"Multiple methods found for handling command of name $commandName: ${many.map(_._1.getName).mkString(", ")}" + ) + } + + val unaryCallHandlers = allCallHandlers.collect { + case (commandName, method, serviceMethod) + if !serviceMethod.descriptor.isClientStreaming && !serviceMethod.descriptor.isServerStreaming => + commandName -> new UnaryCallInvoker(method, serviceMethod) + }.toMap + + val serverStreamedCallHandlers = allCallHandlers.collect { + case (commandName, method, serviceMethod) + if !serviceMethod.descriptor.isClientStreaming && serviceMethod.descriptor.isServerStreaming => + commandName -> new ServerStreamedCallInvoker(method, serviceMethod) + }.toMap + + val clientStreamedCallHandlers = allCallHandlers.collect { + case (commandName, method, serviceMethod) + if serviceMethod.descriptor.isClientStreaming && !serviceMethod.descriptor.isServerStreaming => + commandName -> new ClientStreamedCallInvoker(method, serviceMethod, mat) + }.toMap + + val streamedCallHandlers = allCallHandlers.collect { + case (commandName, method, serviceMethod) + if serviceMethod.descriptor.isClientStreaming && serviceMethod.descriptor.isServerStreaming => + commandName -> new StreamedCallInvoker(method, serviceMethod, mat) + }.toMap + + ReflectionHelper.validateNoBadMethods( + allMethods, + classOf[Action], + Set(classOf[CallHandler]) + ) + + new ActionReflection(unaryCallHandlers, + serverStreamedCallHandlers, + clientStreamedCallHandlers, + streamedCallHandlers) + } + + def getOutputParameterMapper[T](method: String, + resolvedType: ResolvedType[T], + returnType: Type): Any => ActionReply[JavaPbAny] = { + val (payloadClass, mapper) = ReflectionHelper.getRawType(returnType) match { + case envelope if envelope == classOf[MessageEnvelope[_]] => + val payload = ReflectionHelper.getFirstParameter(returnType) + (payload, { any: Any => + val envelope = any.asInstanceOf[MessageEnvelope[T]] + ActionReply.message(JavaPbAny + .newBuilder() + .setValue(resolvedType.toByteString(envelope.payload)) + .setTypeUrl(resolvedType.typeUrl) + .build(), + envelope.metadata) + }) + case message if message == classOf[ActionReply[_]] => + val payload = ReflectionHelper.getFirstParameter(returnType) + (payload, { any: Any => + val message = any.asInstanceOf[ActionReply[T]] + message match { + case envelope: MessageReply[T] => + ActionReply.message(JavaPbAny + .newBuilder() + .setValue(resolvedType.toByteString(envelope.payload)) + .setTypeUrl(resolvedType.typeUrl) + .build(), + envelope.metadata) + case other => other.asInstanceOf[ActionReply[JavaPbAny]] + } + }) + case payload => + (payload, { any: Any => + ActionReply.message( + JavaPbAny + .newBuilder() + .setValue(resolvedType.toByteString(any.asInstanceOf[T])) + .setTypeUrl(resolvedType.typeUrl) + .build() + ) + }) + } + + if (payloadClass != resolvedType.typeClass) { + throw new RuntimeException( + s"Incompatible return type $payloadClass for call $method, expected ${resolvedType.typeClass}" + ) + } + mapper + } + + def getInputParameterMapper(method: String, + resolvedType: ResolvedType[_], + parameterType: Type): MessageEnvelope[JavaPbAny] => AnyRef = + ReflectionHelper.getRawType(parameterType) match { + case envelope if envelope == classOf[MessageEnvelope[_]] => + val messageType = ReflectionHelper.getFirstParameter(parameterType) + if (messageType != resolvedType.typeClass) { + throw new RuntimeException( + s"Incompatible message class $messageType for call $method, expected ${resolvedType.typeClass}" + ) + } else { envelope => + MessageEnvelope.of( + resolvedType.parseFrom(envelope.payload.getValue).asInstanceOf[AnyRef], + envelope.metadata + ) + } + case payload => + if (payload != resolvedType.typeClass) { + throw new RuntimeException( + s"Incompatible message class $payload for call $method, expected ${resolvedType.typeClass}" + ) + } else { envelope => + resolvedType.parseFrom(envelope.payload.getValue).asInstanceOf[AnyRef] + } + } +} + +private class PayloadParameterHandler(mapper: MessageEnvelope[JavaPbAny] => AnyRef) + extends ParameterHandler[MessageEnvelope[JavaPbAny], ActionContext] { + override def apply(ctx: InvocationContext[MessageEnvelope[JavaPbAny], ActionContext]): AnyRef = + mapper(ctx.mainArgument) +} + +private class StreamedPayloadParameterHandler(mapper: javadsl.Source[MessageEnvelope[JavaPbAny], NotUsed] => AnyRef) + extends ParameterHandler[javadsl.Source[MessageEnvelope[JavaPbAny], NotUsed], ActionContext] { + override def apply( + ctx: InvocationContext[javadsl.Source[MessageEnvelope[JavaPbAny], NotUsed], ActionContext] + ): AnyRef = + mapper(ctx.mainArgument) +} + +private trait UnaryInSupport { + protected val method: Method + protected val serviceMethod: ResolvedServiceMethod[_, _] + + protected val parameters: Array[ParameterHandler[MessageEnvelope[JavaPbAny], ActionContext]] = + ReflectionHelper.getParameterHandlers[MessageEnvelope[JavaPbAny], ActionContext](method) { + case payload => + new PayloadParameterHandler( + ActionReflection + .getInputParameterMapper(serviceMethod.name, serviceMethod.inputType, payload.genericParameterType) + ) + } +} + +private trait UnaryOutSupport { + protected val method: Method + protected val serviceMethod: ResolvedServiceMethod[_, _] + + protected val outputMapper: Any => CompletionStage[ActionReply[JavaPbAny]] = method.getReturnType match { + case cstage if cstage == classOf[CompletionStage[_]] => + val cstageType = ReflectionHelper.getGenericFirstParameter(method.getGenericReturnType) + val mapper = + ActionReflection.getOutputParameterMapper(serviceMethod.name, serviceMethod.outputType, cstageType) + + any: Any => any.asInstanceOf[CompletionStage[Any]].thenApply(mapper.apply) + case _ => + val mapper = ActionReflection.getOutputParameterMapper(serviceMethod.name, + serviceMethod.outputType, + method.getGenericReturnType) + + any: Any => CompletableFuture.completedFuture(mapper(any)) + } +} + +private trait StreamedInSupport { + protected val method: Method + protected val serviceMethod: ResolvedServiceMethod[_, _] + implicit protected val materializer: Materializer + + protected val parameters + : Array[ParameterHandler[javadsl.Source[MessageEnvelope[JavaPbAny], NotUsed], ActionContext]] = + ReflectionHelper.getParameterHandlers[javadsl.Source[MessageEnvelope[JavaPbAny], NotUsed], ActionContext]( + method + ) { + case source if source.parameterType == classOf[javadsl.Source[_, _]] => + val sourceType = ReflectionHelper.getGenericFirstParameter(source.genericParameterType) + val mapper = + ActionReflection.getInputParameterMapper(serviceMethod.name, serviceMethod.inputType, sourceType) + + new StreamedPayloadParameterHandler(source => source.map(mapper.apply)) + + case rsPublisher if rsPublisher.parameterType == classOf[org.reactivestreams.Publisher[_]] => + val publisherType = ReflectionHelper.getGenericFirstParameter(rsPublisher.genericParameterType) + val mapper = + ActionReflection.getInputParameterMapper(serviceMethod.name, serviceMethod.inputType, publisherType) + + new StreamedPayloadParameterHandler( + source => + source.asScala + .map(mapper.apply) + .runWith(Sink.asPublisher(false)) + ) + + case jdkPublisher if jdkPublisher.parameterType == classOf[java.util.concurrent.Flow.Publisher[_]] => + val publisherType = ReflectionHelper.getGenericFirstParameter(jdkPublisher.genericParameterType) + val mapper = + ActionReflection.getInputParameterMapper(serviceMethod.name, serviceMethod.inputType, publisherType) + + new StreamedPayloadParameterHandler( + source => + source.asScala + .map(mapper.apply) + .runWith(JavaFlowSupport.Sink.asPublisher(false)) + ) + + case other => + throw new RuntimeException( + s"Unknown input parameter of type $other. Streamed call ${serviceMethod.name} must accept a ${classOf[ + javadsl.Source[_, _] + ]} or ${classOf[org.reactivestreams.Publisher[_]]}." + ) + } + + if (parameters.count(_.isInstanceOf[StreamedPayloadParameterHandler]) != 1) { + throw new RuntimeException( + s"Streamed call ${serviceMethod.name} must accept exactly one parameter of type ${classOf[javadsl.Source[_, _]]} or ${classOf[org.reactivestreams.Publisher[_]]}" + ) + } +} + +private trait StreamedOutSupport { + protected val method: Method + protected val serviceMethod: ResolvedServiceMethod[_, _] + + protected val outputMapper: Any => javadsl.Source[ActionReply[JavaPbAny], NotUsed] = method.getReturnType match { + case source if source == classOf[javadsl.Source[_, _]] => + val sourceType = ReflectionHelper.getGenericFirstParameter(method.getGenericReturnType) + val mapper: Any => ActionReply[JavaPbAny] = + ActionReflection.getOutputParameterMapper(serviceMethod.name, serviceMethod.outputType, sourceType) + + any: Any => + any + .asInstanceOf[javadsl.Source[Any, _]] + .map(mapper.apply) + .mapMaterializedValue(_ => NotUsed) + + case rsPublisher if rsPublisher == classOf[org.reactivestreams.Publisher[_]] => + val sourceType = ReflectionHelper.getGenericFirstParameter(method.getGenericReturnType) + val mapper: Any => ActionReply[JavaPbAny] = + ActionReflection.getOutputParameterMapper(serviceMethod.name, serviceMethod.outputType, sourceType) + + any: Any => { + javadsl.Source + .fromPublisher(any.asInstanceOf[org.reactivestreams.Publisher[Any]]) + .map(mapper.apply) + } + + case jdkPublisher if jdkPublisher == classOf[java.util.concurrent.Flow.Publisher[_]] => + val sourceType = ReflectionHelper.getGenericFirstParameter(method.getGenericReturnType) + val mapper: Any => ActionReply[JavaPbAny] = + ActionReflection.getOutputParameterMapper(serviceMethod.name, serviceMethod.outputType, sourceType) + + any: Any => { + JavaFlowSupport.Source + .fromPublisher(any.asInstanceOf[java.util.concurrent.Flow.Publisher[Any]]) + .map(mapper.apply) + .asJava + } + + case _ => + throw new RuntimeException( + s"Streamed call ${serviceMethod.name} must return a ${classOf[javadsl.Source[_, _]]} or ${classOf[org.reactivestreams.Publisher[_]]}." + ) + } +} + +private class UnaryCallInvoker(protected val method: Method, protected val serviceMethod: ResolvedServiceMethod[_, _]) + extends UnaryInSupport + with UnaryOutSupport { + + def invoke(action: AnyRef, + message: MessageEnvelope[JavaPbAny], + context: ActionContext): CompletionStage[ActionReply[JavaPbAny]] = { + val ctx = InvocationContext(message, context) + val result = method.invoke(action, parameters.map(_.apply(ctx)): _*) + outputMapper(result) + } + +} + +private class ServerStreamedCallInvoker(protected val method: Method, + protected val serviceMethod: ResolvedServiceMethod[_, _]) + extends UnaryInSupport + with StreamedOutSupport { + + def invoke(action: AnyRef, + message: MessageEnvelope[JavaPbAny], + context: ActionContext): javadsl.Source[ActionReply[JavaPbAny], NotUsed] = { + val ctx = InvocationContext(message, context) + val result = method.invoke(action, parameters.map(_.apply(ctx)): _*) + outputMapper(result) + } + +} + +private class ClientStreamedCallInvoker(protected val method: Method, + protected val serviceMethod: ResolvedServiceMethod[_, _], + protected val materializer: Materializer) + extends UnaryOutSupport + with StreamedInSupport { + + def invoke(action: AnyRef, + stream: javadsl.Source[MessageEnvelope[JavaPbAny], NotUsed], + context: ActionContext): CompletionStage[ActionReply[JavaPbAny]] = { + val ctx = InvocationContext(stream, context) + val result = method.invoke(action, parameters.map(_.apply(ctx)): _*) + outputMapper(result) + } + +} + +private class StreamedCallInvoker(protected val method: Method, + protected val serviceMethod: ResolvedServiceMethod[_, _], + protected val materializer: Materializer) + extends StreamedOutSupport + with StreamedInSupport { + + def invoke(action: AnyRef, + stream: javadsl.Source[MessageEnvelope[JavaPbAny], NotUsed], + context: ActionContext): javadsl.Source[ActionReply[JavaPbAny], NotUsed] = { + val ctx = InvocationContext(stream, context) + val result = method.invoke(action, parameters.map(_.apply(ctx)): _*) + outputMapper(result) + } + +} diff --git a/java-support/src/main/scala/io/cloudstate/javasupport/impl/crdt/AnnotationBasedCrdtSupport.scala b/java-support/src/main/scala/io/cloudstate/javasupport/impl/crdt/AnnotationBasedCrdtSupport.scala index a99cbad80..6678c996f 100644 --- a/java-support/src/main/scala/io/cloudstate/javasupport/impl/crdt/AnnotationBasedCrdtSupport.scala +++ b/java-support/src/main/scala/io/cloudstate/javasupport/impl/crdt/AnnotationBasedCrdtSupport.scala @@ -21,9 +21,8 @@ import java.util.{function, Optional} import java.util.function.Consumer import scala.annotation.unchecked - import com.google.protobuf.{Descriptors, Any => JavaPbAny} -import io.cloudstate.javasupport.{Context, EntityFactory, ServiceCall, ServiceCallFactory} +import io.cloudstate.javasupport.{Context, EntityFactory, Metadata, ServiceCall, ServiceCallFactory} import io.cloudstate.javasupport.crdt.{ CommandContext, CommandHandler, @@ -210,7 +209,7 @@ private object CrdtAnnotationHelper { } def crdtParameterHandlers[C <: CrdtContext with CrdtFactory] - : PartialFunction[MethodParameter, ParameterHandler[C]] = { + : PartialFunction[MethodParameter, ParameterHandler[AnyRef, C]] = { case crdt if injectorMap.contains(crdt.parameterType) => new CrdtParameterHandler[C, Crdt, AnyRef](injectorMap(crdt.parameterType), crdt.method) case crdt @@ -224,8 +223,8 @@ private object CrdtAnnotationHelper { private class CrdtParameterHandler[C <: CrdtContext with CrdtFactory, D <: Crdt, T](injector: CrdtInjector[D, T], method: Executable) - extends ParameterHandler[C] { - override def apply(ctx: InvocationContext[C]): AnyRef = { + extends ParameterHandler[AnyRef, C] { + override def apply(ctx: InvocationContext[AnyRef, C]): AnyRef = { val state = ctx.context.state(injector.crdtClass) if (state.isPresent) { injector.wrap(state.get()).asInstanceOf[AnyRef] @@ -236,10 +235,10 @@ private object CrdtAnnotationHelper { } private class OptionalCrdtParameterHandler[C <: Crdt, T](injector: CrdtInjector[C, T], method: Executable) - extends ParameterHandler[CrdtContext] { + extends ParameterHandler[AnyRef, CrdtContext] { import scala.compat.java8.OptionConverters._ - override def apply(ctx: InvocationContext[CrdtContext]): AnyRef = + override def apply(ctx: InvocationContext[AnyRef, CrdtContext]): AnyRef = ctx.context.state(injector.crdtClass).asScala.map(injector.wrap).asJava } @@ -272,6 +271,7 @@ private final class AdaptedStreamedCommandContext(val delegate: StreamedCommandC override def entityId(): String = delegate.entityId() override def commandId(): Long = delegate.commandId() override def commandName(): String = delegate.commandName() + override def metadata(): Metadata = delegate.metadata() override def state[T <: Crdt](crdtClass: Class[T]): Optional[T] = delegate.state(crdtClass) override def delete(): Unit = delegate.delete() @@ -292,7 +292,9 @@ private final class AdaptedStreamedCommandContext(val delegate: StreamedCommandC private final class EntityConstructorInvoker(constructor: Constructor[_]) extends (CrdtCreationContext => AnyRef) { private val parameters = - ReflectionHelper.getParameterHandlers[CrdtCreationContext](constructor)(CrdtAnnotationHelper.crdtParameterHandlers) + ReflectionHelper.getParameterHandlers[AnyRef, CrdtCreationContext](constructor)( + CrdtAnnotationHelper.crdtParameterHandlers + ) parameters.foreach { case MainArgumentParameterHandler(clazz) => throw new RuntimeException(s"Don't know how to handle argument of type $clazz in constructor") @@ -300,7 +302,7 @@ private final class EntityConstructorInvoker(constructor: Constructor[_]) extend } def apply(context: CrdtCreationContext): AnyRef = { - val ctx = InvocationContext("", context) + val ctx = InvocationContext(null.asInstanceOf[AnyRef], context) constructor.newInstance(parameters.map(_.apply(ctx)): _*).asInstanceOf[AnyRef] } } diff --git a/java-support/src/main/scala/io/cloudstate/javasupport/impl/crdt/CrdtImpl.scala b/java-support/src/main/scala/io/cloudstate/javasupport/impl/crdt/CrdtImpl.scala index dd3ff2466..15d23f292 100644 --- a/java-support/src/main/scala/io/cloudstate/javasupport/impl/crdt/CrdtImpl.scala +++ b/java-support/src/main/scala/io/cloudstate/javasupport/impl/crdt/CrdtImpl.scala @@ -23,7 +23,7 @@ import akka.NotUsed import akka.actor.ActorSystem import akka.stream.scaladsl.{Flow, Source} import com.google.protobuf.Descriptors -import io.cloudstate.javasupport.{Context, ServiceCallFactory, StatefulService} +import io.cloudstate.javasupport.{Context, Metadata, Service, ServiceCallFactory} import io.cloudstate.javasupport.crdt.{ CommandContext, CrdtContext, @@ -39,6 +39,7 @@ import io.cloudstate.javasupport.impl.{ ActivatableContext, AnySupport, FailInvoked, + MetadataImpl, ResolvedEntityFactory, ResolvedServiceMethod } @@ -48,13 +49,12 @@ import io.cloudstate.protocol.entity.{Command, Failure, StreamCancelled} import com.google.protobuf.any.{Any => ScalaPbAny} import com.google.protobuf.{Any => JavaPbAny} -import scala.compat.java8.OptionConverters._ import scala.collection.JavaConverters._ final class CrdtStatefulService(val factory: CrdtEntityFactory, override val descriptor: Descriptors.ServiceDescriptor, val anySupport: AnySupport) - extends StatefulService { + extends Service { override final val entityType = Crdt.name override def resolvedMethods: Option[Map[String, ResolvedServiceMethod[_, _]]] = @@ -135,7 +135,7 @@ class CrdtImpl(system: ActorSystem, services: Map[String, CrdtStatefulService], private var crdtIsNew = false private var subscribers = Map.empty[Long, function.Function[SubscriptionContext, Optional[JavaPbAny]]] - private var cancelListeners = Map.empty[Long, function.Consumer[StreamCancelledContext]] + private var cancelListeners = Map.empty[Long, (function.Consumer[StreamCancelledContext], Metadata)] private val entity = { val ctx = new CrdtCreationContext with CapturingCrdtFactory with ActivatableContext try { @@ -241,9 +241,9 @@ class CrdtImpl(system: ActorSystem, services: Map[String, CrdtStatefulService], def handleStreamCancelled(cancelled: StreamCancelled): List[CrdtStreamOut] = { subscribers -= cancelled.id cancelListeners.get(cancelled.id) match { - case Some(onCancel) => + case Some((onCancel, metadata)) => cancelListeners -= cancelled.id - val ctx = new CrdtStreamCancelledContext(cancelled) + val ctx = new CrdtStreamCancelledContext(cancelled, metadata) try { onCancel.accept(ctx) } finally { @@ -345,7 +345,7 @@ class CrdtImpl(system: ActorSystem, services: Map[String, CrdtStatefulService], subscribers = subscribers.updated(command.id, onChange) } cancelCallback.foreach { onCancel => - cancelListeners = cancelListeners.updated(command.id, onCancel) + cancelListeners = cancelListeners.updated(command.id, (onCancel, metadata)) } changeCallback.isDefined || cancelCallback.isDefined } @@ -363,9 +363,12 @@ class CrdtImpl(system: ActorSystem, services: Map[String, CrdtStatefulService], override final def commandId: Long = command.id override final def commandName(): String = command.name + + override val metadata: Metadata = new MetadataImpl(command.metadata.map(_.entries.toVector).getOrElse(Nil)) + } - class CrdtStreamCancelledContext(cancelled: StreamCancelled) + class CrdtStreamCancelledContext(cancelled: StreamCancelled, override val metadata: Metadata) extends StreamCancelledContext with CapturingCrdtFactory with AbstractEffectContext diff --git a/java-support/src/main/scala/io/cloudstate/javasupport/impl/eventsourced/AnnotationBasedEventSourcedSupport.scala b/java-support/src/main/scala/io/cloudstate/javasupport/impl/eventsourced/AnnotationBasedEventSourcedSupport.scala index 37bab1682..8767553fb 100644 --- a/java-support/src/main/scala/io/cloudstate/javasupport/impl/eventsourced/AnnotationBasedEventSourcedSupport.scala +++ b/java-support/src/main/scala/io/cloudstate/javasupport/impl/eventsourced/AnnotationBasedEventSourcedSupport.scala @@ -256,7 +256,8 @@ private object EventBehaviorReflection { private class EntityConstructorInvoker(constructor: Constructor[_]) extends (EventSourcedEntityCreationContext => AnyRef) { - private val parameters = ReflectionHelper.getParameterHandlers[EventSourcedEntityCreationContext](constructor)() + private val parameters = + ReflectionHelper.getParameterHandlers[AnyRef, EventSourcedEntityCreationContext](constructor)() parameters.foreach { case MainArgumentParameterHandler(clazz) => throw new RuntimeException(s"Don't know how to handle argument of type $clazz in constructor") @@ -264,7 +265,7 @@ private class EntityConstructorInvoker(constructor: Constructor[_]) } def apply(context: EventSourcedEntityCreationContext): AnyRef = { - val ctx = InvocationContext("", context) + val ctx = InvocationContext(null.asInstanceOf[AnyRef], context) constructor.newInstance(parameters.map(_.apply(ctx)): _*).asInstanceOf[AnyRef] } } @@ -273,7 +274,7 @@ private class EventHandlerInvoker(val method: Method) { private val annotation = method.getAnnotation(classOf[EventHandler]) - private val parameters = ReflectionHelper.getParameterHandlers[EventContext](method)() + private val parameters = ReflectionHelper.getParameterHandlers[AnyRef, EventContext](method)() private def annotationEventClass = annotation.eventClass() match { case obj if obj == classOf[Object] => None @@ -311,7 +312,7 @@ private class EventHandlerInvoker(val method: Method) { private class SnapshotHandlerInvoker(val method: Method) { private val annotation = method.getAnnotation(classOf[SnapshotHandler]) - private val parameters = ReflectionHelper.getParameterHandlers[SnapshotContext](method)() + private val parameters = ReflectionHelper.getParameterHandlers[AnyRef, SnapshotContext](method)() // Verify that there is at most one event handler val snapshotClass: Class[_] = parameters.collect { @@ -333,7 +334,7 @@ private class SnapshotHandlerInvoker(val method: Method) { private class SnapshotInvoker(val method: Method) { - private val parameters = ReflectionHelper.getParameterHandlers[SnapshotContext](method)() + private val parameters = ReflectionHelper.getParameterHandlers[AnyRef, SnapshotContext](method)() parameters.foreach { case MainArgumentParameterHandler(clazz) => @@ -344,7 +345,7 @@ private class SnapshotInvoker(val method: Method) { } def invoke(obj: AnyRef, context: SnapshotContext): AnyRef = { - val ctx = InvocationContext("", context) + val ctx = InvocationContext(null.asInstanceOf[AnyRef], context) method.invoke(obj, parameters.map(_.apply(ctx)): _*) } diff --git a/java-support/src/main/scala/io/cloudstate/javasupport/impl/eventsourced/EventSourcedImpl.scala b/java-support/src/main/scala/io/cloudstate/javasupport/impl/eventsourced/EventSourcedImpl.scala index 5187cb9be..eac575beb 100644 --- a/java-support/src/main/scala/io/cloudstate/javasupport/impl/eventsourced/EventSourcedImpl.scala +++ b/java-support/src/main/scala/io/cloudstate/javasupport/impl/eventsourced/EventSourcedImpl.scala @@ -25,7 +25,7 @@ import akka.stream.scaladsl.Flow import com.google.protobuf.{Descriptors, Any => JavaPbAny} import com.google.protobuf.any.{Any => ScalaPbAny} import io.cloudstate.javasupport.CloudStateRunner.Configuration -import io.cloudstate.javasupport.{Context, ServiceCallFactory, StatefulService} +import io.cloudstate.javasupport.{Context, Metadata, Service, ServiceCallFactory} import io.cloudstate.javasupport.eventsourced._ import io.cloudstate.javasupport.impl.{ AbstractClientActionContext, @@ -33,6 +33,7 @@ import io.cloudstate.javasupport.impl.{ ActivatableContext, AnySupport, FailInvoked, + MetadataImpl, ResolvedEntityFactory, ResolvedServiceMethod } @@ -52,7 +53,7 @@ final class EventSourcedStatefulService(val factory: EventSourcedEntityFactory, val anySupport: AnySupport, override val persistenceId: String, val snapshotEvery: Int) - extends StatefulService { + extends Service { override def resolvedMethods: Option[Map[String, ResolvedServiceMethod[_, _]]] = factory match { @@ -186,8 +187,9 @@ final class EventSourcedImpl(_system: ActorSystem, throw ProtocolException(command, "Receiving entity is not the intended recipient of command") val cmd = ScalaPbAny.toJavaProto(command.payload.getOrElse(throw ProtocolException(command, "No command payload"))) + val metadata = new MetadataImpl(command.metadata.map(_.entries.toVector).getOrElse(Nil)) val context = - new CommandContextImpl(thisEntityId, sequence, command.name, command.id, service.anySupport, log) + new CommandContextImpl(thisEntityId, sequence, command.name, command.id, metadata, service.anySupport, log) val reply = try { handler.handleCommand(cmd, context) @@ -260,6 +262,7 @@ final class EventSourcedImpl(_system: ActorSystem, override val sequenceNumber: Long, override val commandName: String, override val commandId: Long, + override val metadata: Metadata, val anySupport: AnySupport, val log: LoggingAdapter) extends CommandContext diff --git a/java-support/src/test/proto/cloudstate/javasupport/actionspec.proto b/java-support/src/test/proto/cloudstate/javasupport/actionspec.proto new file mode 100644 index 000000000..82ddb9c6e --- /dev/null +++ b/java-support/src/test/proto/cloudstate/javasupport/actionspec.proto @@ -0,0 +1,32 @@ +// Copyright 2019 Lightbend Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +syntax = "proto3"; + +package cloudstate.javasupport; + +message In { + string field = 1; +} + +message Out { + string field = 1; +} + +service ActionSpec { + rpc Unary(In) returns (Out); + rpc StreamedIn(stream In) returns (Out); + rpc StreamedOut(In) returns (stream Out); + rpc Streamed(stream In) returns (stream Out); +} diff --git a/java-support/src/test/scala/io/cloudstate/javasupport/impl/action/ActionServiceSpec.scala b/java-support/src/test/scala/io/cloudstate/javasupport/impl/action/ActionServiceSpec.scala new file mode 100644 index 000000000..f1cc7c6ea --- /dev/null +++ b/java-support/src/test/scala/io/cloudstate/javasupport/impl/action/ActionServiceSpec.scala @@ -0,0 +1,213 @@ +/* + * Copyright 2019 Lightbend Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.cloudstate.javasupport.impl.action + +import java.util.concurrent.{CompletableFuture, CompletionStage} + +import akka.NotUsed +import akka.actor.ActorSystem +import akka.stream.javadsl.Source +import akka.stream.scaladsl.Sink +import cloudstate.javasupport.Actionspec +import cloudstate.javasupport.Actionspec.{In, Out} +import com.google.protobuf +import com.google.protobuf.any.{Any => ScalaPbAny} +import io.cloudstate.javasupport.{Context, ServiceCallFactory} +import io.cloudstate.javasupport.action.{ActionContext, ActionHandler, ActionReply, Effect, MessageEnvelope} +import io.cloudstate.javasupport.impl.{AnySupport, ResolvedServiceCallFactory} +import io.cloudstate.protocol.entity.{Forward, Reply} +import io.cloudstate.protocol.function.{FunctionCommand, FunctionReply, StatelessFunction} +import org.scalatest.{BeforeAndAfterAll, Inside, Matchers, OptionValues, WordSpec} + +import scala.concurrent.Await +import scala.concurrent.duration._ +import scala.compat.java8.FutureConverters._ + +class ActionServiceSpec extends WordSpec with Matchers with BeforeAndAfterAll with Inside with OptionValues { + + private implicit val system = ActorSystem("ActionServiceSpec") + + import system.dispatcher + + private val serviceDescriptor = + cloudstate.javasupport.Actionspec.getDescriptor.findServiceByName("ActionSpec") + private val serviceName = serviceDescriptor.getFullName + + override protected def afterAll(): Unit = { + super.afterAll() + system.terminate() + } + + def create(handler: ActionHandler): StatelessFunction = { + val service = new ActionService( + handler, + serviceDescriptor, + new AnySupport(Array(Actionspec.getDescriptor), this.getClass.getClassLoader) + ) + + val services = Map(serviceName -> service) + val scf = new ResolvedServiceCallFactory(services) + + new StatelessFunctionImpl(system, services, new Context() { + override def serviceCallFactory(): ServiceCallFactory = scf + }) + } + + "The action service" should { + "invoke unary commands" in { + val service = create(new AbstractHandler { + override def handleUnary(commandName: String, + message: MessageEnvelope[protobuf.Any], + context: ActionContext): CompletionStage[ActionReply[protobuf.Any]] = + CompletableFuture.completedFuture(createOutReply("out: " + extractInField(message))) + }) + + val reply = Await.result(service.handleUnary( + FunctionCommand(serviceName, "Unary", createInPayload("in")) + ), + 10.seconds) + + inside(reply.response) { + case FunctionReply.Response.Reply(Reply(payload, _, _)) => + extractOutField(payload) should ===("out: in") + } + } + + "invoke streamed in commands" in { + val service = create(new AbstractHandler { + override def handleStreamedIn(commandName: String, + stream: Source[MessageEnvelope[protobuf.Any], NotUsed], + context: ActionContext): CompletionStage[ActionReply[protobuf.Any]] = + stream.asScala + .map(extractInField) + .runWith(Sink.seq) + .map(ins => createOutReply("out: " + ins.mkString(", "))) + .toJava + }) + + val reply = Await.result( + service.handleStreamedIn( + akka.stream.scaladsl.Source + .single(FunctionCommand(serviceName, "StreamedIn")) + .concat( + akka.stream.scaladsl.Source(1 to 3).map(idx => FunctionCommand(payload = createInPayload(s"in $idx"))) + ) + ), + 10.seconds + ) + + inside(reply.response) { + case FunctionReply.Response.Reply(Reply(payload, _, _)) => + extractOutField(payload) should ===("out: in 1, in 2, in 3") + } + } + + "invoke streamed out commands" in { + val service = create(new AbstractHandler { + override def handleStreamedOut(commandName: String, + message: MessageEnvelope[protobuf.Any], + context: ActionContext): Source[ActionReply[protobuf.Any], NotUsed] = { + val in = extractInField(message) + akka.stream.scaladsl.Source(1 to 3).map(idx => createOutReply(s"out $idx: $in")).asJava + } + }) + + val replies = Await.result(service + .handleStreamedOut( + FunctionCommand(serviceName, "Unary", createInPayload("in")) + ) + .runWith(Sink.seq), + 10.seconds) + + replies.zipWithIndex.foreach { + case (reply, idx) => + inside(reply.response) { + case FunctionReply.Response.Reply(Reply(payload, _, _)) => + extractOutField(payload) should ===(s"out ${idx + 1}: in") + } + } + } + + "invoke streamed commands" in { + val service = create(new AbstractHandler { + override def handleStreamed(commandName: String, + stream: Source[MessageEnvelope[protobuf.Any], NotUsed], + context: ActionContext): Source[ActionReply[protobuf.Any], NotUsed] = + stream.asScala + .map(extractInField) + .map(in => createOutReply(s"out: $in")) + .asJava + }) + + val replies = Await.result( + service + .handleStreamed( + akka.stream.scaladsl.Source + .single(FunctionCommand(serviceName, "StreamedIn")) + .concat( + akka.stream.scaladsl.Source(1 to 3).map(idx => FunctionCommand(payload = createInPayload(s"in $idx"))) + ) + ) + .runWith(Sink.seq), + 10.seconds + ) + + replies.zipWithIndex.foreach { + case (reply, idx) => + inside(reply.response) { + case FunctionReply.Response.Reply(Reply(payload, _, _)) => + extractOutField(payload) should ===(s"out: in ${idx + 1}") + } + } + } + + } + + private def createOutAny(field: String) = + protobuf.Any.pack(Out.newBuilder().setField(field).build()) + + private def createOutReply(field: String): ActionReply[protobuf.Any] = + ActionReply.message(createOutAny(field)) + + private def extractInField(message: MessageEnvelope[protobuf.Any]) = + message.payload().unpack(classOf[In]).getField + + private def createInPayload(field: String) = + Some(ScalaPbAny.fromJavaProto(protobuf.Any.pack(In.newBuilder().setField(field).build()))) + + private def extractOutField(payload: Option[ScalaPbAny]) = + ScalaPbAny.toJavaProto(payload.value).unpack(classOf[Out]).getField + + private trait AbstractHandler extends ActionHandler { + override def handleUnary(commandName: String, + message: MessageEnvelope[protobuf.Any], + context: ActionContext): CompletionStage[ActionReply[protobuf.Any]] = ??? + + override def handleStreamedOut(commandName: String, + message: MessageEnvelope[protobuf.Any], + context: ActionContext): Source[ActionReply[protobuf.Any], NotUsed] = ??? + + override def handleStreamedIn(commandName: String, + stream: Source[MessageEnvelope[protobuf.Any], NotUsed], + context: ActionContext): CompletionStage[ActionReply[protobuf.Any]] = ??? + + override def handleStreamed(commandName: String, + stream: Source[MessageEnvelope[protobuf.Any], NotUsed], + context: ActionContext): Source[ActionReply[protobuf.Any], NotUsed] = ??? + } + +} diff --git a/java-support/src/test/scala/io/cloudstate/javasupport/impl/action/AnnotationBasedActionSupportSpec.scala b/java-support/src/test/scala/io/cloudstate/javasupport/impl/action/AnnotationBasedActionSupportSpec.scala new file mode 100644 index 000000000..37f0886b6 --- /dev/null +++ b/java-support/src/test/scala/io/cloudstate/javasupport/impl/action/AnnotationBasedActionSupportSpec.scala @@ -0,0 +1,415 @@ +/* + * Copyright 2019 Lightbend Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.cloudstate.javasupport.impl.action + +import java.util.Optional +import java.util.concurrent.{CompletableFuture, CompletionStage, TimeUnit} + +import akka.NotUsed +import akka.actor.ActorSystem +import akka.stream.javadsl.Source +import akka.stream.scaladsl.{JavaFlowSupport, Sink} +import cloudstate.javasupport.Actionspec +import cloudstate.javasupport.Actionspec.{In, Out} +import com.google.protobuf +import io.cloudstate.javasupport.{Metadata, ServiceCallFactory} +import io.cloudstate.javasupport.action.{ + Action, + ActionContext, + ActionHandler, + ActionReply, + CallHandler, + MessageEnvelope, + MessageReply +} +import io.cloudstate.javasupport.impl.AnySupport +import org.scalatest.{BeforeAndAfterAll, Matchers, WordSpec} + +import scala.concurrent.{Await, Future} +import scala.concurrent.duration._ +import scala.compat.java8.FutureConverters._ + +class AnnotationBasedActionSupportSpec extends WordSpec with Matchers with BeforeAndAfterAll { + + private implicit val sys = ActorSystem("AnnotationBasedActionSupportSpec") + + import sys.dispatcher + + override protected def afterAll(): Unit = { + super.afterAll() + sys.terminate() + } + + private val anySupport = new AnySupport(Array(Actionspec.getDescriptor), this.getClass.getClassLoader) + + private object ctx extends ActionContext { + override def metadata(): Metadata = Metadata.EMPTY.add("scope", "call") + + override def serviceCallFactory(): ServiceCallFactory = ??? + } + + private def create(handler: AnyRef): ActionHandler = + new AnnotationBasedActionSupport(handler, anySupport, Actionspec.getDescriptor.findServiceByName("ActionSpec")) + + "Annotation based action support" should { + + "support invoking unary commands" when { + def test(handler: AnyRef) = { + val reply = create(handler) + .handleUnary("Unary", createInEnvelope("in"), ctx) + .toCompletableFuture + .get(10, TimeUnit.SECONDS) + assertIsOutReplyWithField(reply, "out: in") + } + + def inToOut(in: In): Out = + Out.newBuilder().setField("out: " + in.getField).build() + + "synchronous" in test(new { + @CallHandler + def unary(in: In): Out = inToOut(in) + }) + + "asynchronous" in test(new { + @CallHandler + def unary(in: In): CompletionStage[Out] = CompletableFuture.completedFuture(inToOut(in)) + }) + + "in wrapped in envelope" in test(new { + @CallHandler + def unary(in: MessageEnvelope[In]): Out = { + in.metadata().get("scope") should ===(Optional.of("message")) + inToOut(in.payload()) + } + }) + + "synchronous out wrapped in envelope" in test(new { + @CallHandler + def unary(in: In): MessageEnvelope[Out] = MessageEnvelope.of(inToOut(in)) + }) + + "asynchronous out wrapped in envelope" in test(new { + @CallHandler + def unary(in: In): CompletionStage[MessageEnvelope[Out]] = + CompletableFuture.completedFuture(MessageEnvelope.of(inToOut(in))) + }) + + "synchronous out wrapped in reply" in test(new { + @CallHandler + def unary(in: In): ActionReply[Out] = ActionReply.message(inToOut(in)) + }) + + "asynchronous out wrapped in reply" in test(new { + @CallHandler + def unary(in: In): CompletionStage[ActionReply[Out]] = + CompletableFuture.completedFuture(ActionReply.message(inToOut(in))) + }) + + "with metadata parameter" in test(new { + @CallHandler + def unary(in: In, metadata: Metadata): Out = { + metadata.get("scope") should ===(Optional.of("call")) + inToOut(in) + } + }) + + "with context parameter" in test(new { + @CallHandler + def unary(in: In, context: ActionContext): Out = inToOut(in) + }) + + } + + "support invoking streamed out commands" when { + def test(handler: AnyRef) = { + val replies = Await.result( + create(handler) + .handleStreamedOut("StreamedOut", createInEnvelope("in"), ctx) + .asScala + .runWith(Sink.seq), + 10.seconds + ) + replies should have size 3 + replies.zipWithIndex.foreach { + case (reply, idx) => + assertIsOutReplyWithField(reply, s"out ${idx + 1}: in") + } + } + + def inToOut(in: In): akka.stream.scaladsl.Source[Out, NotUsed] = + akka.stream.scaladsl + .Source(1 to 3) + .map { idx => + Out.newBuilder().setField(s"out $idx: " + in.getField).build() + } + + "source" in test(new { + @CallHandler + def streamedOut(in: In): Source[Out, NotUsed] = inToOut(in).asJava + }) + + "reactive streams publisher" in test(new { + @CallHandler + def streamedOut(in: In): org.reactivestreams.Publisher[Out] = + inToOut(in).runWith(Sink.asPublisher(false)) + }) + + "jdk publisher" in test(new { + @CallHandler + def streamedOut(in: In): java.util.concurrent.Flow.Publisher[Out] = + inToOut(in).runWith(JavaFlowSupport.Sink.asPublisher(false)) + }) + + "message envelope" in test(new { + @CallHandler + def streamedOut(in: MessageEnvelope[In]): Source[Out, NotUsed] = inToOut(in.payload()).asJava + }) + + "source wrapped in envelope" in test(new { + @CallHandler + def streamedOut(in: In): Source[MessageEnvelope[Out], NotUsed] = + inToOut(in).map(MessageEnvelope.of(_)).asJava + }) + + "source wrapped in reply" in test(new { + @CallHandler + def streamedOut(in: In): Source[ActionReply[Out], NotUsed] = + inToOut(in).map[ActionReply[Out]](ActionReply.message(_)).asJava + }) + + "with metadata parameter" in test(new { + @CallHandler + def streamedOut(in: In, metadata: Metadata): Source[Out, NotUsed] = { + metadata.get("scope") should ===(Optional.of("call")) + inToOut(in).asJava + } + }) + + "with context parameter" in test(new { + @CallHandler + def streamedOut(in: In, metadata: Metadata): Source[Out, NotUsed] = inToOut(in).asJava + }) + + } + + "support invoking streamed in commands" when { + def test(handler: AnyRef) = { + val reply = create(handler) + .handleStreamedIn( + "StreamedIn", + akka.stream.scaladsl + .Source(1 to 3) + .map(idx => createInEnvelope("in " + idx)) + .asJava, + ctx + ) + .toCompletableFuture + .get(10, TimeUnit.SECONDS) + + assertIsOutReplyWithField(reply, "out: in 1, in 2, in 3") + } + + def inToOut(in: akka.stream.scaladsl.Source[In, NotUsed]): Future[Out] = + in.runWith(Sink.seq).map { ins => + Out.newBuilder().setField("out: " + ins.map(_.getField).mkString(", ")).build() + } + + "source" in test(new { + @CallHandler + def streamedIn(in: Source[In, NotUsed]): CompletionStage[Out] = inToOut(in.asScala).toJava + }) + + "reactive streams publisher" in test(new { + @CallHandler + def streamedIn(in: org.reactivestreams.Publisher[In]): CompletionStage[Out] = + inToOut(akka.stream.scaladsl.Source.fromPublisher(in)).toJava + }) + + "jdk publisher" in test(new { + @CallHandler + def streamedIn(in: java.util.concurrent.Flow.Publisher[In]): CompletionStage[Out] = + inToOut(JavaFlowSupport.Source.fromPublisher(in)).toJava + }) + + "source wrapped in envelope" in test(new { + @CallHandler + def streamedIn(in: Source[MessageEnvelope[In], NotUsed]): CompletionStage[Out] = + inToOut(in.asScala.map(_.payload)).toJava + }) + + "returns envelope" in test(new { + @CallHandler + def streamedIn(in: Source[In, NotUsed]): CompletionStage[MessageEnvelope[Out]] = + inToOut(in.asScala).map(MessageEnvelope.of(_)).toJava + }) + + "returns reply" in test(new { + @CallHandler + def streamedIn(in: Source[In, NotUsed]): CompletionStage[ActionReply[Out]] = + inToOut(in.asScala).map[ActionReply[Out]](ActionReply.message(_)).toJava + }) + + "with metadata parameter" in test(new { + @CallHandler + def streamedIn(in: Source[In, NotUsed], metadata: Metadata): CompletionStage[Out] = { + metadata.get("scope") should ===(Optional.of("call")) + inToOut(in.asScala).toJava + } + }) + + "with context parameter" in test(new { + @CallHandler + def streamedIn(in: Source[In, NotUsed], context: ActionContext): CompletionStage[Out] = + inToOut(in.asScala).toJava + }) + + } + + "support invoking streamed commands" when { + def test(handler: AnyRef) = { + val replies = Await.result( + create(handler) + .handleStreamed( + "Streamed", + akka.stream.scaladsl + .Source(1 to 3) + .map(idx => createInEnvelope("in " + idx)) + .asJava, + ctx + ) + .asScala + .runWith(Sink.seq), + 10.seconds + ) + + replies should have size 3 + replies.zipWithIndex.foreach { + case (reply, idx) => + assertIsOutReplyWithField(reply, s"out: in ${idx + 1}") + } + } + + def inToOut(stream: akka.stream.scaladsl.Source[In, NotUsed]): akka.stream.scaladsl.Source[Out, NotUsed] = + stream.map { in => + Out.newBuilder().setField("out: " + in.getField).build() + } + + "source in source out" in test(new { + @CallHandler + def streamed(in: Source[In, NotUsed]): Source[Out, NotUsed] = inToOut(in.asScala).asJava + }) + + "reactive streams publisher in source out" in test(new { + @CallHandler + def streamed(in: org.reactivestreams.Publisher[In]): Source[Out, NotUsed] = + inToOut(akka.stream.scaladsl.Source.fromPublisher(in)).asJava + }) + + "source in reactive streams publisher out" in test(new { + @CallHandler + def streamed(in: Source[In, NotUsed]): org.reactivestreams.Publisher[Out] = + inToOut(in.asScala).runWith(Sink.asPublisher(false)) + }) + + "reactive streams publisher in reactive streams publisher out" in test(new { + @CallHandler + def streamed(in: org.reactivestreams.Publisher[In]): org.reactivestreams.Publisher[Out] = + inToOut(akka.stream.scaladsl.Source.fromPublisher(in)).runWith(Sink.asPublisher(false)) + }) + + "jdk publisher in source out" in test(new { + @CallHandler + def streamed(in: java.util.concurrent.Flow.Publisher[In]): Source[Out, NotUsed] = + inToOut(JavaFlowSupport.Source.fromPublisher(in)).asJava + }) + + "source in jdk publisher out" in test(new { + @CallHandler + def streamed(in: Source[In, NotUsed]): java.util.concurrent.Flow.Publisher[Out] = + inToOut(in.asScala).runWith(JavaFlowSupport.Sink.asPublisher(false)) + }) + + "jdk publisher in jdk publisher out" in test(new { + @CallHandler + def streamed(in: java.util.concurrent.Flow.Publisher[In]): java.util.concurrent.Flow.Publisher[Out] = + inToOut(JavaFlowSupport.Source.fromPublisher(in)).runWith(JavaFlowSupport.Sink.asPublisher(false)) + }) + + "in wrapped in envelope" in test(new { + @CallHandler + def streamed(in: Source[MessageEnvelope[In], NotUsed]): Source[Out, NotUsed] = + inToOut(in.asScala.map(_.payload)).asJava + }) + + "out wrapped in envelope" in test(new { + @CallHandler + def streamed(in: Source[In, NotUsed]): Source[MessageEnvelope[Out], NotUsed] = + inToOut(in.asScala).map(MessageEnvelope.of(_)).asJava + }) + + "in and out wrapped in envelope" in test(new { + @CallHandler + def streamed(in: Source[MessageEnvelope[In], NotUsed]): Source[MessageEnvelope[Out], NotUsed] = + inToOut(in.asScala.map(_.payload())).map(MessageEnvelope.of(_)).asJava + }) + + "out wrapped in reply" in test(new { + @CallHandler + def streamed(in: Source[In, NotUsed]): Source[ActionReply[Out], NotUsed] = + inToOut(in.asScala).map[ActionReply[Out]](ActionReply.message(_)).asJava + }) + + "in wrapped in envelope out wrapped in reply" in test(new { + @CallHandler + def streamed(in: Source[MessageEnvelope[In], NotUsed]): Source[ActionReply[Out], NotUsed] = + inToOut(in.asScala.map(_.payload())).map[ActionReply[Out]](ActionReply.message(_)).asJava + }) + + "with metadata parameter" in test(new { + @CallHandler + def streamed(in: Source[In, NotUsed], metadata: Metadata): Source[Out, NotUsed] = { + metadata.get("scope") should ===(Optional.of("call")) + inToOut(in.asScala).asJava + } + }) + + "with context parameter" in test(new { + @CallHandler + def streamed(in: Source[In, NotUsed], context: ActionContext): Source[Out, NotUsed] = + inToOut(in.asScala).asJava + }) + + } + + } + + private def createInEnvelope(field: String) = + MessageEnvelope.of( + protobuf.Any.pack(In.newBuilder().setField(field).build()), + Metadata.EMPTY.add("scope", "message") + ) + + private def assertIsOutReplyWithField(reply: ActionReply[protobuf.Any], field: String) = + reply match { + case message: MessageReply[protobuf.Any] => + val out = message.payload().unpack(classOf[Out]) + out.getField should ===(field) + case other => + fail(s"$reply is not a MessageReply") + } + +} diff --git a/java-support/src/test/scala/io/cloudstate/javasupport/impl/eventsourced/AnnotationBasedEventSourcedSupportSpec.scala b/java-support/src/test/scala/io/cloudstate/javasupport/impl/eventsourced/AnnotationBasedEventSourcedSupportSpec.scala index 79526df43..d52067f09 100644 --- a/java-support/src/test/scala/io/cloudstate/javasupport/impl/eventsourced/AnnotationBasedEventSourcedSupportSpec.scala +++ b/java-support/src/test/scala/io/cloudstate/javasupport/impl/eventsourced/AnnotationBasedEventSourcedSupportSpec.scala @@ -25,6 +25,7 @@ import io.cloudstate.javasupport.{ EntityContext, EntityFactory, EntityId, + Metadata, ServiceCall, ServiceCallFactory, ServiceCallRef @@ -57,6 +58,7 @@ class AnnotationBasedEventSourcedSupportSpec extends WordSpec with Matchers { override def fail(errorMessage: String): RuntimeException = ??? override def forward(to: ServiceCall): Unit = ??? override def effect(effect: ServiceCall, synchronous: Boolean): Unit = ??? + override def metadata(): Metadata = Metadata.EMPTY } val eventCtx = new EventContext with BaseContext {