Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -20,22 +20,20 @@
package org.apache.samza.operators;

/**
* Allows registering epoch-time timer callbacks from the operators.
* See {@link org.apache.samza.operators.functions.TimerFunction} for details.
* @param <K> type of the timer key
* Allows scheduling {@link org.apache.samza.operators.functions.ScheduledFunction} callbacks to be invoked later.
* @param <K> type of the key to schedule
*/
public interface TimerRegistry<K> {

public interface Scheduler<K> {
/**
* Register a epoch-time timer with key.
* @param key unique timer key
* @param timestamp epoch time when the timer will be fired, in milliseconds
* Schedule a callback for the {@code key} to be invoked at {@code timestamp}.
* @param key unique key associated with the callback to schedule
* @param timestamp epoch time when the callback for the key will be invoked, in milliseconds
*/
void register(K key, long timestamp);
void schedule(K key, long timestamp);

/**
* Delete the timer for the provided key.
* @param key key for the timer to delete
* Delete the scheduled callback for the provided {@code key}.
* @param key key to delete
*/
void delete(K key);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.samza.operators.functions;

import org.apache.samza.operators.Scheduler;

import java.util.Collection;


/**
* Allows scheduling a callback for a specific epoch-time.
* Key must be a unique identifier for its corresponding logic to execute, and is provided in the callback when the
* corresponding schedule time occurs.
*
* <p>
* Example of a {@link FlatMapFunction} with {@link ScheduledFunction}:
* <pre>{@code
* public class ExampleScheduledFn implements FlatMapFunction<String, String>, ScheduledFunction<String, String> {
* // for recurring callbacks, keep track of the scheduler from "schedule"
* private Scheduler scheduler;
*
* public void schedule(Scheduler scheduler) {
* // save the scheduler for recurring callbacks
* this.scheduler = scheduler;
* long time = System.currentTimeMillis() + 5000; // fire after 5 sec
* scheduler.schedule("do-delayed-logic", time);
* }
* public Collection<String> apply(String s) {
* ...
* }
* public Collection<String> onCallback(String key, long timestamp) {
* // do some logic for key "do-delayed-logic"
* ...
* // for recurring callbacks, call the saved scheduler again
* this.scheduler.schedule("example-process", System.currentTimeMillis() + 5000);
* }
* }
* }</pre>
* @param <K> type of the key
* @param <OM> type of the output
*/
public interface ScheduledFunction<K, OM> {
/**
* Allows scheduling the initial callback(s) and saving the {@code scheduler} for later use for recurring callbacks.
* @param scheduler used to specify the schedule time(s) and key(s)
*/
void schedule(Scheduler<K> scheduler);

/**
* Returns the output from the scheduling logic corresponding to the key that was triggered.
* @param key key corresponding to the callback that got invoked
* @param timestamp schedule time that was set for the callback for the key, in milliseconds since epoch
* @return {@link Collection} of output elements
*/
Collection<OM> onCallback(K key, long timestamp);
}

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -17,18 +17,23 @@
* under the License.
*/

package org.apache.samza.task;
package org.apache.samza.scheduler;

import org.apache.samza.task.MessageCollector;
import org.apache.samza.task.TaskCoordinator;


/**
* The callback that is invoked when its corresponding timer registered via {@link TaskContext} fires.
* @param <K> type of the timer key
* The callback that is invoked when its corresponding schedule time registered via
* {@link org.apache.samza.task.TaskContext} is reached.
* @param <K> type of the callback key
*/
public interface TimerCallback<K> {
public interface ScheduledCallback<K> {
/**
* Invoked when the timer of key fires.
* @param key timer key
* Invoked when the corresponding schedule time is reached.
* @param key key for callback
* @param collector contains the means of sending message envelopes to the output stream.
* @param coordinator manages execution of tasks.
*/
void onTimer(K key, MessageCollector collector, TaskCoordinator coordinator);
void onCallback(K key, MessageCollector collector, TaskCoordinator coordinator);
}
19 changes: 10 additions & 9 deletions samza-api/src/main/java/org/apache/samza/task/TaskContext.java
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.apache.samza.container.SamzaContainerContext;
import org.apache.samza.container.TaskName;
import org.apache.samza.metrics.MetricsRegistry;
import org.apache.samza.scheduler.ScheduledCallback;
import org.apache.samza.system.SystemStreamPartition;
import org.apache.samza.table.Table;

Expand Down Expand Up @@ -76,21 +77,21 @@ default Object getUserContext() {
}

/**
* Register a keyed timer with a callback of {@link TimerCallback} in this task.
* Schedule the {@code callback} for the provided {@code key} to be invoked at epoch-time {@code timestamp}.
* The callback will be invoked exclusively with any other operations for this task,
* e.g. processing, windowing and commit.
* @param key timer key
* @param timestamp epoch time when the timer will be fired, in milliseconds
* @param callback callback when the timer is fired
* @param key key for the callback
* @param timestamp epoch time when the callback will be fired, in milliseconds
* @param callback callback to call when the {@code timestamp} is reached
* @param <K> type of the key
*/
<K> void registerTimer(K key, long timestamp, TimerCallback<K> callback);
<K> void scheduleCallback(K key, long timestamp, ScheduledCallback<K> callback);

/**
* Delete the keyed timer in this task.
* Deletion only happens if the timer hasn't been fired. Otherwise it will not interrupt.
* @param key timer key
* Delete the scheduled {@code callback} for the {@code key}.
* Deletion only happens if the callback hasn't been fired. Otherwise it will not interrupt.
* @param key callback key
* @param <K> type of the key
*/
<K> void deleteTimer(K key);
<K> void deleteScheduledCallback(K key);
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,9 +29,9 @@
import org.apache.samza.system.SystemStreamPartition;
import org.apache.samza.table.Table;
import org.apache.samza.table.TableManager;
import org.apache.samza.task.SystemTimerScheduler;
import org.apache.samza.task.EpochTimeScheduler;
import org.apache.samza.task.TaskContext;
import org.apache.samza.task.TimerCallback;
import org.apache.samza.scheduler.ScheduledCallback;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -53,7 +53,7 @@ public class TaskContextImpl implements TaskContext {
private final JobModel jobModel;
private final StreamMetadataCache streamMetadataCache;
private final Map<String, Object> objectRegistry = new HashMap<>();
private final SystemTimerScheduler timerScheduler;
private final EpochTimeScheduler timerScheduler;

private Object userContext = null;

Expand All @@ -76,7 +76,7 @@ public TaskContextImpl(TaskName taskName,
this.tableManager = tableManager;
this.jobModel = jobModel;
this.streamMetadataCache = streamMetadataCache;
this.timerScheduler = SystemTimerScheduler.create(timerExecutor);
this.timerScheduler = EpochTimeScheduler.create(timerExecutor);
}

@Override
Expand Down Expand Up @@ -134,12 +134,12 @@ public Object getUserContext() {
}

@Override
public <K> void registerTimer(K key, long timestamp, TimerCallback<K> callback) {
public <K> void scheduleCallback(K key, long timestamp, ScheduledCallback<K> callback) {
timerScheduler.setTimer(key, timestamp, callback);
}

@Override
public <K> void deleteTimer(K key) {
public <K> void deleteScheduledCallback(K key) {
timerScheduler.deleteTimer(key);
}

Expand All @@ -159,7 +159,7 @@ public StreamMetadataCache getStreamMetadataCache() {
return streamMetadataCache;
}

public SystemTimerScheduler getTimerScheduler() {
public EpochTimeScheduler getTimerScheduler() {
return timerScheduler;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,8 @@
import org.apache.samza.container.TaskName;
import org.apache.samza.job.model.ContainerModel;
import org.apache.samza.job.model.TaskModel;
import org.apache.samza.operators.TimerRegistry;
import org.apache.samza.operators.functions.TimerFunction;
import org.apache.samza.operators.Scheduler;
import org.apache.samza.operators.functions.ScheduledFunction;
import org.apache.samza.operators.functions.WatermarkFunction;
import org.apache.samza.system.EndOfStreamMessage;
import org.apache.samza.metrics.Counter;
Expand Down Expand Up @@ -436,19 +436,19 @@ final long getOutputWatermark() {

/**
* Returns a registry which allows registering arbitrary system-clock timer with K-typed key.
* The user-defined function in the operator spec needs to implement {@link TimerFunction#onTimer(Object, long)}
* The user-defined function in the operator spec needs to implement {@link ScheduledFunction#onCallback(Object, long)}
* for timer notifications.
* @param <K> key type for the timer.
* @return an instance of {@link TimerRegistry}
* @return an instance of {@link Scheduler}
*/
<K> TimerRegistry<K> createOperatorTimerRegistry() {
return new TimerRegistry<K>() {
<K> Scheduler<K> createOperatorScheduler() {
return new Scheduler<K>() {
@Override
public void register(K key, long time) {
taskContext.registerTimer(key, time, (k, collector, coordinator) -> {
final TimerFunction<K, RM> timerFn = getOperatorSpec().getTimerFn();
if (timerFn != null) {
final Collection<RM> output = timerFn.onTimer(key, time);
public void schedule(K key, long time) {
taskContext.scheduleCallback(key, time, (k, collector, coordinator) -> {
final ScheduledFunction<K, RM> scheduledFn = getOperatorSpec().getScheduledFn();
if (scheduledFn != null) {
final Collection<RM> output = scheduledFn.onCallback(key, time);

if (!output.isEmpty()) {
output.forEach(rm ->
Expand All @@ -457,15 +457,15 @@ public void register(K key, long time) {
}
} else {
throw new SamzaException(
String.format("Operator %s id %s (created at %s) must implement TimerFunction to use system timer.",
String.format("Operator %s id %s (created at %s) must implement ScheduledFunction to use system timer.",
getOperatorSpec().getOpCode().name(), getOpImplId(), getOperatorSpec().getSourceLocation()));
}
});
}

@Override
public void delete(K key) {
taskContext.deleteTimer(key);
taskContext.deleteScheduledCallback(key);
}
};
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
import org.apache.samza.container.TaskContextImpl;
import org.apache.samza.job.model.JobModel;
import org.apache.samza.operators.KV;
import org.apache.samza.operators.TimerRegistry;
import org.apache.samza.operators.Scheduler;
import org.apache.samza.operators.functions.JoinFunction;
import org.apache.samza.operators.functions.PartialJoinFunction;
import org.apache.samza.util.TimestampedValue;
Expand Down Expand Up @@ -170,9 +170,9 @@ private OperatorImpl createAndRegisterOperatorImpl(OperatorSpec prevOperatorSpec
operatorImpl.init(config, context);
operatorImpl.registerInputStream(inputStream);

if (operatorSpec.getTimerFn() != null) {
final TimerRegistry timerRegistry = operatorImpl.createOperatorTimerRegistry();
operatorSpec.getTimerFn().registerTimer(timerRegistry);
if (operatorSpec.getScheduledFn() != null) {
final Scheduler scheduler = operatorImpl.createOperatorScheduler();
operatorSpec.getScheduledFn().schedule(scheduler);
}

// Note: The key here is opImplId, which may not equal opId for some impls (e.g. PartialJoinOperatorImpl).
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -183,7 +183,7 @@ public Collection<WindowPane<K, Object>> handleMessage(

@Override
public Collection<WindowPane<K, Object>> handleTimer(MessageCollector collector, TaskCoordinator coordinator) {
LOG.trace("Processing timer.");
LOG.trace("Processing time triggers");
List<WindowPane<K, Object>> results = new ArrayList<>();
List<TriggerKey<K>> keys = triggerScheduler.runPendingCallbacks();

Expand Down
Loading