Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
161 changes: 124 additions & 37 deletions sdk-actors/src/main/java/io/dapr/actors/runtime/AbstractActor.java
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,10 @@

package io.dapr.actors.runtime;

import com.google.common.base.Strings;
import io.dapr.actors.ActorId;
import io.dapr.actors.ActorTrace;
import io.dapr.utils.RepeatedDuration;
import reactor.core.publisher.Mono;

import java.io.IOException;
Expand Down Expand Up @@ -70,9 +72,9 @@ protected AbstractActor(ActorRuntimeContext runtimeContext, ActorId id) {
this.actorRuntimeContext = runtimeContext;
this.id = id;
this.actorStateManager = new ActorStateManager(
runtimeContext.getStateProvider(),
runtimeContext.getActorTypeInformation().getName(),
id);
runtimeContext.getStateProvider(),
runtimeContext.getActorTypeInformation().getName(),
id);
this.actorTrace = runtimeContext.getActorTrace();
this.started = false;
}
Expand Down Expand Up @@ -115,18 +117,56 @@ protected ActorStateManager getActorStateManager() {
* @return Asynchronous void response.
*/
protected <T> Mono<Void> registerReminder(
String reminderName,
T state,
Duration dueTime,
Duration period) {
String reminderName,
T state,
Duration dueTime,
Duration period) {
return this.registerReminder(reminderName, state, dueTime, new RepeatedDuration(period));
}

/**
* Registers a reminder for this Actor.
*
* @param reminderName Name of the reminder.
* @param state State to be send along with reminder triggers.
* @param dueTime Due time for the first trigger.
* @param period Frequency for the triggers.
* @param <T> Type of the state object.
* @return Asynchronous void response.
*/
protected <T> Mono<Void> registerReminder(
String reminderName,
T state,
Duration dueTime,
RepeatedDuration period) {
return this.registerReminder(reminderName, state, dueTime, period, null);
}

/**
* Registers a reminder for this Actor.
*
* @param reminderName Name of the reminder.
* @param state State to be send along with reminder triggers.
* @param dueTime Due time for the first trigger.
* @param period Frequency for the triggers.
* @param ttl The time at which or time interval after which the reminder will be expired and deleted.
* @param <T> Type of the state object.
* @return Asynchronous void response.
*/
protected <T> Mono<Void> registerReminder(
String reminderName,
T state,
Duration dueTime,
RepeatedDuration period,
RepeatedDuration ttl) {
try {
byte[] data = this.actorRuntimeContext.getObjectSerializer().serialize(state);
ActorReminderParams params = new ActorReminderParams(data, dueTime, period);
ActorReminderParams params = new ActorReminderParams(data, dueTime, period, ttl);
return this.actorRuntimeContext.getDaprClient().registerReminder(
this.actorRuntimeContext.getActorTypeInformation().getName(),
this.id.toString(),
reminderName,
params);
this.actorRuntimeContext.getActorTypeInformation().getName(),
this.id.toString(),
reminderName,
params);
} catch (IOException e) {
return Mono.error(e);
}
Expand All @@ -147,23 +187,70 @@ protected <T> Mono<Void> registerReminder(
* @return Asynchronous result with timer's name.
*/
protected <T> Mono<String> registerActorTimer(
String timerName,
String callback,
T state,
Duration dueTime,
Duration period) {
String timerName,
String callback,
T state,
Duration dueTime,
Duration period) {
return registerActorTimer(timerName, callback, state, dueTime, new RepeatedDuration(period));
}

/**
* Registers a Timer for the actor. A timer name is autogenerated by the runtime to keep track of it.
*
* @param timerName Name of the timer, unique per Actor (auto-generated if null).
* @param callback Name of the method to be called.
* @param state State to be passed it to the method when timer triggers.
* @param dueTime The amount of time to delay before the async callback is first invoked.
* Specify negative one (-1) milliseconds to prevent the timer from starting.
* Specify zero (0) to start the timer immediately.
* @param period The time interval between invocations of the async callback.
* Specify negative one (-1) milliseconds to disable periodic signaling.
* @param <T> Type for the state to be passed in to timer.
* @return Asynchronous result with timer's name.
*/
protected <T> Mono<String> registerActorTimer(
String timerName,
String callback,
T state,
Duration dueTime,
RepeatedDuration period) {
return registerActorTimer(timerName, callback, state, dueTime, period, null);
}

/**
* Registers a Timer for the actor. A timer name is autogenerated by the runtime to keep track of it.
*
* @param timerName Name of the timer, unique per Actor (auto-generated if null).
* @param callback Name of the method to be called.
* @param state State to be passed it to the method when timer triggers.
* @param dueTime The amount of time to delay before the async callback is first invoked.
* Specify negative one (-1) milliseconds to prevent the timer from starting.
* Specify zero (0) to start the timer immediately.
* @param period The time interval between invocations of the async callback.
* Specify negative one (-1) milliseconds to disable periodic signaling.
* @param ttl The time at which or time interval after which the timer will be expired and deleted.
* @param <T> Type for the state to be passed in to timer.
* @return Asynchronous result with timer's name.
*/
protected <T> Mono<String> registerActorTimer(
String timerName,
String callback,
T state,
Duration dueTime,
RepeatedDuration period,
RepeatedDuration ttl) {
try {
if ((callback == null) || callback.isEmpty()) {
if (Strings.isNullOrEmpty(callback)) {
throw new IllegalArgumentException("Timer requires a callback function.");
}

String name = timerName;
if ((timerName == null) || (timerName.isEmpty())) {
name = String.format("%s_Timer_%s", this.id.toString(), UUID.randomUUID().toString());
}
String name = Strings.isNullOrEmpty(timerName)
? String.format("%s_Timer_%s", this.id.toString(), UUID.randomUUID()) : timerName;

byte[] data = this.actorRuntimeContext.getObjectSerializer().serialize(state);
ActorTimerParams actorTimer = new ActorTimerParams(callback, data, dueTime, period);
ActorTimerParams actorTimer =
new ActorTimerParams(callback, data, dueTime, period, ttl);

return this.actorRuntimeContext.getDaprClient().registerTimer(
this.actorRuntimeContext.getActorTypeInformation().getName(),
Expand All @@ -183,9 +270,9 @@ protected <T> Mono<String> registerActorTimer(
*/
protected Mono<Void> unregisterTimer(String timerName) {
return this.actorRuntimeContext.getDaprClient().unregisterTimer(
this.actorRuntimeContext.getActorTypeInformation().getName(),
this.id.toString(),
timerName);
this.actorRuntimeContext.getActorTypeInformation().getName(),
this.id.toString(),
timerName);
}

/**
Expand All @@ -196,9 +283,9 @@ protected Mono<Void> unregisterTimer(String timerName) {
*/
protected Mono<Void> unregisterReminder(String reminderName) {
return this.actorRuntimeContext.getDaprClient().unregisterReminder(
this.actorRuntimeContext.getActorTypeInformation().getName(),
this.id.toString(),
reminderName);
this.actorRuntimeContext.getActorTypeInformation().getName(),
this.id.toString(),
reminderName);
}

/**
Expand Down Expand Up @@ -277,8 +364,8 @@ Mono<Void> onActivateInternal() {
this.actorTrace.writeInfo(TRACE_TYPE, this.id.toString(), "Activating ...");
this.resetState();
}).then(this.onActivate())
.then(this.doWriteInfo(TRACE_TYPE, this.id.toString(), "Activated"))
.then(this.saveState());
.then(this.doWriteInfo(TRACE_TYPE, this.id.toString(), "Activated"))
.then(this.saveState());
}

/**
Expand All @@ -290,8 +377,8 @@ Mono<Void> onDeactivateInternal() {
this.actorTrace.writeInfo(TRACE_TYPE, this.id.toString(), "Deactivating ...");

return Mono.fromRunnable(() -> this.resetState())
.then(this.onDeactivate())
.then(this.doWriteInfo(TRACE_TYPE, this.id.toString(), "Deactivated"));
.then(this.onDeactivate())
.then(this.doWriteInfo(TRACE_TYPE, this.id.toString(), "Deactivated"));
}

/**
Expand Down Expand Up @@ -322,10 +409,10 @@ Mono<Void> onPostActorMethodInternal(ActorMethodContext actorMethodContext) {
throw new IllegalStateException("Cannot complete a method before starting a call.");
}
}).then(this.onPostActorMethod(actorMethodContext))
.then(this.saveState())
.then(Mono.fromRunnable(() -> {
this.started = false;
}));
.then(this.saveState())
.then(Mono.fromRunnable(() -> {
this.started = false;
}));
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import com.fasterxml.jackson.databind.JsonNode;
import io.dapr.client.ObjectSerializer;
import io.dapr.utils.DurationUtils;
import io.dapr.utils.RepeatedDuration;

import java.io.ByteArrayOutputStream;
import java.io.IOException;
Expand Down Expand Up @@ -77,11 +78,19 @@ private byte[] serialize(ActorTimerParams timer) throws IOException {
JsonGenerator generator = JSON_FACTORY.createGenerator(writer);
generator.writeStartObject();
generator.writeStringField("dueTime", DurationUtils.convertDurationToDaprFormat(timer.getDueTime()));
generator.writeStringField("period", DurationUtils.convertDurationToDaprFormat(timer.getPeriod()));
generator.writeStringField("period",
DurationUtils.convertRepeatedDurationToIso8601RepetitionFormat(timer.getPeriod()));
generator.writeStringField("callback", timer.getCallback());

if (timer.getTtl().isPresent()) {
generator.writeStringField("ttl",
DurationUtils.convertRepeatedDurationToIso8601RepetitionFormat(timer.getTtl().get()));
}

if (timer.getData() != null) {
generator.writeBinaryField("data", timer.getData());
}

generator.writeEndObject();
generator.close();
writer.flush();
Expand All @@ -101,10 +110,18 @@ private byte[] serialize(ActorReminderParams reminder) throws IOException {
JsonGenerator generator = JSON_FACTORY.createGenerator(writer);
generator.writeStartObject();
generator.writeStringField("dueTime", DurationUtils.convertDurationToDaprFormat(reminder.getDueTime()));
generator.writeStringField("period", DurationUtils.convertDurationToDaprFormat(reminder.getPeriod()));
generator.writeStringField("period",
DurationUtils.convertRepeatedDurationToIso8601RepetitionFormat(reminder.getRepeatedPeriod()));

if (reminder.getTtl().isPresent()) {
generator.writeStringField("ttl",
DurationUtils.convertRepeatedDurationToIso8601RepetitionFormat(reminder.getTtl().get()));
}

if (reminder.getData() != null) {
generator.writeBinaryField("data", reminder.getData());
}

generator.writeEndObject();
generator.close();
writer.flush();
Expand Down Expand Up @@ -187,10 +204,11 @@ private ActorTimerParams deserializeActorTimer(byte[] value) throws IOException
JsonNode node = OBJECT_MAPPER.readTree(value);
String callback = node.get("callback").asText();
Duration dueTime = extractDurationOrNull(node, "dueTime");
Duration period = extractDurationOrNull(node, "period");
RepeatedDuration period = extractRepeatedDurationOrNull(node, "period");
RepeatedDuration ttl = extractRepeatedDurationOrNull(node, "ttl");
byte[] data = node.get("data") != null ? node.get("data").binaryValue() : null;

return new ActorTimerParams(callback, data, dueTime, period);
return new ActorTimerParams(callback, data, dueTime, period, ttl);
}

/**
Expand All @@ -207,10 +225,11 @@ private ActorReminderParams deserializeActorReminder(byte[] value) throws IOExce

JsonNode node = OBJECT_MAPPER.readTree(value);
Duration dueTime = extractDurationOrNull(node, "dueTime");
Duration period = extractDurationOrNull(node, "period");
RepeatedDuration period = extractRepeatedDurationOrNull(node, "period");
RepeatedDuration ttl = extractRepeatedDurationOrNull(node, "ttl");
byte[] data = node.get("data") != null ? node.get("data").binaryValue() : null;

return new ActorReminderParams(data, dueTime, period);
return new ActorReminderParams(data, dueTime, period, ttl);
}

/**
Expand All @@ -228,4 +247,20 @@ private static Duration extractDurationOrNull(JsonNode node, String name) {

return DurationUtils.convertDurationFromDaprFormat(valueNode.asText());
}

/**
* Extracts {@link RepeatedDuration} or null.
*
* @param node Node that contains the attribute.
* @param name Attribute name.
* @return Parsed {@link RepeatedDuration} or null.
*/
private static RepeatedDuration extractRepeatedDurationOrNull(JsonNode node, String name) {
JsonNode valueNode = node.get(name);
if (valueNode == null) {
return null;
}

return DurationUtils.convertIso8601StringToRepeatedDuration(valueNode.asText());
}
}
Loading