diff --git a/sdk-actors/src/main/java/io/dapr/actors/runtime/AbstractActor.java b/sdk-actors/src/main/java/io/dapr/actors/runtime/AbstractActor.java index 9febda8f9a..fdf58cd8a4 100644 --- a/sdk-actors/src/main/java/io/dapr/actors/runtime/AbstractActor.java +++ b/sdk-actors/src/main/java/io/dapr/actors/runtime/AbstractActor.java @@ -13,8 +13,10 @@ package io.dapr.actors.runtime; +import com.google.common.base.Strings; import io.dapr.actors.ActorId; import io.dapr.actors.ActorTrace; +import io.dapr.utils.RepeatedDuration; import reactor.core.publisher.Mono; import java.io.IOException; @@ -70,9 +72,9 @@ protected AbstractActor(ActorRuntimeContext runtimeContext, ActorId id) { this.actorRuntimeContext = runtimeContext; this.id = id; this.actorStateManager = new ActorStateManager( - runtimeContext.getStateProvider(), - runtimeContext.getActorTypeInformation().getName(), - id); + runtimeContext.getStateProvider(), + runtimeContext.getActorTypeInformation().getName(), + id); this.actorTrace = runtimeContext.getActorTrace(); this.started = false; } @@ -115,18 +117,56 @@ protected ActorStateManager getActorStateManager() { * @return Asynchronous void response. */ protected Mono registerReminder( - String reminderName, - T state, - Duration dueTime, - Duration period) { + String reminderName, + T state, + Duration dueTime, + Duration period) { + return this.registerReminder(reminderName, state, dueTime, new RepeatedDuration(period)); + } + + /** + * Registers a reminder for this Actor. + * + * @param reminderName Name of the reminder. + * @param state State to be send along with reminder triggers. + * @param dueTime Due time for the first trigger. + * @param period Frequency for the triggers. + * @param Type of the state object. + * @return Asynchronous void response. + */ + protected Mono registerReminder( + String reminderName, + T state, + Duration dueTime, + RepeatedDuration period) { + return this.registerReminder(reminderName, state, dueTime, period, null); + } + + /** + * Registers a reminder for this Actor. + * + * @param reminderName Name of the reminder. + * @param state State to be send along with reminder triggers. + * @param dueTime Due time for the first trigger. + * @param period Frequency for the triggers. + * @param ttl The time at which or time interval after which the reminder will be expired and deleted. + * @param Type of the state object. + * @return Asynchronous void response. + */ + protected Mono registerReminder( + String reminderName, + T state, + Duration dueTime, + RepeatedDuration period, + RepeatedDuration ttl) { try { byte[] data = this.actorRuntimeContext.getObjectSerializer().serialize(state); - ActorReminderParams params = new ActorReminderParams(data, dueTime, period); + ActorReminderParams params = new ActorReminderParams(data, dueTime, period, ttl); return this.actorRuntimeContext.getDaprClient().registerReminder( - this.actorRuntimeContext.getActorTypeInformation().getName(), - this.id.toString(), - reminderName, - params); + this.actorRuntimeContext.getActorTypeInformation().getName(), + this.id.toString(), + reminderName, + params); } catch (IOException e) { return Mono.error(e); } @@ -147,23 +187,70 @@ protected Mono registerReminder( * @return Asynchronous result with timer's name. */ protected Mono registerActorTimer( - String timerName, - String callback, - T state, - Duration dueTime, - Duration period) { + String timerName, + String callback, + T state, + Duration dueTime, + Duration period) { + return registerActorTimer(timerName, callback, state, dueTime, new RepeatedDuration(period)); + } + + /** + * Registers a Timer for the actor. A timer name is autogenerated by the runtime to keep track of it. + * + * @param timerName Name of the timer, unique per Actor (auto-generated if null). + * @param callback Name of the method to be called. + * @param state State to be passed it to the method when timer triggers. + * @param dueTime The amount of time to delay before the async callback is first invoked. + * Specify negative one (-1) milliseconds to prevent the timer from starting. + * Specify zero (0) to start the timer immediately. + * @param period The time interval between invocations of the async callback. + * Specify negative one (-1) milliseconds to disable periodic signaling. + * @param Type for the state to be passed in to timer. + * @return Asynchronous result with timer's name. + */ + protected Mono registerActorTimer( + String timerName, + String callback, + T state, + Duration dueTime, + RepeatedDuration period) { + return registerActorTimer(timerName, callback, state, dueTime, period, null); + } + + /** + * Registers a Timer for the actor. A timer name is autogenerated by the runtime to keep track of it. + * + * @param timerName Name of the timer, unique per Actor (auto-generated if null). + * @param callback Name of the method to be called. + * @param state State to be passed it to the method when timer triggers. + * @param dueTime The amount of time to delay before the async callback is first invoked. + * Specify negative one (-1) milliseconds to prevent the timer from starting. + * Specify zero (0) to start the timer immediately. + * @param period The time interval between invocations of the async callback. + * Specify negative one (-1) milliseconds to disable periodic signaling. + * @param ttl The time at which or time interval after which the timer will be expired and deleted. + * @param Type for the state to be passed in to timer. + * @return Asynchronous result with timer's name. + */ + protected Mono registerActorTimer( + String timerName, + String callback, + T state, + Duration dueTime, + RepeatedDuration period, + RepeatedDuration ttl) { try { - if ((callback == null) || callback.isEmpty()) { + if (Strings.isNullOrEmpty(callback)) { throw new IllegalArgumentException("Timer requires a callback function."); } - String name = timerName; - if ((timerName == null) || (timerName.isEmpty())) { - name = String.format("%s_Timer_%s", this.id.toString(), UUID.randomUUID().toString()); - } + String name = Strings.isNullOrEmpty(timerName) + ? String.format("%s_Timer_%s", this.id.toString(), UUID.randomUUID()) : timerName; byte[] data = this.actorRuntimeContext.getObjectSerializer().serialize(state); - ActorTimerParams actorTimer = new ActorTimerParams(callback, data, dueTime, period); + ActorTimerParams actorTimer = + new ActorTimerParams(callback, data, dueTime, period, ttl); return this.actorRuntimeContext.getDaprClient().registerTimer( this.actorRuntimeContext.getActorTypeInformation().getName(), @@ -183,9 +270,9 @@ protected Mono registerActorTimer( */ protected Mono unregisterTimer(String timerName) { return this.actorRuntimeContext.getDaprClient().unregisterTimer( - this.actorRuntimeContext.getActorTypeInformation().getName(), - this.id.toString(), - timerName); + this.actorRuntimeContext.getActorTypeInformation().getName(), + this.id.toString(), + timerName); } /** @@ -196,9 +283,9 @@ protected Mono unregisterTimer(String timerName) { */ protected Mono unregisterReminder(String reminderName) { return this.actorRuntimeContext.getDaprClient().unregisterReminder( - this.actorRuntimeContext.getActorTypeInformation().getName(), - this.id.toString(), - reminderName); + this.actorRuntimeContext.getActorTypeInformation().getName(), + this.id.toString(), + reminderName); } /** @@ -277,8 +364,8 @@ Mono onActivateInternal() { this.actorTrace.writeInfo(TRACE_TYPE, this.id.toString(), "Activating ..."); this.resetState(); }).then(this.onActivate()) - .then(this.doWriteInfo(TRACE_TYPE, this.id.toString(), "Activated")) - .then(this.saveState()); + .then(this.doWriteInfo(TRACE_TYPE, this.id.toString(), "Activated")) + .then(this.saveState()); } /** @@ -290,8 +377,8 @@ Mono onDeactivateInternal() { this.actorTrace.writeInfo(TRACE_TYPE, this.id.toString(), "Deactivating ..."); return Mono.fromRunnable(() -> this.resetState()) - .then(this.onDeactivate()) - .then(this.doWriteInfo(TRACE_TYPE, this.id.toString(), "Deactivated")); + .then(this.onDeactivate()) + .then(this.doWriteInfo(TRACE_TYPE, this.id.toString(), "Deactivated")); } /** @@ -322,10 +409,10 @@ Mono onPostActorMethodInternal(ActorMethodContext actorMethodContext) { throw new IllegalStateException("Cannot complete a method before starting a call."); } }).then(this.onPostActorMethod(actorMethodContext)) - .then(this.saveState()) - .then(Mono.fromRunnable(() -> { - this.started = false; - })); + .then(this.saveState()) + .then(Mono.fromRunnable(() -> { + this.started = false; + })); } /** diff --git a/sdk-actors/src/main/java/io/dapr/actors/runtime/ActorObjectSerializer.java b/sdk-actors/src/main/java/io/dapr/actors/runtime/ActorObjectSerializer.java index 82ee62009a..0143efd1ea 100644 --- a/sdk-actors/src/main/java/io/dapr/actors/runtime/ActorObjectSerializer.java +++ b/sdk-actors/src/main/java/io/dapr/actors/runtime/ActorObjectSerializer.java @@ -18,6 +18,7 @@ import com.fasterxml.jackson.databind.JsonNode; import io.dapr.client.ObjectSerializer; import io.dapr.utils.DurationUtils; +import io.dapr.utils.RepeatedDuration; import java.io.ByteArrayOutputStream; import java.io.IOException; @@ -77,11 +78,19 @@ private byte[] serialize(ActorTimerParams timer) throws IOException { JsonGenerator generator = JSON_FACTORY.createGenerator(writer); generator.writeStartObject(); generator.writeStringField("dueTime", DurationUtils.convertDurationToDaprFormat(timer.getDueTime())); - generator.writeStringField("period", DurationUtils.convertDurationToDaprFormat(timer.getPeriod())); + generator.writeStringField("period", + DurationUtils.convertRepeatedDurationToIso8601RepetitionFormat(timer.getPeriod())); generator.writeStringField("callback", timer.getCallback()); + + if (timer.getTtl().isPresent()) { + generator.writeStringField("ttl", + DurationUtils.convertRepeatedDurationToIso8601RepetitionFormat(timer.getTtl().get())); + } + if (timer.getData() != null) { generator.writeBinaryField("data", timer.getData()); } + generator.writeEndObject(); generator.close(); writer.flush(); @@ -101,10 +110,18 @@ private byte[] serialize(ActorReminderParams reminder) throws IOException { JsonGenerator generator = JSON_FACTORY.createGenerator(writer); generator.writeStartObject(); generator.writeStringField("dueTime", DurationUtils.convertDurationToDaprFormat(reminder.getDueTime())); - generator.writeStringField("period", DurationUtils.convertDurationToDaprFormat(reminder.getPeriod())); + generator.writeStringField("period", + DurationUtils.convertRepeatedDurationToIso8601RepetitionFormat(reminder.getRepeatedPeriod())); + + if (reminder.getTtl().isPresent()) { + generator.writeStringField("ttl", + DurationUtils.convertRepeatedDurationToIso8601RepetitionFormat(reminder.getTtl().get())); + } + if (reminder.getData() != null) { generator.writeBinaryField("data", reminder.getData()); } + generator.writeEndObject(); generator.close(); writer.flush(); @@ -187,10 +204,11 @@ private ActorTimerParams deserializeActorTimer(byte[] value) throws IOException JsonNode node = OBJECT_MAPPER.readTree(value); String callback = node.get("callback").asText(); Duration dueTime = extractDurationOrNull(node, "dueTime"); - Duration period = extractDurationOrNull(node, "period"); + RepeatedDuration period = extractRepeatedDurationOrNull(node, "period"); + RepeatedDuration ttl = extractRepeatedDurationOrNull(node, "ttl"); byte[] data = node.get("data") != null ? node.get("data").binaryValue() : null; - return new ActorTimerParams(callback, data, dueTime, period); + return new ActorTimerParams(callback, data, dueTime, period, ttl); } /** @@ -207,10 +225,11 @@ private ActorReminderParams deserializeActorReminder(byte[] value) throws IOExce JsonNode node = OBJECT_MAPPER.readTree(value); Duration dueTime = extractDurationOrNull(node, "dueTime"); - Duration period = extractDurationOrNull(node, "period"); + RepeatedDuration period = extractRepeatedDurationOrNull(node, "period"); + RepeatedDuration ttl = extractRepeatedDurationOrNull(node, "ttl"); byte[] data = node.get("data") != null ? node.get("data").binaryValue() : null; - return new ActorReminderParams(data, dueTime, period); + return new ActorReminderParams(data, dueTime, period, ttl); } /** @@ -228,4 +247,20 @@ private static Duration extractDurationOrNull(JsonNode node, String name) { return DurationUtils.convertDurationFromDaprFormat(valueNode.asText()); } + + /** + * Extracts {@link RepeatedDuration} or null. + * + * @param node Node that contains the attribute. + * @param name Attribute name. + * @return Parsed {@link RepeatedDuration} or null. + */ + private static RepeatedDuration extractRepeatedDurationOrNull(JsonNode node, String name) { + JsonNode valueNode = node.get(name); + if (valueNode == null) { + return null; + } + + return DurationUtils.convertIso8601StringToRepeatedDuration(valueNode.asText()); + } } diff --git a/sdk-actors/src/main/java/io/dapr/actors/runtime/ActorReminderParams.java b/sdk-actors/src/main/java/io/dapr/actors/runtime/ActorReminderParams.java index 389457b7ad..2f62d0791c 100644 --- a/sdk-actors/src/main/java/io/dapr/actors/runtime/ActorReminderParams.java +++ b/sdk-actors/src/main/java/io/dapr/actors/runtime/ActorReminderParams.java @@ -13,7 +13,10 @@ package io.dapr.actors.runtime; +import io.dapr.utils.RepeatedDuration; + import java.time.Duration; +import java.util.Optional; /** * Parameters for Actor Reminder. @@ -38,7 +41,13 @@ final class ActorReminderParams { /** * Interval between triggers. */ - private final Duration period; + private final RepeatedDuration period; + + /** + * Time at which or time interval after which the reminder will be expired and deleted. + * If ttl is omitted, no restrictions are applied. + */ + private final RepeatedDuration ttl; /** * Instantiates a new instance for the params of a reminder. @@ -46,13 +55,67 @@ final class ActorReminderParams { * @param data Data to be passed in as part of the reminder trigger. * @param dueTime Time the reminder is due for the 1st time. * @param period Interval between triggers. + * @deprecated As of release 1.4, replace with {@link #ActorReminderParams(byte[], Duration, RepeatedDuration)} */ + @Deprecated ActorReminderParams(byte[] data, Duration dueTime, Duration period) { + this(data, dueTime, new RepeatedDuration(period, null)); + } + + /** + * Instantiates a new instance for the params of a reminder. + * + * @param data Data to be passed in as part of the reminder trigger. + * @param dueTime Time the reminder is due for the 1st time. + * @param period Interval between triggers. + */ + ActorReminderParams(byte[] data, Duration dueTime, RepeatedDuration period) { + this(data, dueTime, period, null); + } + + /** + * Instantiates a new instance for the params of a reminder. + * + * @param data Data to be passed in as part of the reminder trigger. + * @param dueTime Time the reminder is due for the 1st time. + * @param period Interval between triggers. + * @param ttl Time at which or time interval after which the reminder will be expired and deleted. + */ + ActorReminderParams(byte[] data, Duration dueTime, RepeatedDuration period, RepeatedDuration ttl) { validateDueTime("DueTime", dueTime); - validatePeriod("Period", period); + validatePeriod("Period", period.getDuration()); this.data = data; this.dueTime = dueTime; this.period = period; + this.ttl = ttl; + } + + /** + * Validates due time is valid, throws {@link IllegalArgumentException}. + * + * @param argName Name of the argument passed in. + * @param value Vale being checked. + */ + private static void validateDueTime(String argName, Duration value) { + if (value.compareTo(Duration.ZERO) < 0) { + String message = String.format( + "argName: %s - Duration toMillis() - specified value must be greater than %s", argName, Duration.ZERO); + throw new IllegalArgumentException(message); + } + } + + /** + * Validates reminder period is valid, throws {@link IllegalArgumentException}. + * + * @param argName Name of the argument passed in. + * @param value Vale being checked. + */ + private static void validatePeriod(String argName, Duration value) throws IllegalArgumentException { + if (value.compareTo(MIN_TIME_PERIOD) < 0) { + String message = String.format( + "argName: %s - Duration toMillis() - specified value must be greater than %s", argName, MIN_TIME_PERIOD); + throw new IllegalArgumentException(message); + } } /** @@ -70,7 +133,7 @@ Duration getDueTime() { * @return Interval between triggers. */ Duration getPeriod() { - return period; + return period.getDuration(); } /** @@ -83,30 +146,21 @@ byte[] getData() { } /** - * Validates due time is valid, throws {@link IllegalArgumentException}. + * Gets the time at which or time interval after which the reminder will be expired and deleted. * - * @param argName Name of the argument passed in. - * @param value Vale being checked. + * @return Time at which or time interval after which the reminder will be expired and deleted. */ - private static void validateDueTime(String argName, Duration value) { - if (value.compareTo(Duration.ZERO) < 0) { - String message = String.format( - "argName: %s - Duration toMillis() - specified value must be greater than %s", argName, Duration.ZERO); - throw new IllegalArgumentException(message); - } + Optional getTtl() { + return Optional.ofNullable(ttl); } /** - * Validates reminder period is valid, throws {@link IllegalArgumentException}. + * Gets the periodic time when the reminder will be invoked. + * Possibly contains repetitions to limit the the total number of callback invocations. * - * @param argName Name of the argument passed in. - * @param value Vale being checked. + * @return Periodic time as {@link RepeatedDuration} when reminder will be invoked. */ - private static void validatePeriod(String argName, Duration value) throws IllegalArgumentException { - if (value.compareTo(MIN_TIME_PERIOD) < 0) { - String message = String.format( - "argName: %s - Duration toMillis() - specified value must be greater than %s", argName, MIN_TIME_PERIOD); - throw new IllegalArgumentException(message); - } + RepeatedDuration getRepeatedPeriod() { + return period; } } diff --git a/sdk-actors/src/main/java/io/dapr/actors/runtime/ActorTimerParams.java b/sdk-actors/src/main/java/io/dapr/actors/runtime/ActorTimerParams.java index a3d1bb755a..5767d492d1 100644 --- a/sdk-actors/src/main/java/io/dapr/actors/runtime/ActorTimerParams.java +++ b/sdk-actors/src/main/java/io/dapr/actors/runtime/ActorTimerParams.java @@ -13,50 +13,94 @@ package io.dapr.actors.runtime; +import io.dapr.utils.RepeatedDuration; + import java.time.Duration; +import java.util.Optional; /** * Represents the timer set on an Actor, to be called once after due time and then every period. - * */ final class ActorTimerParams { /** * Name of the method to be called for this timer. */ - private String callback; + private final String callback; /** * State to be sent in the timer. */ - private byte[] data; + private final byte[] data; /** * Due time for the timer's first trigger. */ - private Duration dueTime; + private final Duration dueTime; /** * Period at which the timer will be triggered. + * The number of repetition can also be configured in order to limit the total number of callback invocations. + */ + private final RepeatedDuration period; + + /** + * Time at which or time interval after which the timer will be expired and deleted. + * If ttl is omitted, no restrictions are applied. */ - private Duration period; + private final RepeatedDuration ttl; /** * Instantiates a new Actor Timer. * - * @param callback The name of the method to be called for this timer. - * @param data The state to be used by the callback method - * @param dueTime The time when timer is first due. - * @param period The periodic time when timer will be invoked. + * @param callback The name of the method to be called for this timer. + * @param data The state to be used by the callback method + * @param dueTime The time when timer is first due. + * @param period The periodic time when timer will be invoked. + * @deprecated As of release 1.4, replace with {@link #ActorTimerParams(String, byte[], Duration, RepeatedDuration)} */ + @Deprecated ActorTimerParams(String callback, byte[] data, Duration dueTime, Duration period) { + this(callback, data, dueTime, new RepeatedDuration(period), null); + } + + /** + * Instantiates a new Actor Timer. + * + * @param callback The name of the method to be called for this timer. + * @param data Data to be passed in as part of the reminder trigger. + * @param dueTime Time the reminder is due for the 1st time. + * @param period Interval between triggers. + */ + ActorTimerParams(String callback, + byte[] data, + Duration dueTime, + RepeatedDuration period) { + this(callback, data, dueTime, period, null); + } + + /** + * Instantiates a new Actor Timer. + * + * @param callback The name of the method to be called for this timer. + * @param data Data to be passed in as part of the reminder trigger. + * @param dueTime Time the reminder is due for the 1st time. + * @param period Interval between triggers. + * @param ttl Time at which or time interval after which the reminder will be expired and deleted. + */ + ActorTimerParams(String callback, + byte[] data, + Duration dueTime, + RepeatedDuration period, + RepeatedDuration ttl) { this.callback = callback; this.data = data; this.dueTime = dueTime; this.period = period; + this.ttl = ttl; } /** @@ -64,7 +108,7 @@ final class ActorTimerParams { * * @return The name of the method for this timer. */ - public String getCallback() { + String getCallback() { return this.callback; } @@ -73,26 +117,36 @@ public String getCallback() { * * @return Time as Duration when timer is first due. */ - public Duration getDueTime() { + Duration getDueTime() { return this.dueTime; } /** * Gets the periodic time when timer will be invoked. + * Possibly contains repetitions to limit the the total number of callback invocations. * - * @return Periodic time as Duration when timer will be invoked. + * @return Periodic time as {@link RepeatedDuration} when timer will be invoked. */ - public Duration getPeriod() { + RepeatedDuration getPeriod() { return this.period; } /** - * Gets state containing information to be used by the callback method, or null. + * Gets state containing information to be used by the callback method, or null. * - * @return State containing information to be used by the callback method, or null. + * @return State containing information to be used by the callback method, or null. */ - public byte[] getData() { + byte[] getData() { return this.data; } + /** + * Gets the time at which or time interval after which the timer will be expired and deleted. + * This is an optional field and may return null. + * + * @return Time at which or time interval after which the timer will be expired and deleted, or null. + */ + Optional getTtl() { + return Optional.ofNullable(this.ttl); + } } diff --git a/sdk-actors/src/main/java/io/dapr/actors/runtime/DaprGrpcClient.java b/sdk-actors/src/main/java/io/dapr/actors/runtime/DaprGrpcClient.java index 6a95fb3f00..1574752ea9 100644 --- a/sdk-actors/src/main/java/io/dapr/actors/runtime/DaprGrpcClient.java +++ b/sdk-actors/src/main/java/io/dapr/actors/runtime/DaprGrpcClient.java @@ -19,7 +19,6 @@ import com.google.protobuf.ByteString; import com.google.protobuf.Empty; import io.dapr.config.Properties; -import io.dapr.utils.DurationUtils; import io.dapr.v1.DaprGrpc; import io.dapr.v1.DaprProtos; import io.grpc.ManagedChannel; @@ -30,6 +29,9 @@ import java.util.ArrayList; import java.util.List; +import static io.dapr.utils.DurationUtils.convertDurationToDaprFormat; +import static io.dapr.utils.DurationUtils.convertRepeatedDurationToIso8601RepetitionFormat; + /** * A DaprClient over HTTP for Actor's runtime. */ @@ -148,17 +150,18 @@ public Mono registerReminder( String reminderName, ActorReminderParams reminderParams) { return Mono.fromCallable(() -> { - DaprProtos.RegisterActorReminderRequest req = - DaprProtos.RegisterActorReminderRequest.newBuilder() - .setActorType(actorType) - .setActorId(actorId) - .setName(reminderName) - .setData(ByteString.copyFrom(reminderParams.getData())) - .setDueTime(DurationUtils.convertDurationToDaprFormat(reminderParams.getDueTime())) - .setPeriod(DurationUtils.convertDurationToDaprFormat(reminderParams.getPeriod())) - .build(); - - ListenableFuture futureResponse = client.registerActorReminder(req); + DaprProtos.RegisterActorReminderRequest.Builder reqBuilder = DaprProtos.RegisterActorReminderRequest.newBuilder() + .setActorType(actorType) + .setActorId(actorId) + .setName(reminderName) + .setData(ByteString.copyFrom(reminderParams.getData())) + .setDueTime(convertDurationToDaprFormat(reminderParams.getDueTime())) + .setPeriod(convertRepeatedDurationToIso8601RepetitionFormat(reminderParams.getRepeatedPeriod())); + + reminderParams.getTtl() + .ifPresent(value -> reqBuilder.setTtl(convertRepeatedDurationToIso8601RepetitionFormat(value))); + + ListenableFuture futureResponse = client.registerActorReminder(reqBuilder.build()); futureResponse.get(); return null; }); @@ -193,18 +196,19 @@ public Mono registerTimer( String timerName, ActorTimerParams timerParams) { return Mono.fromCallable(() -> { - DaprProtos.RegisterActorTimerRequest req = - DaprProtos.RegisterActorTimerRequest.newBuilder() - .setActorType(actorType) - .setActorId(actorId) - .setName(timerName) - .setCallback(timerParams.getCallback()) - .setData(ByteString.copyFrom(timerParams.getData())) - .setDueTime(DurationUtils.convertDurationToDaprFormat(timerParams.getDueTime())) - .setPeriod(DurationUtils.convertDurationToDaprFormat(timerParams.getPeriod())) - .build(); - - ListenableFuture futureResponse = client.registerActorTimer(req); + DaprProtos.RegisterActorTimerRequest.Builder reqBuilder = DaprProtos.RegisterActorTimerRequest.newBuilder() + .setActorType(actorType) + .setActorId(actorId) + .setName(timerName) + .setCallback(timerParams.getCallback()) + .setData(ByteString.copyFrom(timerParams.getData())) + .setDueTime(convertDurationToDaprFormat(timerParams.getDueTime())) + .setPeriod(convertRepeatedDurationToIso8601RepetitionFormat(timerParams.getPeriod())); + + timerParams.getTtl() + .ifPresent(value -> reqBuilder.setTtl(convertRepeatedDurationToIso8601RepetitionFormat(value))); + + ListenableFuture futureResponse = client.registerActorTimer(reqBuilder.build()); futureResponse.get(); return null; }); diff --git a/sdk-actors/src/test/java/io/dapr/actors/runtime/ActorTimerParamsTest.java b/sdk-actors/src/test/java/io/dapr/actors/runtime/ActorTimerParamsTest.java new file mode 100644 index 0000000000..da98e9751e --- /dev/null +++ b/sdk-actors/src/test/java/io/dapr/actors/runtime/ActorTimerParamsTest.java @@ -0,0 +1,42 @@ +package io.dapr.actors.runtime; + +import io.dapr.utils.RepeatedDuration; +import org.junit.Assert; +import org.junit.Test; + +import java.io.IOException; +import java.time.Duration; + +public class ActorTimerParamsTest { + private static final ActorObjectSerializer SERIALIZER = new ActorObjectSerializer(); + + @Test + public void ttlShouldBeOptional() throws IOException { + // Arrange + ActorTimerParams params = + new ActorTimerParams("callback", new byte[1], Duration.ZERO, new RepeatedDuration(Duration.ZERO, 1)); + + // Act + byte[] serialized = SERIALIZER.serialize(params); + ActorTimerParams deserialized = SERIALIZER.deserialize(serialized, ActorTimerParams.class); + + // Assert + Assert.assertFalse(deserialized.getTtl().isPresent()); + } + + @Test + public void ttlRepetitionsNotRequired() throws IOException { + // Arrange + ActorTimerParams params = + new ActorTimerParams("callback", new byte[1], Duration.ZERO, new RepeatedDuration(Duration.ZERO), + new RepeatedDuration(Duration.ZERO)); + + // Act + byte[] serialized = SERIALIZER.serialize(params); + ActorTimerParams deserialized = SERIALIZER.deserialize(serialized, ActorTimerParams.class); + + // Assert + Assert.assertTrue(deserialized.getTtl().isPresent()); + Assert.assertFalse(deserialized.getTtl().get().getRepetitions().isPresent()); + } +} diff --git a/sdk-actors/src/test/java/io/dapr/actors/runtime/ActorTimerTest.java b/sdk-actors/src/test/java/io/dapr/actors/runtime/ActorTimerTest.java index 00320e0782..2c4a630f03 100644 --- a/sdk-actors/src/test/java/io/dapr/actors/runtime/ActorTimerTest.java +++ b/sdk-actors/src/test/java/io/dapr/actors/runtime/ActorTimerTest.java @@ -14,6 +14,7 @@ package io.dapr.actors.runtime; import com.fasterxml.jackson.databind.ObjectMapper; +import io.dapr.utils.RepeatedDuration; import org.junit.Assert; import org.junit.Test; @@ -34,14 +35,19 @@ public void serialize() throws IOException { .plusHours(1) .plusSeconds(3); + Duration ttl = Duration.ZERO + .plusHours(4) + .plusSeconds(2); + ActorTimerParams timer = new ActorTimerParams( "myfunction", null, dueTime, - period); + new RepeatedDuration(period), + new RepeatedDuration(ttl, 4)); byte[] s = new ActorObjectSerializer().serialize(timer); - String expected = "{\"period\":\"1h0m3s0ms\",\"dueTime\":\"0h7m17s0ms\", \"callback\": \"myfunction\"}"; + String expected = "{\"period\":\"PT1H3S\",\"dueTime\":\"0h7m17s0ms\", \"callback\": \"myfunction\", \"ttl\": \"R4/PT4H2S\"}"; // Deep comparison via JsonNode.equals method. Assert.assertEquals(OBJECT_MAPPER.readTree(expected), OBJECT_MAPPER.readTree(s)); } diff --git a/sdk-actors/src/test/java/io/dapr/actors/runtime/DaprGrpcClientTest.java b/sdk-actors/src/test/java/io/dapr/actors/runtime/DaprGrpcClientTest.java index 6b362b9841..f73ea833af 100644 --- a/sdk-actors/src/test/java/io/dapr/actors/runtime/DaprGrpcClientTest.java +++ b/sdk-actors/src/test/java/io/dapr/actors/runtime/DaprGrpcClientTest.java @@ -19,6 +19,7 @@ import com.google.protobuf.ByteString; import com.google.protobuf.Empty; import io.dapr.utils.DurationUtils; +import io.dapr.utils.RepeatedDuration; import io.dapr.v1.DaprGrpc; import io.dapr.v1.DaprProtos; import org.junit.Before; @@ -158,7 +159,8 @@ public void registerActorReminder() { ActorReminderParams params = new ActorReminderParams( "hello world".getBytes(), Duration.ofSeconds(1), - Duration.ofSeconds(2) + new RepeatedDuration(Duration.ofSeconds(2)), + new RepeatedDuration(Duration.ofSeconds(2), 1) ); when(grpcStub.registerActorReminder(argThat(argument -> { @@ -166,7 +168,8 @@ public void registerActorReminder() { assertEquals(ACTOR_ID, argument.getActorId()); assertEquals(reminderName, argument.getName()); assertEquals(DurationUtils.convertDurationToDaprFormat(params.getDueTime()), argument.getDueTime()); - assertEquals(DurationUtils.convertDurationToDaprFormat(params.getPeriod()), argument.getPeriod()); + assertEquals(DurationUtils.convertRepeatedDurationToIso8601RepetitionFormat(params.getRepeatedPeriod()), argument.getPeriod()); + assertEquals(DurationUtils.convertRepeatedDurationToIso8601RepetitionFormat(params.getTtl().get()), argument.getTtl()); return true; }))).thenReturn(settableFuture); Mono result = client.registerReminder(ACTOR_TYPE, ACTOR_ID, reminderName, params); @@ -201,7 +204,8 @@ public void registerActorTimer() { callback, "hello world".getBytes(), Duration.ofSeconds(1), - Duration.ofSeconds(2) + new RepeatedDuration(Duration.ofSeconds(2)), + new RepeatedDuration(Duration.ofSeconds(2), 4) ); when(grpcStub.registerActorTimer(argThat(argument -> { @@ -210,7 +214,8 @@ public void registerActorTimer() { assertEquals(timerName, argument.getName()); assertEquals(callback, argument.getCallback()); assertEquals(DurationUtils.convertDurationToDaprFormat(params.getDueTime()), argument.getDueTime()); - assertEquals(DurationUtils.convertDurationToDaprFormat(params.getPeriod()), argument.getPeriod()); + assertEquals(DurationUtils.convertRepeatedDurationToIso8601RepetitionFormat(params.getPeriod()), argument.getPeriod()); + assertEquals(DurationUtils.convertRepeatedDurationToIso8601RepetitionFormat(params.getTtl().get()), argument.getTtl()); return true; }))).thenReturn(settableFuture); Mono result = client.registerTimer(ACTOR_TYPE, ACTOR_ID, timerName, params); diff --git a/sdk-actors/src/test/java/io/dapr/actors/runtime/DaprHttpClientTest.java b/sdk-actors/src/test/java/io/dapr/actors/runtime/DaprHttpClientTest.java index 8ef2f976cf..d62e847622 100644 --- a/sdk-actors/src/test/java/io/dapr/actors/runtime/DaprHttpClientTest.java +++ b/sdk-actors/src/test/java/io/dapr/actors/runtime/DaprHttpClientTest.java @@ -115,7 +115,7 @@ public void registerActorTimer() { @Override public Response.Builder respond(Request request) { String expectedBody = "{\"dueTime\":\"0h0m5s0ms\"," + - "\"period\":\"0h0m10s0ms\"," + + "\"period\":\"PT10S\"," + "\"callback\":\"mycallback\"," + "\"data\":\""+ Base64.getEncoder().encodeToString(data.getBytes()) +"\"}"; String body = ""; diff --git a/sdk/src/main/java/io/dapr/utils/DurationUtils.java b/sdk/src/main/java/io/dapr/utils/DurationUtils.java index 9a4614b3f2..bcadf3d9a6 100644 --- a/sdk/src/main/java/io/dapr/utils/DurationUtils.java +++ b/sdk/src/main/java/io/dapr/utils/DurationUtils.java @@ -13,9 +13,14 @@ package io.dapr.utils; +import com.google.common.base.Strings; + import java.time.Duration; -public class DurationUtils { +public final class DurationUtils { + + private DurationUtils() { + } /** * Converts time from the String format used by Dapr into a Duration. @@ -90,7 +95,63 @@ public static String convertDurationToDaprFormat(Duration value) { } /** - * Helper to get the "days" part of the Duration. For example if the duration is 26 hours, this returns 1. + * This method uses the default {@link Duration#toString()} method that supports the ISO-8601 standard. + * In addition to the default implementation, this method allows for repetitions as well. + * + * @param repeatedDuration {@link RepeatedDuration} to parse to ISO-8601 format. + * @return String containing the parsed {@link RepeatedDuration} to the ISO-8601 format, possibly with repetitions. + * Negative duration results in an empty string, meaning fire only once. + */ + public static String convertRepeatedDurationToIso8601RepetitionFormat(RepeatedDuration repeatedDuration) { + StringBuilder sb = new StringBuilder(); + + if (repeatedDuration.getDuration().isNegative()) { + // Negative duration results in fire only once. + return sb.toString(); + } + + repeatedDuration.getRepetitions() + .ifPresent(value -> sb.append(String.format("R%d/", value))); + + // Duration.ToString() returns the ISO-8601 representation of the duration. + sb.append(repeatedDuration.getDuration().toString()); + + return sb.toString(); + } + + /** + * This method uses the {@link Duration#parse(CharSequence)} method that supports parsing of an ISO-8601 string. + * In addition to the default implementation, this method allows for repetitions as well as the Dapr format. + * Example inputs: 'R4/PT2H', 'P3DT2H', '4h15m50s60ms' + * + * @param value The value in ISO-8601 format to convert to a {@link RepeatedDuration}. + * @return {@link RepeatedDuration} containing the duration and possible repetitions. + */ + public static RepeatedDuration convertIso8601StringToRepeatedDuration(String value) { + if (Strings.isNullOrEmpty(value)) { + throw new IllegalArgumentException("Value can not be empty"); + } + + String[] splitOnRepetition = value.split("/"); + + if (splitOnRepetition.length == 1 && splitOnRepetition[0].charAt(0) == 'P') { + return new RepeatedDuration(Duration.parse(value)); + } else if (splitOnRepetition.length == 1) { + return new RepeatedDuration(DurationUtils.convertDurationFromDaprFormat(value)); + } + + if (splitOnRepetition[0].charAt(0) != 'R') { + throw new IllegalArgumentException(String.format("Value: '%s' does not follow the ISO-8601 standard", value)); + } + + Integer repetitions = Integer.parseInt(splitOnRepetition[0].substring(1)); + Duration parsedDuration = Duration.parse(splitOnRepetition[1]); + + return new RepeatedDuration(parsedDuration, repetitions); + } + + /** + * Helper to get the "days" part of the Duration. For example if the duration is 26 hours, this returns 1. * * @param d Duration * @return Number of days. diff --git a/sdk/src/main/java/io/dapr/utils/RepeatedDuration.java b/sdk/src/main/java/io/dapr/utils/RepeatedDuration.java new file mode 100644 index 0000000000..5ddd31b022 --- /dev/null +++ b/sdk/src/main/java/io/dapr/utils/RepeatedDuration.java @@ -0,0 +1,104 @@ +/* + * Copyright 2021 The Dapr Authors + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and +limitations under the License. +*/ + +package io.dapr.utils; + +import java.time.Duration; +import java.util.Optional; + +/** + * Represents a duration with an optional amount of repetitions. + */ +public final class RepeatedDuration { + + /** + * The minimum amount of repetitions. + */ + private static final Integer MIN_AMOUNT_REPETITIONS = 1; + + /** + * The duration. + */ + private final Duration duration; + + /** + * The amount of times the duration will be repeated. + * This is an optional field. + */ + private final Integer repetitions; + + /** + * Instantiates a new instance of a repeated duration. + * + * @param duration The interval until an action. + */ + public RepeatedDuration(Duration duration) { + this(duration, null); + } + + /** + * Instantiates a new instance for a repeated duration. + * + * @param duration The interval until an action. + * @param repetitions The amount of times to invoke the action. + */ + public RepeatedDuration(Duration duration, Integer repetitions) { + validateDuration(duration); + validateRepetitions(repetitions); + this.duration = duration; + this.repetitions = repetitions; + } + + /** + * Validates the duration. + * + * @param duration The duration to validate. + */ + private static void validateDuration(Duration duration) { + if (duration == null) { + throw new IllegalArgumentException("Duration can not be null."); + } + } + + /** + * Validates the repetitions of TTL and Period. + * + * @param repetitions The amount of repetitions checked. + */ + private static void validateRepetitions(Integer repetitions) { + if (repetitions != null && repetitions < MIN_AMOUNT_REPETITIONS) { + String message = String.format( + "argName: Amount of repetitions - specified value must be greater than %s", + MIN_AMOUNT_REPETITIONS); + throw new IllegalArgumentException(message); + } + } + + /** + * Gets the {@link Duration}. + * + * @return The {@link Duration}. + */ + public Duration getDuration() { + return duration; + } + + /** + * Gets the amount of repetitions. + * + * @return The amount of repetitions as {@link Optional}. + */ + public Optional getRepetitions() { + return Optional.ofNullable(repetitions); + } +} diff --git a/sdk/src/test/java/io/dapr/utils/DurationUtilsIso8601Test.java b/sdk/src/test/java/io/dapr/utils/DurationUtilsIso8601Test.java new file mode 100644 index 0000000000..1329aa464e --- /dev/null +++ b/sdk/src/test/java/io/dapr/utils/DurationUtilsIso8601Test.java @@ -0,0 +1,105 @@ +package io.dapr.utils; + +import org.junit.Assert; +import org.junit.Test; + +import java.time.Duration; + +public class DurationUtilsIso8601Test { + + @Test + public void convertRepeatedDurationToIso8601StringWithRepetitions() { + // Arrange + RepeatedDuration repeatedDuration = new RepeatedDuration(Duration.ofHours(4), 2); + + // Act + String result = DurationUtils.convertRepeatedDurationToIso8601RepetitionFormat(repeatedDuration); + + // Assert + Assert.assertEquals("R2/PT4H", result); + } + + @Test + public void convertRepeatedDurationToIso8601StringWithoutRepetitions() { + // Arrange + RepeatedDuration repeatedDuration = new RepeatedDuration(Duration.ofHours(4)); + + // Act + String result = DurationUtils.convertRepeatedDurationToIso8601RepetitionFormat(repeatedDuration); + + // Assert + Assert.assertEquals("PT4H", result); + } + + @Test + public void convertIso8601StringToRepeatedDurationWithRepetitions() { + // Arrange + RepeatedDuration expected = new RepeatedDuration(Duration.ofDays(1).plusMinutes(4), 5); + String value = "R5/P1DT4M"; + + // Act + RepeatedDuration result = DurationUtils.convertIso8601StringToRepeatedDuration(value); + + // Assert + Assert.assertEquals(expected.getDuration(), result.getDuration()); + Assert.assertEquals(expected.getRepetitions().get(), result.getRepetitions().get()); + } + + @Test + public void convertIso8601StringToRepeatedDurationWithoutRepetitions() { + // Arrange + RepeatedDuration expected = new RepeatedDuration(Duration.ofDays(1).plusMinutes(4)); + String value = "P1DT4M"; + + // Act + RepeatedDuration result = DurationUtils.convertIso8601StringToRepeatedDuration(value); + + // Assert + Assert.assertEquals(expected.getDuration(), result.getDuration()); + Assert.assertFalse(result.getRepetitions().isPresent()); + } + + @Test + public void convertDaprFormatStringToRepeatedDuration() { + // Arrange + String daprFormatDuration = "4h15m50s60ms"; + Duration expectedDuration = Duration.ofHours(4).plusMinutes(15).plusSeconds(50).plusMillis(60); + + // Act + RepeatedDuration result = DurationUtils.convertIso8601StringToRepeatedDuration(daprFormatDuration); + + // Assert + Assert.assertEquals(expectedDuration, result.getDuration()); + } + + @Test(expected = IllegalArgumentException.class) + public void convertWithInvalidRepetitionSyntax() { + // Arrange + String input = "Z4/PT4S"; + + // Act + DurationUtils.convertIso8601StringToRepeatedDuration(input); + } + + @Test(expected = IllegalArgumentException.class) + public void nullInputResultsInIllegalArgumentException() { + DurationUtils.convertIso8601StringToRepeatedDuration(""); + } + + @Test(expected = IllegalArgumentException.class) + public void emptyInputResultsInIllegalArgumentException() { + DurationUtils.convertIso8601StringToRepeatedDuration(null); + } + + @Test + public void negativeDurationShouldReturnEmptyString() { + // Arrange + RepeatedDuration input = new RepeatedDuration(Duration.ZERO.minusMinutes(1)); + + // Act + String result = DurationUtils.convertRepeatedDurationToIso8601RepetitionFormat(input); + + // Assert + Assert.assertEquals("", result); + } +} diff --git a/sdk/src/test/java/io/dapr/utils/RepeatedDurationTest.java b/sdk/src/test/java/io/dapr/utils/RepeatedDurationTest.java new file mode 100644 index 0000000000..6b48a55f04 --- /dev/null +++ b/sdk/src/test/java/io/dapr/utils/RepeatedDurationTest.java @@ -0,0 +1,24 @@ +package io.dapr.utils; + +import org.junit.Test; + +import java.time.Duration; + + +public class RepeatedDurationTest { + + @Test(expected = IllegalArgumentException.class) + public void invalidAmountOfRepetitions() { + new RepeatedDuration(Duration.ZERO, -1); + } + + @Test(expected = IllegalArgumentException.class) + public void repetitionsMustBeGreaterThanZero() { + new RepeatedDuration(Duration.ZERO, 0); + } + + @Test(expected = IllegalArgumentException.class) + public void durationCanNotBeNull() { + new RepeatedDuration(null); + } +}