diff --git a/samza-api/src/main/java/org/apache/samza/operators/TimerRegistry.java b/samza-api/src/main/java/org/apache/samza/operators/Scheduler.java similarity index 61% rename from samza-api/src/main/java/org/apache/samza/operators/TimerRegistry.java rename to samza-api/src/main/java/org/apache/samza/operators/Scheduler.java index 64dd4ec96d..77148f0a92 100644 --- a/samza-api/src/main/java/org/apache/samza/operators/TimerRegistry.java +++ b/samza-api/src/main/java/org/apache/samza/operators/Scheduler.java @@ -20,22 +20,20 @@ package org.apache.samza.operators; /** - * Allows registering epoch-time timer callbacks from the operators. - * See {@link org.apache.samza.operators.functions.TimerFunction} for details. - * @param type of the timer key + * Allows scheduling {@link org.apache.samza.operators.functions.ScheduledFunction} callbacks to be invoked later. + * @param type of the key to schedule */ -public interface TimerRegistry { - +public interface Scheduler { /** - * Register a epoch-time timer with key. - * @param key unique timer key - * @param timestamp epoch time when the timer will be fired, in milliseconds + * Schedule a callback for the {@code key} to be invoked at {@code timestamp}. + * @param key unique key associated with the callback to schedule + * @param timestamp epoch time when the callback for the key will be invoked, in milliseconds */ - void register(K key, long timestamp); + void schedule(K key, long timestamp); /** - * Delete the timer for the provided key. - * @param key key for the timer to delete + * Delete the scheduled callback for the provided {@code key}. + * @param key key to delete */ void delete(K key); } diff --git a/samza-api/src/main/java/org/apache/samza/operators/functions/ScheduledFunction.java b/samza-api/src/main/java/org/apache/samza/operators/functions/ScheduledFunction.java new file mode 100644 index 0000000000..952948c515 --- /dev/null +++ b/samza-api/src/main/java/org/apache/samza/operators/functions/ScheduledFunction.java @@ -0,0 +1,73 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.samza.operators.functions; + +import org.apache.samza.operators.Scheduler; + +import java.util.Collection; + + +/** + * Allows scheduling a callback for a specific epoch-time. + * Key must be a unique identifier for its corresponding logic to execute, and is provided in the callback when the + * corresponding schedule time occurs. + * + *

+ * Example of a {@link FlatMapFunction} with {@link ScheduledFunction}: + *

{@code
+ *    public class ExampleScheduledFn implements FlatMapFunction, ScheduledFunction {
+ *      // for recurring callbacks, keep track of the scheduler from "schedule"
+ *      private Scheduler scheduler;
+ *
+ *      public void schedule(Scheduler scheduler) {
+ *        // save the scheduler for recurring callbacks
+ *        this.scheduler = scheduler;
+ *        long time = System.currentTimeMillis() + 5000; // fire after 5 sec
+ *        scheduler.schedule("do-delayed-logic", time);
+ *      }
+ *      public Collection apply(String s) {
+ *        ...
+ *      }
+ *      public Collection onCallback(String key, long timestamp) {
+ *        // do some logic for key "do-delayed-logic"
+ *        ...
+ *        // for recurring callbacks, call the saved scheduler again
+ *        this.scheduler.schedule("example-process", System.currentTimeMillis() + 5000);
+ *      }
+ *    }
+ * }
+ * @param type of the key + * @param type of the output + */ +public interface ScheduledFunction { + /** + * Allows scheduling the initial callback(s) and saving the {@code scheduler} for later use for recurring callbacks. + * @param scheduler used to specify the schedule time(s) and key(s) + */ + void schedule(Scheduler scheduler); + + /** + * Returns the output from the scheduling logic corresponding to the key that was triggered. + * @param key key corresponding to the callback that got invoked + * @param timestamp schedule time that was set for the callback for the key, in milliseconds since epoch + * @return {@link Collection} of output elements + */ + Collection onCallback(K key, long timestamp); +} diff --git a/samza-api/src/main/java/org/apache/samza/operators/functions/TimerFunction.java b/samza-api/src/main/java/org/apache/samza/operators/functions/TimerFunction.java deleted file mode 100644 index 01825c6479..0000000000 --- a/samza-api/src/main/java/org/apache/samza/operators/functions/TimerFunction.java +++ /dev/null @@ -1,65 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.samza.operators.functions; - -import org.apache.samza.operators.TimerRegistry; - -import java.util.Collection; - -/** - * Allows timer registration with a key and is invoked when the timer is fired. - * Key must be a unique identifier for this timer, and is provided in the callback when the timer fires. - * - *

- * Example of a {@link FlatMapFunction} with timer: - *

{@code
- *    public class ExampleTimerFn implements FlatMapFunction, TimerFunction {
- *      public void registerTimer(TimerRegistry timerRegistry) {
- *        long time = System.currentTimeMillis() + 5000; // fire after 5 sec
- *        timerRegistry.register("example-timer", time);
- *      }
- *      public Collection apply(String s) {
- *        ...
- *      }
- *      public Collection onTimer(String key, long timestamp) {
- *        // example-timer fired
- *        ...
- *      }
- *    }
- * }
- * @param type of the key - * @param type of the output - */ -public interface TimerFunction { - - /** - * Registers any epoch-time timers using the registry - * @param timerRegistry a keyed {@link TimerRegistry} - */ - void registerTimer(TimerRegistry timerRegistry); - - /** - * Returns the output after the timer with key fires. - * @param key timer key - * @param timestamp time of the epoch-time timer fired, in milliseconds - * @return {@link Collection} of output elements - */ - Collection onTimer(K key, long timestamp); -} diff --git a/samza-api/src/main/java/org/apache/samza/task/TimerCallback.java b/samza-api/src/main/java/org/apache/samza/scheduler/ScheduledCallback.java similarity index 64% rename from samza-api/src/main/java/org/apache/samza/task/TimerCallback.java rename to samza-api/src/main/java/org/apache/samza/scheduler/ScheduledCallback.java index 3add129c63..87454228a5 100644 --- a/samza-api/src/main/java/org/apache/samza/task/TimerCallback.java +++ b/samza-api/src/main/java/org/apache/samza/scheduler/ScheduledCallback.java @@ -17,18 +17,23 @@ * under the License. */ -package org.apache.samza.task; +package org.apache.samza.scheduler; + +import org.apache.samza.task.MessageCollector; +import org.apache.samza.task.TaskCoordinator; + /** - * The callback that is invoked when its corresponding timer registered via {@link TaskContext} fires. - * @param type of the timer key + * The callback that is invoked when its corresponding schedule time registered via + * {@link org.apache.samza.task.TaskContext} is reached. + * @param type of the callback key */ -public interface TimerCallback { +public interface ScheduledCallback { /** - * Invoked when the timer of key fires. - * @param key timer key + * Invoked when the corresponding schedule time is reached. + * @param key key for callback * @param collector contains the means of sending message envelopes to the output stream. * @param coordinator manages execution of tasks. */ - void onTimer(K key, MessageCollector collector, TaskCoordinator coordinator); + void onCallback(K key, MessageCollector collector, TaskCoordinator coordinator); } diff --git a/samza-api/src/main/java/org/apache/samza/task/TaskContext.java b/samza-api/src/main/java/org/apache/samza/task/TaskContext.java index ea2a3bca98..eccedba3f2 100644 --- a/samza-api/src/main/java/org/apache/samza/task/TaskContext.java +++ b/samza-api/src/main/java/org/apache/samza/task/TaskContext.java @@ -24,6 +24,7 @@ import org.apache.samza.container.SamzaContainerContext; import org.apache.samza.container.TaskName; import org.apache.samza.metrics.MetricsRegistry; +import org.apache.samza.scheduler.ScheduledCallback; import org.apache.samza.system.SystemStreamPartition; import org.apache.samza.table.Table; @@ -76,21 +77,21 @@ default Object getUserContext() { } /** - * Register a keyed timer with a callback of {@link TimerCallback} in this task. + * Schedule the {@code callback} for the provided {@code key} to be invoked at epoch-time {@code timestamp}. * The callback will be invoked exclusively with any other operations for this task, * e.g. processing, windowing and commit. - * @param key timer key - * @param timestamp epoch time when the timer will be fired, in milliseconds - * @param callback callback when the timer is fired + * @param key key for the callback + * @param timestamp epoch time when the callback will be fired, in milliseconds + * @param callback callback to call when the {@code timestamp} is reached * @param type of the key */ - void registerTimer(K key, long timestamp, TimerCallback callback); + void scheduleCallback(K key, long timestamp, ScheduledCallback callback); /** - * Delete the keyed timer in this task. - * Deletion only happens if the timer hasn't been fired. Otherwise it will not interrupt. - * @param key timer key + * Delete the scheduled {@code callback} for the {@code key}. + * Deletion only happens if the callback hasn't been fired. Otherwise it will not interrupt. + * @param key callback key * @param type of the key */ - void deleteTimer(K key); + void deleteScheduledCallback(K key); } diff --git a/samza-core/src/main/java/org/apache/samza/container/TaskContextImpl.java b/samza-core/src/main/java/org/apache/samza/container/TaskContextImpl.java index d65be4cbbb..bea6373baf 100644 --- a/samza-core/src/main/java/org/apache/samza/container/TaskContextImpl.java +++ b/samza-core/src/main/java/org/apache/samza/container/TaskContextImpl.java @@ -29,9 +29,9 @@ import org.apache.samza.system.SystemStreamPartition; import org.apache.samza.table.Table; import org.apache.samza.table.TableManager; -import org.apache.samza.task.SystemTimerScheduler; +import org.apache.samza.task.EpochTimeScheduler; import org.apache.samza.task.TaskContext; -import org.apache.samza.task.TimerCallback; +import org.apache.samza.scheduler.ScheduledCallback; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -53,7 +53,7 @@ public class TaskContextImpl implements TaskContext { private final JobModel jobModel; private final StreamMetadataCache streamMetadataCache; private final Map objectRegistry = new HashMap<>(); - private final SystemTimerScheduler timerScheduler; + private final EpochTimeScheduler timerScheduler; private Object userContext = null; @@ -76,7 +76,7 @@ public TaskContextImpl(TaskName taskName, this.tableManager = tableManager; this.jobModel = jobModel; this.streamMetadataCache = streamMetadataCache; - this.timerScheduler = SystemTimerScheduler.create(timerExecutor); + this.timerScheduler = EpochTimeScheduler.create(timerExecutor); } @Override @@ -134,12 +134,12 @@ public Object getUserContext() { } @Override - public void registerTimer(K key, long timestamp, TimerCallback callback) { + public void scheduleCallback(K key, long timestamp, ScheduledCallback callback) { timerScheduler.setTimer(key, timestamp, callback); } @Override - public void deleteTimer(K key) { + public void deleteScheduledCallback(K key) { timerScheduler.deleteTimer(key); } @@ -159,7 +159,7 @@ public StreamMetadataCache getStreamMetadataCache() { return streamMetadataCache; } - public SystemTimerScheduler getTimerScheduler() { + public EpochTimeScheduler getTimerScheduler() { return timerScheduler; } } diff --git a/samza-core/src/main/java/org/apache/samza/operators/impl/OperatorImpl.java b/samza-core/src/main/java/org/apache/samza/operators/impl/OperatorImpl.java index f0c099794a..64db79150a 100644 --- a/samza-core/src/main/java/org/apache/samza/operators/impl/OperatorImpl.java +++ b/samza-core/src/main/java/org/apache/samza/operators/impl/OperatorImpl.java @@ -25,8 +25,8 @@ import org.apache.samza.container.TaskName; import org.apache.samza.job.model.ContainerModel; import org.apache.samza.job.model.TaskModel; -import org.apache.samza.operators.TimerRegistry; -import org.apache.samza.operators.functions.TimerFunction; +import org.apache.samza.operators.Scheduler; +import org.apache.samza.operators.functions.ScheduledFunction; import org.apache.samza.operators.functions.WatermarkFunction; import org.apache.samza.system.EndOfStreamMessage; import org.apache.samza.metrics.Counter; @@ -436,19 +436,19 @@ final long getOutputWatermark() { /** * Returns a registry which allows registering arbitrary system-clock timer with K-typed key. - * The user-defined function in the operator spec needs to implement {@link TimerFunction#onTimer(Object, long)} + * The user-defined function in the operator spec needs to implement {@link ScheduledFunction#onCallback(Object, long)} * for timer notifications. * @param key type for the timer. - * @return an instance of {@link TimerRegistry} + * @return an instance of {@link Scheduler} */ - TimerRegistry createOperatorTimerRegistry() { - return new TimerRegistry() { + Scheduler createOperatorScheduler() { + return new Scheduler() { @Override - public void register(K key, long time) { - taskContext.registerTimer(key, time, (k, collector, coordinator) -> { - final TimerFunction timerFn = getOperatorSpec().getTimerFn(); - if (timerFn != null) { - final Collection output = timerFn.onTimer(key, time); + public void schedule(K key, long time) { + taskContext.scheduleCallback(key, time, (k, collector, coordinator) -> { + final ScheduledFunction scheduledFn = getOperatorSpec().getScheduledFn(); + if (scheduledFn != null) { + final Collection output = scheduledFn.onCallback(key, time); if (!output.isEmpty()) { output.forEach(rm -> @@ -457,7 +457,7 @@ public void register(K key, long time) { } } else { throw new SamzaException( - String.format("Operator %s id %s (created at %s) must implement TimerFunction to use system timer.", + String.format("Operator %s id %s (created at %s) must implement ScheduledFunction to use system timer.", getOperatorSpec().getOpCode().name(), getOpImplId(), getOperatorSpec().getSourceLocation())); } }); @@ -465,7 +465,7 @@ public void register(K key, long time) { @Override public void delete(K key) { - taskContext.deleteTimer(key); + taskContext.deleteScheduledCallback(key); } }; } diff --git a/samza-core/src/main/java/org/apache/samza/operators/impl/OperatorImplGraph.java b/samza-core/src/main/java/org/apache/samza/operators/impl/OperatorImplGraph.java index 7f62e00742..6c78ae4e51 100644 --- a/samza-core/src/main/java/org/apache/samza/operators/impl/OperatorImplGraph.java +++ b/samza-core/src/main/java/org/apache/samza/operators/impl/OperatorImplGraph.java @@ -26,7 +26,7 @@ import org.apache.samza.container.TaskContextImpl; import org.apache.samza.job.model.JobModel; import org.apache.samza.operators.KV; -import org.apache.samza.operators.TimerRegistry; +import org.apache.samza.operators.Scheduler; import org.apache.samza.operators.functions.JoinFunction; import org.apache.samza.operators.functions.PartialJoinFunction; import org.apache.samza.util.TimestampedValue; @@ -170,9 +170,9 @@ private OperatorImpl createAndRegisterOperatorImpl(OperatorSpec prevOperatorSpec operatorImpl.init(config, context); operatorImpl.registerInputStream(inputStream); - if (operatorSpec.getTimerFn() != null) { - final TimerRegistry timerRegistry = operatorImpl.createOperatorTimerRegistry(); - operatorSpec.getTimerFn().registerTimer(timerRegistry); + if (operatorSpec.getScheduledFn() != null) { + final Scheduler scheduler = operatorImpl.createOperatorScheduler(); + operatorSpec.getScheduledFn().schedule(scheduler); } // Note: The key here is opImplId, which may not equal opId for some impls (e.g. PartialJoinOperatorImpl). diff --git a/samza-core/src/main/java/org/apache/samza/operators/impl/WindowOperatorImpl.java b/samza-core/src/main/java/org/apache/samza/operators/impl/WindowOperatorImpl.java index 82dc0bf2b7..b175671e9a 100644 --- a/samza-core/src/main/java/org/apache/samza/operators/impl/WindowOperatorImpl.java +++ b/samza-core/src/main/java/org/apache/samza/operators/impl/WindowOperatorImpl.java @@ -183,7 +183,7 @@ public Collection> handleMessage( @Override public Collection> handleTimer(MessageCollector collector, TaskCoordinator coordinator) { - LOG.trace("Processing timer."); + LOG.trace("Processing time triggers"); List> results = new ArrayList<>(); List> keys = triggerScheduler.runPendingCallbacks(); diff --git a/samza-core/src/main/java/org/apache/samza/operators/spec/BroadcastOperatorSpec.java b/samza-core/src/main/java/org/apache/samza/operators/spec/BroadcastOperatorSpec.java index 2c76e600a2..fb6515aa76 100644 --- a/samza-core/src/main/java/org/apache/samza/operators/spec/BroadcastOperatorSpec.java +++ b/samza-core/src/main/java/org/apache/samza/operators/spec/BroadcastOperatorSpec.java @@ -20,7 +20,7 @@ package org.apache.samza.operators.spec; -import org.apache.samza.operators.functions.TimerFunction; +import org.apache.samza.operators.functions.ScheduledFunction; import org.apache.samza.operators.functions.WatermarkFunction; public class BroadcastOperatorSpec extends OperatorSpec { @@ -43,7 +43,7 @@ public WatermarkFunction getWatermarkFn() { } @Override - public TimerFunction getTimerFn() { + public ScheduledFunction getScheduledFn() { return null; } } diff --git a/samza-core/src/main/java/org/apache/samza/operators/spec/FilterOperatorSpec.java b/samza-core/src/main/java/org/apache/samza/operators/spec/FilterOperatorSpec.java index a5cdb82e5e..4e640dcf0f 100644 --- a/samza-core/src/main/java/org/apache/samza/operators/spec/FilterOperatorSpec.java +++ b/samza-core/src/main/java/org/apache/samza/operators/spec/FilterOperatorSpec.java @@ -23,7 +23,7 @@ import org.apache.samza.config.Config; import org.apache.samza.operators.functions.FilterFunction; import org.apache.samza.operators.functions.FlatMapFunction; -import org.apache.samza.operators.functions.TimerFunction; +import org.apache.samza.operators.functions.ScheduledFunction; import org.apache.samza.operators.functions.WatermarkFunction; import org.apache.samza.task.TaskContext; @@ -68,7 +68,7 @@ public WatermarkFunction getWatermarkFn() { } @Override - public TimerFunction getTimerFn() { - return this.filterFn instanceof TimerFunction ? (TimerFunction) this.filterFn : null; + public ScheduledFunction getScheduledFn() { + return this.filterFn instanceof ScheduledFunction ? (ScheduledFunction) this.filterFn : null; } } diff --git a/samza-core/src/main/java/org/apache/samza/operators/spec/FlatMapOperatorSpec.java b/samza-core/src/main/java/org/apache/samza/operators/spec/FlatMapOperatorSpec.java index a93a2215f5..160f432da1 100644 --- a/samza-core/src/main/java/org/apache/samza/operators/spec/FlatMapOperatorSpec.java +++ b/samza-core/src/main/java/org/apache/samza/operators/spec/FlatMapOperatorSpec.java @@ -19,7 +19,7 @@ package org.apache.samza.operators.spec; import org.apache.samza.operators.functions.FlatMapFunction; -import org.apache.samza.operators.functions.TimerFunction; +import org.apache.samza.operators.functions.ScheduledFunction; import org.apache.samza.operators.functions.WatermarkFunction; @@ -41,7 +41,7 @@ public WatermarkFunction getWatermarkFn() { } @Override - public TimerFunction getTimerFn() { - return this.transformFn instanceof TimerFunction ? (TimerFunction) this.transformFn : null; + public ScheduledFunction getScheduledFn() { + return this.transformFn instanceof ScheduledFunction ? (ScheduledFunction) this.transformFn : null; } } diff --git a/samza-core/src/main/java/org/apache/samza/operators/spec/InputOperatorSpec.java b/samza-core/src/main/java/org/apache/samza/operators/spec/InputOperatorSpec.java index c49443de70..1af4806dd7 100644 --- a/samza-core/src/main/java/org/apache/samza/operators/spec/InputOperatorSpec.java +++ b/samza-core/src/main/java/org/apache/samza/operators/spec/InputOperatorSpec.java @@ -19,7 +19,7 @@ package org.apache.samza.operators.spec; import org.apache.samza.operators.functions.InputTransformer; -import org.apache.samza.operators.functions.TimerFunction; +import org.apache.samza.operators.functions.ScheduledFunction; import org.apache.samza.operators.functions.WatermarkFunction; import org.apache.samza.serializers.Serde; import org.apache.samza.system.IncomingMessageEnvelope; @@ -99,7 +99,7 @@ public WatermarkFunction getWatermarkFn() { } @Override - public TimerFunction getTimerFn() { + public ScheduledFunction getScheduledFn() { return null; } } diff --git a/samza-core/src/main/java/org/apache/samza/operators/spec/JoinOperatorSpec.java b/samza-core/src/main/java/org/apache/samza/operators/spec/JoinOperatorSpec.java index 1b55784687..bb6ed59c5c 100644 --- a/samza-core/src/main/java/org/apache/samza/operators/spec/JoinOperatorSpec.java +++ b/samza-core/src/main/java/org/apache/samza/operators/spec/JoinOperatorSpec.java @@ -20,7 +20,7 @@ import com.google.common.collect.ImmutableMap; import org.apache.samza.operators.functions.JoinFunction; -import org.apache.samza.operators.functions.TimerFunction; +import org.apache.samza.operators.functions.ScheduledFunction; import org.apache.samza.operators.functions.WatermarkFunction; import org.apache.samza.operators.impl.store.TimestampedValueSerde; import org.apache.samza.util.TimestampedValue; @@ -105,8 +105,8 @@ public WatermarkFunction getWatermarkFn() { } @Override - public TimerFunction getTimerFn() { - return joinFn instanceof TimerFunction ? (TimerFunction) joinFn : null; + public ScheduledFunction getScheduledFn() { + return joinFn instanceof ScheduledFunction ? (ScheduledFunction) joinFn : null; } public OperatorSpec getLeftInputOpSpec() { diff --git a/samza-core/src/main/java/org/apache/samza/operators/spec/MapOperatorSpec.java b/samza-core/src/main/java/org/apache/samza/operators/spec/MapOperatorSpec.java index 1e2190bd27..6ce522f687 100644 --- a/samza-core/src/main/java/org/apache/samza/operators/spec/MapOperatorSpec.java +++ b/samza-core/src/main/java/org/apache/samza/operators/spec/MapOperatorSpec.java @@ -23,7 +23,7 @@ import org.apache.samza.config.Config; import org.apache.samza.operators.functions.FlatMapFunction; import org.apache.samza.operators.functions.MapFunction; -import org.apache.samza.operators.functions.TimerFunction; +import org.apache.samza.operators.functions.ScheduledFunction; import org.apache.samza.operators.functions.WatermarkFunction; import org.apache.samza.task.TaskContext; @@ -71,7 +71,7 @@ public WatermarkFunction getWatermarkFn() { } @Override - public TimerFunction getTimerFn() { - return this.mapFn instanceof TimerFunction ? (TimerFunction) this.mapFn : null; + public ScheduledFunction getScheduledFn() { + return this.mapFn instanceof ScheduledFunction ? (ScheduledFunction) this.mapFn : null; } } diff --git a/samza-core/src/main/java/org/apache/samza/operators/spec/MergeOperatorSpec.java b/samza-core/src/main/java/org/apache/samza/operators/spec/MergeOperatorSpec.java index 987f72cfca..3685c5f02a 100644 --- a/samza-core/src/main/java/org/apache/samza/operators/spec/MergeOperatorSpec.java +++ b/samza-core/src/main/java/org/apache/samza/operators/spec/MergeOperatorSpec.java @@ -19,7 +19,7 @@ package org.apache.samza.operators.spec; import java.util.ArrayList; -import org.apache.samza.operators.functions.TimerFunction; +import org.apache.samza.operators.functions.ScheduledFunction; import org.apache.samza.operators.functions.WatermarkFunction; @@ -45,7 +45,7 @@ public WatermarkFunction getWatermarkFn() { } @Override - public TimerFunction getTimerFn() { + public ScheduledFunction getScheduledFn() { return null; } } diff --git a/samza-core/src/main/java/org/apache/samza/operators/spec/OperatorSpec.java b/samza-core/src/main/java/org/apache/samza/operators/spec/OperatorSpec.java index 1e021f564c..0442f7c8b7 100644 --- a/samza-core/src/main/java/org/apache/samza/operators/spec/OperatorSpec.java +++ b/samza-core/src/main/java/org/apache/samza/operators/spec/OperatorSpec.java @@ -25,7 +25,7 @@ import org.apache.samza.annotation.InterfaceStability; import org.apache.samza.operators.MessageStream; import org.apache.samza.operators.MessageStreamImpl; -import org.apache.samza.operators.functions.TimerFunction; +import org.apache.samza.operators.functions.ScheduledFunction; import org.apache.samza.operators.functions.WatermarkFunction; /** @@ -143,5 +143,5 @@ public final String getSourceLocation() { abstract public WatermarkFunction getWatermarkFn(); - abstract public TimerFunction getTimerFn(); + abstract public ScheduledFunction getScheduledFn(); } diff --git a/samza-core/src/main/java/org/apache/samza/operators/spec/OutputOperatorSpec.java b/samza-core/src/main/java/org/apache/samza/operators/spec/OutputOperatorSpec.java index 40a5c0ec06..d6238b8fcf 100644 --- a/samza-core/src/main/java/org/apache/samza/operators/spec/OutputOperatorSpec.java +++ b/samza-core/src/main/java/org/apache/samza/operators/spec/OutputOperatorSpec.java @@ -19,7 +19,7 @@ package org.apache.samza.operators.spec; -import org.apache.samza.operators.functions.TimerFunction; +import org.apache.samza.operators.functions.ScheduledFunction; import org.apache.samza.operators.functions.WatermarkFunction; /** @@ -59,7 +59,7 @@ public WatermarkFunction getWatermarkFn() { } @Override - public TimerFunction getTimerFn() { + public ScheduledFunction getScheduledFn() { return null; } } diff --git a/samza-core/src/main/java/org/apache/samza/operators/spec/PartitionByOperatorSpec.java b/samza-core/src/main/java/org/apache/samza/operators/spec/PartitionByOperatorSpec.java index d6bf3d94a5..069c8673be 100644 --- a/samza-core/src/main/java/org/apache/samza/operators/spec/PartitionByOperatorSpec.java +++ b/samza-core/src/main/java/org/apache/samza/operators/spec/PartitionByOperatorSpec.java @@ -20,7 +20,7 @@ import org.apache.samza.operators.KV; import org.apache.samza.operators.functions.MapFunction; -import org.apache.samza.operators.functions.TimerFunction; +import org.apache.samza.operators.functions.ScheduledFunction; import org.apache.samza.operators.functions.WatermarkFunction; import static com.google.common.base.Preconditions.checkArgument; @@ -55,10 +55,10 @@ public class PartitionByOperatorSpec extends OperatorSpec { MapFunction keyFunction, MapFunction valueFunction, String opId) { super(OpCode.PARTITION_BY, opId); - checkArgument(!(keyFunction instanceof TimerFunction || keyFunction instanceof WatermarkFunction), - "keyFunction for partitionBy should not implement TimerFunction or WatermarkFunction."); - checkArgument(!(valueFunction instanceof TimerFunction || valueFunction instanceof WatermarkFunction), - "valueFunction for partitionBy should not implement TimerFunction or WatermarkFunction."); + checkArgument(!(keyFunction instanceof ScheduledFunction || keyFunction instanceof WatermarkFunction), + "keyFunction for partitionBy should not implement ScheduledFunction or WatermarkFunction."); + checkArgument(!(valueFunction instanceof ScheduledFunction || valueFunction instanceof WatermarkFunction), + "valueFunction for partitionBy should not implement ScheduledFunction or WatermarkFunction."); this.outputStream = outputStream; this.keyFunction = keyFunction; this.valueFunction = valueFunction; @@ -86,7 +86,7 @@ public WatermarkFunction getWatermarkFn() { } @Override - public TimerFunction getTimerFn() { + public ScheduledFunction getScheduledFn() { return null; } } diff --git a/samza-core/src/main/java/org/apache/samza/operators/spec/SendToTableOperatorSpec.java b/samza-core/src/main/java/org/apache/samza/operators/spec/SendToTableOperatorSpec.java index 22f393ece6..bf032a2f46 100644 --- a/samza-core/src/main/java/org/apache/samza/operators/spec/SendToTableOperatorSpec.java +++ b/samza-core/src/main/java/org/apache/samza/operators/spec/SendToTableOperatorSpec.java @@ -20,7 +20,7 @@ import org.apache.samza.annotation.InterfaceStability; import org.apache.samza.operators.KV; -import org.apache.samza.operators.functions.TimerFunction; +import org.apache.samza.operators.functions.ScheduledFunction; import org.apache.samza.operators.functions.WatermarkFunction; import org.apache.samza.table.TableSpec; @@ -58,7 +58,7 @@ public WatermarkFunction getWatermarkFn() { } @Override - public TimerFunction getTimerFn() { + public ScheduledFunction getScheduledFn() { return null; } } diff --git a/samza-core/src/main/java/org/apache/samza/operators/spec/SinkOperatorSpec.java b/samza-core/src/main/java/org/apache/samza/operators/spec/SinkOperatorSpec.java index aa0f066947..91e2775eb4 100644 --- a/samza-core/src/main/java/org/apache/samza/operators/spec/SinkOperatorSpec.java +++ b/samza-core/src/main/java/org/apache/samza/operators/spec/SinkOperatorSpec.java @@ -19,7 +19,7 @@ package org.apache.samza.operators.spec; import org.apache.samza.operators.functions.SinkFunction; -import org.apache.samza.operators.functions.TimerFunction; +import org.apache.samza.operators.functions.ScheduledFunction; import org.apache.samza.operators.functions.WatermarkFunction; @@ -57,7 +57,7 @@ public WatermarkFunction getWatermarkFn() { } @Override - public TimerFunction getTimerFn() { - return sinkFn instanceof TimerFunction ? (TimerFunction) sinkFn : null; + public ScheduledFunction getScheduledFn() { + return sinkFn instanceof ScheduledFunction ? (ScheduledFunction) sinkFn : null; } } diff --git a/samza-core/src/main/java/org/apache/samza/operators/spec/StreamTableJoinOperatorSpec.java b/samza-core/src/main/java/org/apache/samza/operators/spec/StreamTableJoinOperatorSpec.java index c7735c6d77..1849c640a6 100644 --- a/samza-core/src/main/java/org/apache/samza/operators/spec/StreamTableJoinOperatorSpec.java +++ b/samza-core/src/main/java/org/apache/samza/operators/spec/StreamTableJoinOperatorSpec.java @@ -19,8 +19,8 @@ package org.apache.samza.operators.spec; import org.apache.samza.annotation.InterfaceStability; +import org.apache.samza.operators.functions.ScheduledFunction; import org.apache.samza.operators.functions.StreamTableJoinFunction; -import org.apache.samza.operators.functions.TimerFunction; import org.apache.samza.operators.functions.WatermarkFunction; import org.apache.samza.table.TableSpec; @@ -66,8 +66,8 @@ public WatermarkFunction getWatermarkFn() { } @Override - public TimerFunction getTimerFn() { - return joinFn instanceof TimerFunction ? (TimerFunction) joinFn : null; + public ScheduledFunction getScheduledFn() { + return joinFn instanceof ScheduledFunction ? (ScheduledFunction) joinFn : null; } } diff --git a/samza-core/src/main/java/org/apache/samza/operators/spec/WindowOperatorSpec.java b/samza-core/src/main/java/org/apache/samza/operators/spec/WindowOperatorSpec.java index 8d1ad2989d..ede16a5594 100644 --- a/samza-core/src/main/java/org/apache/samza/operators/spec/WindowOperatorSpec.java +++ b/samza-core/src/main/java/org/apache/samza/operators/spec/WindowOperatorSpec.java @@ -20,7 +20,7 @@ package org.apache.samza.operators.spec; import org.apache.samza.operators.functions.FoldLeftFunction; -import org.apache.samza.operators.functions.TimerFunction; +import org.apache.samza.operators.functions.ScheduledFunction; import org.apache.samza.operators.functions.WatermarkFunction; import org.apache.samza.operators.impl.store.TimeSeriesKeySerde; import org.apache.samza.operators.triggers.AnyTrigger; @@ -64,14 +64,14 @@ public class WindowOperatorSpec extends OperatorSpec window, String opId) { super(OpCode.WINDOW, opId); checkArgument(window.getInitializer() == null || - !(window.getInitializer() instanceof TimerFunction || window.getInitializer() instanceof WatermarkFunction), - "A window does not accepts a user-defined TimerFunction or WatermarkFunction as the initializer."); + !(window.getInitializer() instanceof ScheduledFunction || window.getInitializer() instanceof WatermarkFunction), + "A window does not accepts a user-defined ScheduledFunction or WatermarkFunction as the initializer."); checkArgument(window.getKeyExtractor() == null || - !(window.getKeyExtractor() instanceof TimerFunction || window.getKeyExtractor() instanceof WatermarkFunction), - "A window does not accepts a user-defined TimerFunction or WatermarkFunction as the keyExtractor."); + !(window.getKeyExtractor() instanceof ScheduledFunction || window.getKeyExtractor() instanceof WatermarkFunction), + "A window does not accepts a user-defined ScheduledFunction or WatermarkFunction as the keyExtractor."); checkArgument(window.getEventTimeExtractor() == null || - !(window.getEventTimeExtractor() instanceof TimerFunction || window.getEventTimeExtractor() instanceof WatermarkFunction), - "A window does not accepts a user-defined TimerFunction or WatermarkFunction as the eventTimeExtractor."); + !(window.getEventTimeExtractor() instanceof ScheduledFunction || window.getEventTimeExtractor() instanceof WatermarkFunction), + "A window does not accepts a user-defined ScheduledFunction or WatermarkFunction as the eventTimeExtractor."); this.window = window; } @@ -88,21 +88,21 @@ public WindowInternal getWindow() { * @return the default triggering interval */ public long getDefaultTriggerMs() { - List timerTriggers = new ArrayList<>(); + List timeBasedTriggers = new ArrayList<>(); if (window.getDefaultTrigger() != null) { - timerTriggers.addAll(getTimeBasedTriggers(window.getDefaultTrigger())); + timeBasedTriggers.addAll(getTimeBasedTriggers(window.getDefaultTrigger())); } if (window.getEarlyTrigger() != null) { - timerTriggers.addAll(getTimeBasedTriggers(window.getEarlyTrigger())); + timeBasedTriggers.addAll(getTimeBasedTriggers(window.getEarlyTrigger())); } if (window.getLateTrigger() != null) { - timerTriggers.addAll(getTimeBasedTriggers(window.getLateTrigger())); + timeBasedTriggers.addAll(getTimeBasedTriggers(window.getLateTrigger())); } - LOG.info("Got {} timer triggers", timerTriggers.size()); + LOG.info("Got {} time-based triggers", timeBasedTriggers.size()); - List candidateDurations = timerTriggers.stream() + List candidateDurations = timeBasedTriggers.stream() .map(timeBasedTrigger -> timeBasedTrigger.getDuration().toMillis()) .collect(Collectors.toList()); @@ -135,9 +135,9 @@ public WatermarkFunction getWatermarkFn() { } @Override - public TimerFunction getTimerFn() { + public ScheduledFunction getScheduledFn() { FoldLeftFunction fn = window.getFoldLeftFunction(); - return fn instanceof TimerFunction ? (TimerFunction) fn : null; + return fn instanceof ScheduledFunction ? (ScheduledFunction) fn : null; } @Override diff --git a/samza-core/src/main/java/org/apache/samza/task/AsyncRunLoop.java b/samza-core/src/main/java/org/apache/samza/task/AsyncRunLoop.java index 3b3e008460..111869c2ec 100644 --- a/samza-core/src/main/java/org/apache/samza/task/AsyncRunLoop.java +++ b/samza-core/src/main/java/org/apache/samza/task/AsyncRunLoop.java @@ -330,7 +330,7 @@ private enum WorkerOp { COMMIT, PROCESS, END_OF_STREAM, - TIMER, + SCHEDULER, NO_OP } @@ -374,10 +374,10 @@ public void run() { }, commitMs, commitMs, TimeUnit.MILLISECONDS); } - final SystemTimerScheduler timerFactory = task.context().getTimerScheduler(); - if (timerFactory != null) { - timerFactory.registerListener(() -> { - state.needTimer(); + final EpochTimeScheduler epochTimeScheduler = task.context().getTimerScheduler(); + if (epochTimeScheduler != null) { + epochTimeScheduler.registerListener(() -> { + state.needScheduler(); }); } } @@ -409,8 +409,8 @@ private void run() { case WINDOW: window(); break; - case TIMER: - timer(); + case SCHEDULER: + scheduler(); break; case COMMIT: commit(); @@ -551,8 +551,8 @@ public void run() { } } - private void timer() { - state.startTimer(); + private void scheduler() { + state.startScheduler(); Runnable timerWorker = new Runnable() { @Override public void run() { @@ -560,26 +560,26 @@ public void run() { ReadableCoordinator coordinator = new ReadableCoordinator(task.taskName()); long startTime = clock.nanoTime(); - task.timer(coordinator); + task.scheduler(coordinator); containerMetrics.timerNs().update(clock.nanoTime() - startTime); coordinatorRequests.update(coordinator); - state.doneTimer(); + state.doneScheduler(); } catch (Throwable t) { - log.error("Task {} timer failed", task.taskName(), t); + log.error("Task {} scheduler failed", task.taskName(), t); abort(t); } finally { - log.trace("Task {} timer completed", task.taskName()); + log.trace("Task {} scheduler completed", task.taskName()); resume(); } } }; if (threadPool != null) { - log.trace("Task {} timer runs on the thread pool", task.taskName()); + log.trace("Task {} scheduler runs on the thread pool", task.taskName()); threadPool.submit(timerWorker); } else { - log.trace("Task {} timer runs on the run loop thread", task.taskName()); + log.trace("Task {} scheduler runs on the run loop thread", task.taskName()); timerWorker.run(); } } @@ -655,12 +655,12 @@ public void onFailure(TaskCallback callback, Throwable t) { private final class AsyncTaskState { private volatile boolean needWindow = false; private volatile boolean needCommit = false; - private volatile boolean needTimer = false; + private volatile boolean needScheduler = false; private volatile boolean complete = false; private volatile boolean endOfStream = false; private volatile boolean windowInFlight = false; private volatile boolean commitInFlight = false; - private volatile boolean timerInFlight = false; + private volatile boolean schedulerInFlight = false; private final AtomicInteger messagesInFlight = new AtomicInteger(0); private final ArrayDeque pendingEnvelopeQueue; @@ -706,28 +706,28 @@ private boolean isReady() { needCommit = true; } - boolean opInFlight = windowInFlight || commitInFlight || timerInFlight; + boolean opInFlight = windowInFlight || commitInFlight || schedulerInFlight; /* * A task is ready to commit, when task.commit(needCommit) is requested either by user or commit thread * and either of the following conditions are true. - * a) When process, window, commit and timer are not in progress. + * a) When process, window, commit and scheduler are not in progress. * b) When task.async.commit is true and window, commit are not in progress. */ if (needCommit) { return (messagesInFlight.get() == 0 || isAsyncCommitEnabled) && !opInFlight; - } else if (needWindow || needTimer || endOfStream) { + } else if (needWindow || needScheduler || endOfStream) { /* - * A task is ready for window, timer or end-of-stream operation. + * A task is ready for window, scheduler or end-of-stream operation. */ return messagesInFlight.get() == 0 && !opInFlight; } else { /* * A task is ready to process new message, when number of task.process calls in progress < task.max.concurrency * and either of the following conditions are true. - * a) When window, commit and timer are not in progress. - * b) When task.async.commit is true and window and timer are not in progress. + * a) When window, commit and scheduler are not in progress. + * b) When task.async.commit is true and window and scheduler are not in progress. */ - return messagesInFlight.get() < maxConcurrency && !windowInFlight && !timerInFlight && (isAsyncCommitEnabled || !commitInFlight); + return messagesInFlight.get() < maxConcurrency && !windowInFlight && !schedulerInFlight && (isAsyncCommitEnabled || !commitInFlight); } } @@ -741,7 +741,7 @@ private WorkerOp nextOp() { if (isReady()) { if (needCommit) return WorkerOp.COMMIT; else if (needWindow) return WorkerOp.WINDOW; - else if (needTimer) return WorkerOp.TIMER; + else if (needScheduler) return WorkerOp.SCHEDULER; else if (endOfStream && pendingEnvelopeQueue.isEmpty()) return WorkerOp.END_OF_STREAM; else if (!pendingEnvelopeQueue.isEmpty()) return WorkerOp.PROCESS; } @@ -756,8 +756,8 @@ private void needCommit() { needCommit = true; } - private void needTimer() { - needTimer = true; + private void needScheduler() { + needScheduler = true; } private void startWindow() { @@ -775,9 +775,9 @@ private void startProcess() { taskMetrics.messagesInFlight().set(count); } - private void startTimer() { - needTimer = false; - timerInFlight = true; + private void startScheduler() { + needScheduler = false; + schedulerInFlight = true; } private void doneCommit() { @@ -793,8 +793,8 @@ private void doneProcess() { taskMetrics.messagesInFlight().set(count); } - private void doneTimer() { - timerInFlight = false; + private void doneScheduler() { + schedulerInFlight = false; } /** diff --git a/samza-core/src/main/java/org/apache/samza/task/SystemTimerScheduler.java b/samza-core/src/main/java/org/apache/samza/task/EpochTimeScheduler.java similarity index 86% rename from samza-core/src/main/java/org/apache/samza/task/SystemTimerScheduler.java rename to samza-core/src/main/java/org/apache/samza/task/EpochTimeScheduler.java index aa9792bdf2..3f50d9a477 100644 --- a/samza-core/src/main/java/org/apache/samza/task/SystemTimerScheduler.java +++ b/samza-core/src/main/java/org/apache/samza/task/EpochTimeScheduler.java @@ -25,6 +25,7 @@ import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; +import org.apache.samza.scheduler.ScheduledCallback; import static com.google.common.base.Preconditions.checkState; @@ -35,7 +36,7 @@ * 2) keeps track of the timers created and timers that are ready. * 3) triggers listener whenever a timer fires. */ -public class SystemTimerScheduler { +public class EpochTimeScheduler { /** * For run loop to listen to timer firing so it can schedule the callbacks. @@ -46,18 +47,18 @@ public interface TimerListener { private final ScheduledExecutorService executor; private final Map scheduledFutures = new ConcurrentHashMap<>(); - private final Map, TimerCallback> readyTimers = new ConcurrentHashMap<>(); + private final Map, ScheduledCallback> readyTimers = new ConcurrentHashMap<>(); private TimerListener timerListener; - public static SystemTimerScheduler create(ScheduledExecutorService executor) { - return new SystemTimerScheduler(executor); + public static EpochTimeScheduler create(ScheduledExecutorService executor) { + return new EpochTimeScheduler(executor); } - private SystemTimerScheduler(ScheduledExecutorService executor) { + private EpochTimeScheduler(ScheduledExecutorService executor) { this.executor = executor; } - public void setTimer(K key, long timestamp, TimerCallback callback) { + public void setTimer(K key, long timestamp, ScheduledCallback callback) { checkState(!scheduledFutures.containsKey(key), String.format("Duplicate key %s registration for the same timer", key)); @@ -84,8 +85,8 @@ void registerListener(TimerListener listener) { timerListener = listener; } - public Map, TimerCallback> removeReadyTimers() { - final Map, TimerCallback> timers = new TreeMap<>(readyTimers); + public Map, ScheduledCallback> removeReadyTimers() { + final Map, ScheduledCallback> timers = new TreeMap<>(readyTimers); readyTimers.keySet().removeAll(timers.keySet()); return timers; } diff --git a/samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala b/samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala index d85f10f272..9f4fd1716f 100644 --- a/samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala +++ b/samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala @@ -28,6 +28,7 @@ import org.apache.samza.config.Config import org.apache.samza.config.StreamConfig.Config2Stream import org.apache.samza.job.model.JobModel import org.apache.samza.metrics.MetricsReporter +import org.apache.samza.scheduler.ScheduledCallback import org.apache.samza.storage.kv.KeyValueStore import org.apache.samza.storage.{TaskSideInputStorageManager, TaskStorageManager} import org.apache.samza.system._ @@ -221,12 +222,12 @@ class TaskInstance( } } - def timer(coordinator: ReadableCoordinator) { - trace("Timer for taskName: %s" format taskName) + def scheduler(coordinator: ReadableCoordinator) { + trace("Scheduler for taskName: %s" format taskName) exceptionHandler.maybeHandle { context.getTimerScheduler.removeReadyTimers().entrySet().foreach { entry => - entry.getValue.asInstanceOf[TimerCallback[Any]].onTimer(entry.getKey.getKey, collector, coordinator) + entry.getValue.asInstanceOf[ScheduledCallback[Any]].onCallback(entry.getKey.getKey, collector, coordinator) } } } diff --git a/samza-core/src/test/java/org/apache/samza/operators/TestOperatorSpecGraph.java b/samza-core/src/test/java/org/apache/samza/operators/TestOperatorSpecGraph.java index a5b15b8b1e..71f4b733b5 100644 --- a/samza-core/src/test/java/org/apache/samza/operators/TestOperatorSpecGraph.java +++ b/samza-core/src/test/java/org/apache/samza/operators/TestOperatorSpecGraph.java @@ -28,7 +28,7 @@ import java.util.Set; import org.apache.samza.SamzaException; import org.apache.samza.application.StreamApplicationDescriptorImpl; -import org.apache.samza.operators.functions.TimerFunction; +import org.apache.samza.operators.functions.ScheduledFunction; import org.apache.samza.operators.functions.WatermarkFunction; import org.apache.samza.operators.spec.InputOperatorSpec; import org.apache.samza.operators.spec.OperatorSpec; @@ -177,7 +177,7 @@ public WatermarkFunction getWatermarkFn() { } @Override - public TimerFunction getTimerFn() { + public ScheduledFunction getScheduledFn() { return null; } } diff --git a/samza-core/src/test/java/org/apache/samza/operators/impl/TestOperatorImpl.java b/samza-core/src/test/java/org/apache/samza/operators/impl/TestOperatorImpl.java index 249ff09f74..6d12d99497 100644 --- a/samza-core/src/test/java/org/apache/samza/operators/impl/TestOperatorImpl.java +++ b/samza-core/src/test/java/org/apache/samza/operators/impl/TestOperatorImpl.java @@ -28,7 +28,7 @@ import org.apache.samza.metrics.MetricsRegistryMap; import org.apache.samza.metrics.ReadableMetricsRegistry; import org.apache.samza.metrics.Timer; -import org.apache.samza.operators.functions.TimerFunction; +import org.apache.samza.operators.functions.ScheduledFunction; import org.apache.samza.operators.functions.WatermarkFunction; import org.apache.samza.operators.spec.OperatorSpec; import org.apache.samza.task.MessageCollector; @@ -220,7 +220,7 @@ public WatermarkFunction getWatermarkFn() { } @Override - public TimerFunction getTimerFn() { + public ScheduledFunction getScheduledFn() { return null; } } diff --git a/samza-core/src/test/java/org/apache/samza/operators/spec/OperatorSpecTestUtils.java b/samza-core/src/test/java/org/apache/samza/operators/spec/OperatorSpecTestUtils.java index 7704a5b034..b9a27677cd 100644 --- a/samza-core/src/test/java/org/apache/samza/operators/spec/OperatorSpecTestUtils.java +++ b/samza-core/src/test/java/org/apache/samza/operators/spec/OperatorSpecTestUtils.java @@ -28,7 +28,7 @@ import java.util.Map; import org.apache.samza.operators.OperatorSpecGraph; import org.apache.samza.operators.TableImpl; -import org.apache.samza.operators.functions.TimerFunction; +import org.apache.samza.operators.functions.ScheduledFunction; import org.apache.samza.operators.functions.WatermarkFunction; import org.apache.samza.serializers.SerializableSerde; import org.apache.samza.table.TableSpec; @@ -72,7 +72,7 @@ private static void assertClonedOpSpec(OperatorSpec oOpSpec, OperatorSpec nOpSpe assertNotEquals(oOpSpec, nOpSpec); assertEquals(oOpSpec.getOpId(), nOpSpec.getOpId()); assertEquals(oOpSpec.getOpCode(), nOpSpec.getOpCode()); - assertTimerFnsNotEqual(oOpSpec.getTimerFn(), nOpSpec.getTimerFn()); + assertScheduledFnsNotEqual(oOpSpec.getScheduledFn(), nOpSpec.getScheduledFn()); assertWatermarkFnNotEqual(nOpSpec.getWatermarkFn(), nOpSpec.getWatermarkFn()); assertAllOperators(oOpSpec.getRegisteredOperatorSpecs(), nOpSpec.getRegisteredOperatorSpecs()); } @@ -84,11 +84,11 @@ private static void assertWatermarkFnNotEqual(WatermarkFunction watermarkFn, Wat assertNotEquals(watermarkFn, watermarkFn1); } - private static void assertTimerFnsNotEqual(TimerFunction timerFn, TimerFunction timerFn1) { - if (timerFn == timerFn1 && timerFn == null) { + private static void assertScheduledFnsNotEqual(ScheduledFunction scheduledFn, ScheduledFunction scheduledFn1) { + if (scheduledFn == scheduledFn1 && scheduledFn == null) { return; } - assertNotEquals(timerFn, timerFn1); + assertNotEquals(scheduledFn, scheduledFn1); } private static void assertClonedTables(Map originalTables, Map clonedTables) { diff --git a/samza-core/src/test/java/org/apache/samza/operators/spec/TestOperatorSpec.java b/samza-core/src/test/java/org/apache/samza/operators/spec/TestOperatorSpec.java index a9ccd12ad6..860e630314 100644 --- a/samza-core/src/test/java/org/apache/samza/operators/spec/TestOperatorSpec.java +++ b/samza-core/src/test/java/org/apache/samza/operators/spec/TestOperatorSpec.java @@ -24,7 +24,7 @@ import java.util.List; import org.apache.samza.config.MapConfig; import org.apache.samza.operators.KV; -import org.apache.samza.operators.TimerRegistry; +import org.apache.samza.operators.Scheduler; import org.apache.samza.operators.data.TestMessageEnvelope; import org.apache.samza.operators.data.TestOutputMessageEnvelope; import org.apache.samza.operators.functions.FilterFunction; @@ -33,7 +33,7 @@ import org.apache.samza.operators.functions.MapFunction; import org.apache.samza.operators.functions.SinkFunction; import org.apache.samza.operators.functions.StreamTableJoinFunction; -import org.apache.samza.operators.functions.TimerFunction; +import org.apache.samza.operators.functions.ScheduledFunction; import org.apache.samza.operators.functions.WatermarkFunction; import org.apache.samza.serializers.JsonSerdeV2; import org.apache.samza.serializers.KVSerde; @@ -74,7 +74,8 @@ public TestOutputMessageEnvelope apply(TestMessageEnvelope m) { } } - private static class MapWithTimerFn implements MapFunction, TimerFunction { + private static class MapWithScheduledFn implements MapFunction, + ScheduledFunction { @Override public TestOutputMessageEnvelope apply(TestMessageEnvelope m) { @@ -82,12 +83,12 @@ public TestOutputMessageEnvelope apply(TestMessageEnvelope m) { } @Override - public void registerTimer(TimerRegistry timerRegistry) { + public void schedule(Scheduler scheduler) { } @Override - public Collection onTimer(String key, long timestamp) { + public Collection onCallback(String key, long timestamp) { return null; } } @@ -164,8 +165,8 @@ public void testStreamOperatorSpecWithFlatMap() { assertTrue(cloneOperatorSpec.getTransformFn() instanceof FlatMapFunction); assertNull(streamOperatorSpec.getWatermarkFn()); assertNull(cloneOperatorSpec.getWatermarkFn()); - assertNull(streamOperatorSpec.getTimerFn()); - assertNull(cloneOperatorSpec.getTimerFn()); + assertNull(streamOperatorSpec.getScheduledFn()); + assertNull(cloneOperatorSpec.getScheduledFn()); } @Test @@ -187,8 +188,8 @@ public void testStreamOperatorSpecWithMap() { assertNotEquals(userFn, clonedUserFn); assertNull(streamOperatorSpec.getWatermarkFn()); assertNull(cloneOperatorSpec.getWatermarkFn()); - assertNull(streamOperatorSpec.getTimerFn()); - assertNull(cloneOperatorSpec.getTimerFn()); + assertNull(streamOperatorSpec.getScheduledFn()); + assertNull(cloneOperatorSpec.getScheduledFn()); } @Test @@ -209,8 +210,8 @@ public void testStreamOperatorSpecWithFilter() { assertNotEquals(userFn, clonedUserFn); assertNull(streamOperatorSpec.getWatermarkFn()); assertNull(cloneOperatorSpec.getWatermarkFn()); - assertNull(streamOperatorSpec.getTimerFn()); - assertNull(cloneOperatorSpec.getTimerFn()); + assertNull(streamOperatorSpec.getScheduledFn()); + assertNull(cloneOperatorSpec.getScheduledFn()); } @Test @@ -360,13 +361,13 @@ public void testMapStreamOperatorSpecWithWatermark() { assertEquals(streamOperatorSpec.getWatermarkFn(), testMapFn); assertNotNull(cloneOperatorSpec.getWatermarkFn()); assertNotEquals(cloneOperatorSpec.getTransformFn(), cloneOperatorSpec.getWatermarkFn()); - assertNull(streamOperatorSpec.getTimerFn()); - assertNull(cloneOperatorSpec.getTimerFn()); + assertNull(streamOperatorSpec.getScheduledFn()); + assertNull(cloneOperatorSpec.getScheduledFn()); } @Test - public void testMapStreamOperatorSpecWithTimer() { - MapWithTimerFn testMapFn = new MapWithTimerFn(); + public void testMapStreamOperatorSpecWithScheduledFunction() { + MapWithScheduledFn testMapFn = new MapWithScheduledFn(); StreamOperatorSpec streamOperatorSpec = OperatorSpecs.createMapOperatorSpec(testMapFn, "op0"); @@ -378,9 +379,9 @@ public void testMapStreamOperatorSpecWithTimer() { assertNull(streamOperatorSpec.getWatermarkFn()); assertNull(cloneOperatorSpec.getWatermarkFn()); assertNotEquals(cloneOperatorSpec.getTransformFn(), cloneOperatorSpec.getWatermarkFn()); - assertEquals(streamOperatorSpec.getTimerFn(), testMapFn); - assertNotNull(cloneOperatorSpec.getTimerFn()); - assertNotEquals(streamOperatorSpec.getTimerFn(), cloneOperatorSpec.getTimerFn()); + assertEquals(streamOperatorSpec.getScheduledFn(), testMapFn); + assertNotNull(cloneOperatorSpec.getScheduledFn()); + assertNotEquals(streamOperatorSpec.getScheduledFn(), cloneOperatorSpec.getScheduledFn()); } @Test diff --git a/samza-core/src/test/java/org/apache/samza/operators/spec/TestPartitionByOperatorSpec.java b/samza-core/src/test/java/org/apache/samza/operators/spec/TestPartitionByOperatorSpec.java index db7079c8b3..1d98580654 100644 --- a/samza-core/src/test/java/org/apache/samza/operators/spec/TestPartitionByOperatorSpec.java +++ b/samza-core/src/test/java/org/apache/samza/operators/spec/TestPartitionByOperatorSpec.java @@ -23,13 +23,13 @@ import org.apache.samza.application.StreamApplicationDescriptorImpl; import org.apache.samza.config.Config; import org.apache.samza.config.JobConfig; +import org.apache.samza.operators.Scheduler; import org.apache.samza.operators.MessageStream; import org.apache.samza.operators.OperatorSpecGraph; -import org.apache.samza.operators.TimerRegistry; import org.apache.samza.operators.descriptors.GenericInputDescriptor; import org.apache.samza.operators.descriptors.GenericSystemDescriptor; import org.apache.samza.operators.functions.MapFunction; -import org.apache.samza.operators.functions.TimerFunction; +import org.apache.samza.operators.functions.ScheduledFunction; import org.apache.samza.operators.functions.WatermarkFunction; import org.apache.samza.serializers.KVSerde; import org.apache.samza.serializers.NoOpSerde; @@ -58,7 +58,7 @@ public class TestPartitionByOperatorSpec { private final String testJobId = "1"; private final String testRepartitionedStreamName = "parByKey"; - class TimerMapFn implements MapFunction, TimerFunction { + class ScheduledMapFn implements MapFunction, ScheduledFunction { @Override public String apply(Object message) { @@ -66,12 +66,12 @@ public String apply(Object message) { } @Override - public void registerTimer(TimerRegistry timerRegistry) { + public void schedule(Scheduler scheduler) { } @Override - public Collection onTimer(String key, long timestamp) { + public Collection onCallback(String key, long timestamp) { return null; } } @@ -117,7 +117,7 @@ public void testPartitionBy() { assertTrue(inputOpSpec.getKeySerde() instanceof NoOpSerde); assertTrue(inputOpSpec.getValueSerde() instanceof NoOpSerde); assertTrue(inputOpSpec.isKeyed()); - assertNull(inputOpSpec.getTimerFn()); + assertNull(inputOpSpec.getScheduledFn()); assertNull(inputOpSpec.getWatermarkFn()); InputOperatorSpec originInputSpec = inputOpSpecs.get(testinputDescriptor.getStreamId()); assertTrue(originInputSpec.getRegisteredOperatorSpecs().toArray()[0] instanceof PartitionByOperatorSpec); @@ -126,7 +126,7 @@ public void testPartitionBy() { assertEquals(reparOpSpec.getKeyFunction(), keyFn); assertEquals(reparOpSpec.getValueFunction(), valueFn); assertEquals(reparOpSpec.getOutputStream().getStreamId(), reparOpSpec.getOpId()); - assertNull(reparOpSpec.getTimerFn()); + assertNull(reparOpSpec.getScheduledFn()); assertNull(reparOpSpec.getWatermarkFn()); } @@ -144,7 +144,7 @@ public void testPartitionByWithNoSerde() { assertNull(inputOpSpec.getKeySerde()); assertNull(inputOpSpec.getValueSerde()); assertTrue(inputOpSpec.isKeyed()); - assertNull(inputOpSpec.getTimerFn()); + assertNull(inputOpSpec.getScheduledFn()); assertNull(inputOpSpec.getWatermarkFn()); InputOperatorSpec originInputSpec = streamAppDesc.getInputOperators().get(testinputDescriptor.getStreamId()); assertTrue(originInputSpec.getRegisteredOperatorSpecs().toArray()[0] instanceof PartitionByOperatorSpec); @@ -153,7 +153,7 @@ public void testPartitionByWithNoSerde() { assertEquals(reparOpSpec.getKeyFunction(), keyFn); assertEquals(reparOpSpec.getValueFunction(), valueFn); assertEquals(reparOpSpec.getOutputStream().getStreamId(), reparOpSpec.getOpId()); - assertNull(reparOpSpec.getTimerFn()); + assertNull(reparOpSpec.getScheduledFn()); assertNull(reparOpSpec.getWatermarkFn()); } @@ -169,8 +169,8 @@ public void testCopy() { } @Test(expected = IllegalArgumentException.class) - public void testTimerFunctionAsKeyFn() { - TimerMapFn keyFn = new TimerMapFn(); + public void testScheduledFunctionAsKeyFn() { + ScheduledMapFn keyFn = new ScheduledMapFn(); new StreamApplicationDescriptorImpl(appDesc -> { MessageStream inputStream = appDesc.getInputStream(testinputDescriptor); inputStream.partitionBy(keyFn, m -> m, "parByKey"); @@ -187,8 +187,8 @@ public void testWatermarkFunctionAsKeyFn() { } @Test(expected = IllegalArgumentException.class) - public void testTimerFunctionAsValueFn() { - TimerMapFn valueFn = new TimerMapFn(); + public void testScheduledFunctionAsValueFn() { + ScheduledMapFn valueFn = new ScheduledMapFn(); new StreamApplicationDescriptorImpl(appDesc -> { MessageStream inputStream = appDesc.getInputStream(testinputDescriptor); inputStream.partitionBy(m -> m.toString(), valueFn, "parByKey"); diff --git a/samza-core/src/test/java/org/apache/samza/operators/spec/TestWindowOperatorSpec.java b/samza-core/src/test/java/org/apache/samza/operators/spec/TestWindowOperatorSpec.java index 0a2214bfcf..41973b2294 100644 --- a/samza-core/src/test/java/org/apache/samza/operators/spec/TestWindowOperatorSpec.java +++ b/samza-core/src/test/java/org/apache/samza/operators/spec/TestWindowOperatorSpec.java @@ -19,8 +19,8 @@ package org.apache.samza.operators.spec; -import org.apache.samza.operators.TimerRegistry; -import org.apache.samza.operators.functions.TimerFunction; +import org.apache.samza.operators.Scheduler; +import org.apache.samza.operators.functions.ScheduledFunction; import org.apache.samza.operators.functions.WatermarkFunction; import org.apache.samza.serializers.Serde; import org.apache.samza.operators.functions.FoldLeftFunction; @@ -90,8 +90,8 @@ public void testTriggerIntervalWithSingleTimeTrigger() { } @Test(expected = IllegalArgumentException.class) - public void testIllegalTimerFunctionAsInitializer() { - class TimedSupplierFunction implements SupplierFunction, TimerFunction { + public void testIllegalScheduledFunctionAsInitializer() { + class TimedSupplierFunction implements SupplierFunction, ScheduledFunction { @Override public Collection get() { @@ -99,12 +99,12 @@ public Collection get() { } @Override - public void registerTimer(TimerRegistry timerRegistry) { + public void schedule(Scheduler scheduler) { } @Override - public Collection onTimer(Object key, long timestamp) { + public Collection onCallback(Object key, long timestamp) { return null; } } @@ -138,8 +138,8 @@ public Long getOutputWatermark() { } @Test(expected = IllegalArgumentException.class) - public void testIllegalTimerFunctionAsKeyFn() { - class TimerMapFunction implements MapFunction, TimerFunction { + public void testIllegalScheduledFunctionAsKeyFn() { + class ScheduledMapFunction implements MapFunction, ScheduledFunction { @Override public Object apply(Object message) { @@ -147,16 +147,16 @@ public Object apply(Object message) { } @Override - public void registerTimer(TimerRegistry timerRegistry) { + public void schedule(Scheduler scheduler) { } @Override - public Collection onTimer(Object key, long timestamp) { + public Collection onCallback(Object key, long timestamp) { return null; } } - keyFn = new TimerMapFunction(); + keyFn = new ScheduledMapFunction(); getWindowOperatorSpec("w0"); } @@ -186,8 +186,8 @@ public Long getOutputWatermark() { } @Test(expected = IllegalArgumentException.class) - public void testIllegalTimerFunctionAsEventTimeFn() { - class TimerMapFunction implements MapFunction, TimerFunction { + public void testIllegalScheduledFunctionAsEventTimeFn() { + class ScheduledMapFunction implements MapFunction, ScheduledFunction { @Override public Long apply(Object message) { @@ -195,16 +195,16 @@ public Long apply(Object message) { } @Override - public void registerTimer(TimerRegistry timerRegistry) { + public void schedule(Scheduler scheduler) { } @Override - public Collection onTimer(Object key, long timestamp) { + public Collection onCallback(Object key, long timestamp) { return null; } } - timeFn = new TimerMapFunction(); + timeFn = new ScheduledMapFunction(); getWindowOperatorSpec("w0"); } @@ -234,8 +234,9 @@ public Long getOutputWatermark() { } @Test - public void testTimerFunctionAsFoldLeftFn() { - class TimerFoldLeftFunction implements FoldLeftFunction, TimerFunction { + public void testScheduledFunctionAsFoldLeftFn() { + class ScheduledFoldLeftFunction + implements FoldLeftFunction, ScheduledFunction { @Override public Collection apply(Object message, Collection oldValue) { @@ -244,19 +245,19 @@ public Collection apply(Object message, Collection oldValue) { } @Override - public void registerTimer(TimerRegistry timerRegistry) { + public void schedule(Scheduler scheduler) { } @Override - public Collection onTimer(Object key, long timestamp) { + public Collection onCallback(Object key, long timestamp) { return null; } } - foldFn = new TimerFoldLeftFunction(); + foldFn = new ScheduledFoldLeftFunction(); WindowOperatorSpec windowSpec = getWindowOperatorSpec("w0"); - assertEquals(windowSpec.getTimerFn(), foldFn); + assertEquals(windowSpec.getScheduledFn(), foldFn); assertNull(windowSpec.getWatermarkFn()); } @@ -284,7 +285,7 @@ public Collection apply(Object message, Collection oldValue) { foldFn = new WatermarkFoldLeftFunction(); WindowOperatorSpec windowSpec = getWindowOperatorSpec("w0"); assertEquals(windowSpec.getWatermarkFn(), foldFn); - assertNull(windowSpec.getTimerFn()); + assertNull(windowSpec.getScheduledFn()); } @Test diff --git a/samza-core/src/test/java/org/apache/samza/task/TestSystemTimerScheduler.java b/samza-core/src/test/java/org/apache/samza/task/TestEpochTimeScheduler.java similarity index 87% rename from samza-core/src/test/java/org/apache/samza/task/TestSystemTimerScheduler.java rename to samza-core/src/test/java/org/apache/samza/task/TestEpochTimeScheduler.java index dd08121728..e0da2e95b9 100644 --- a/samza-core/src/test/java/org/apache/samza/task/TestSystemTimerScheduler.java +++ b/samza-core/src/test/java/org/apache/samza/task/TestEpochTimeScheduler.java @@ -36,7 +36,7 @@ import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; -public class TestSystemTimerScheduler { +public class TestEpochTimeScheduler { private ScheduledExecutorService createExecutorService() { ScheduledExecutorService service = mock(ScheduledExecutorService.class); @@ -49,15 +49,15 @@ private ScheduledExecutorService createExecutorService() { return service; } - private void fireTimers(SystemTimerScheduler factory) { + private void fireTimers(EpochTimeScheduler factory) { factory.removeReadyTimers().entrySet().forEach(entry -> { - entry.getValue().onTimer(entry.getKey().getKey(), mock(MessageCollector.class), mock(TaskCoordinator.class)); + entry.getValue().onCallback(entry.getKey().getKey(), mock(MessageCollector.class), mock(TaskCoordinator.class)); }); } @Test public void testSingleTimer() { - SystemTimerScheduler scheduler = SystemTimerScheduler.create(createExecutorService()); + EpochTimeScheduler scheduler = EpochTimeScheduler.create(createExecutorService()); List results = new ArrayList<>(); scheduler.setTimer("single-timer", 1, (key, collector, coordinator) -> { results.add(key); @@ -71,7 +71,7 @@ public void testSingleTimer() { @Test public void testMultipleTimers() { - SystemTimerScheduler scheduler = SystemTimerScheduler.create(createExecutorService()); + EpochTimeScheduler scheduler = EpochTimeScheduler.create(createExecutorService()); List results = new ArrayList<>(); scheduler.setTimer("multiple-timer-3", 3, (key, collector, coordinator) -> { results.add(key + ":3"); @@ -97,7 +97,7 @@ public void testMultipleKeys() { Object key2 = new Object(); List results = new ArrayList<>(); - SystemTimerScheduler scheduler = SystemTimerScheduler.create(createExecutorService()); + EpochTimeScheduler scheduler = EpochTimeScheduler.create(createExecutorService()); scheduler.setTimer(key1, 2, (key, collector, coordinator) -> { assertEquals(key, key1); results.add("key1:2"); @@ -120,7 +120,7 @@ public void testMultipleKeyTypes() { Long key2 = Long.MAX_VALUE; List results = new ArrayList<>(); - SystemTimerScheduler scheduler = SystemTimerScheduler.create(createExecutorService()); + EpochTimeScheduler scheduler = EpochTimeScheduler.create(createExecutorService()); scheduler.setTimer(key1, 1, (key, collector, coordinator) -> { assertEquals(key, key1); results.add("key:1"); @@ -144,7 +144,7 @@ public void testRemoveTimer() { when(future.cancel(anyBoolean())).thenReturn(true); when(service.schedule((Runnable) anyObject(), anyLong(), anyObject())).thenAnswer(invocation -> future); - SystemTimerScheduler scheduler = SystemTimerScheduler.create(service); + EpochTimeScheduler scheduler = EpochTimeScheduler.create(service); List results = new ArrayList<>(); scheduler.setTimer("timer", 1, (key, collector, coordinator) -> { results.add(key); @@ -160,7 +160,7 @@ public void testRemoveTimer() { @Test public void testTimerListener() { - SystemTimerScheduler scheduler = SystemTimerScheduler.create(createExecutorService()); + EpochTimeScheduler scheduler = EpochTimeScheduler.create(createExecutorService()); List results = new ArrayList<>(); scheduler.registerListener(() -> { results.add("timer-listener"); diff --git a/samza-sql/src/test/java/org/apache/samza/sql/translator/TestProjectTranslator.java b/samza-sql/src/test/java/org/apache/samza/sql/translator/TestProjectTranslator.java index 1acfc4795c..3046c1fa49 100644 --- a/samza-sql/src/test/java/org/apache/samza/sql/translator/TestProjectTranslator.java +++ b/samza-sql/src/test/java/org/apache/samza/sql/translator/TestProjectTranslator.java @@ -39,7 +39,7 @@ import org.apache.samza.operators.MessageStream; import org.apache.samza.operators.MessageStreamImpl; import org.apache.samza.operators.functions.MapFunction; -import org.apache.samza.operators.functions.TimerFunction; +import org.apache.samza.operators.functions.ScheduledFunction; import org.apache.samza.operators.functions.WatermarkFunction; import org.apache.samza.operators.spec.OperatorSpec; import org.apache.samza.operators.spec.StreamOperatorSpec; @@ -192,7 +192,7 @@ public WatermarkFunction getWatermarkFn() { } @Override - public TimerFunction getTimerFn() { + public ScheduledFunction getScheduledFn() { return null; } }; diff --git a/samza-test/src/test/java/org/apache/samza/test/framework/TimerTest.java b/samza-test/src/test/java/org/apache/samza/test/framework/SchedulingTest.java similarity index 91% rename from samza-test/src/test/java/org/apache/samza/test/framework/TimerTest.java rename to samza-test/src/test/java/org/apache/samza/test/framework/SchedulingTest.java index d4e0e14f02..658492a7a8 100644 --- a/samza-test/src/test/java/org/apache/samza/test/framework/TimerTest.java +++ b/samza-test/src/test/java/org/apache/samza/test/framework/SchedulingTest.java @@ -26,9 +26,9 @@ import org.junit.Before; import org.junit.Test; -import static org.apache.samza.test.framework.TestTimerApp.*; +import static org.apache.samza.test.framework.TestSchedulingApp.*; -public class TimerTest extends StreamApplicationIntegrationTestHarness { +public class SchedulingTest extends StreamApplicationIntegrationTestHarness { @Before public void setup() { @@ -55,6 +55,6 @@ public void testJob() throws InterruptedException { configs.put(JobCoordinatorConfig.JOB_COORDINATION_UTILS_FACTORY, "org.apache.samza.standalone.PassthroughCoordinationUtilsFactory"); configs.put(JobConfig.PROCESSOR_ID(), "0"); - runApplication(new TestTimerApp(), "TimerTest", configs); + runApplication(new TestSchedulingApp(), "SchedulingTest", configs); } } diff --git a/samza-test/src/test/java/org/apache/samza/test/framework/TestTimerApp.java b/samza-test/src/test/java/org/apache/samza/test/framework/TestSchedulingApp.java similarity index 79% rename from samza-test/src/test/java/org/apache/samza/test/framework/TestTimerApp.java rename to samza-test/src/test/java/org/apache/samza/test/framework/TestSchedulingApp.java index e72a965d26..db78e8cb21 100644 --- a/samza-test/src/test/java/org/apache/samza/test/framework/TestTimerApp.java +++ b/samza-test/src/test/java/org/apache/samza/test/framework/TestSchedulingApp.java @@ -26,16 +26,16 @@ import java.util.List; import org.apache.samza.application.StreamApplication; import org.apache.samza.application.StreamApplicationDescriptor; +import org.apache.samza.operators.Scheduler; import org.apache.samza.operators.MessageStream; -import org.apache.samza.operators.TimerRegistry; import org.apache.samza.operators.functions.FlatMapFunction; -import org.apache.samza.operators.functions.TimerFunction; +import org.apache.samza.operators.functions.ScheduledFunction; import org.apache.samza.serializers.JsonSerdeV2; import org.apache.samza.system.kafka.KafkaInputDescriptor; import org.apache.samza.system.kafka.KafkaSystemDescriptor; import org.apache.samza.test.operator.data.PageView; -public class TestTimerApp implements StreamApplication { +public class TestSchedulingApp implements StreamApplication { public static final String PAGE_VIEWS = "page-views"; @Override @@ -44,9 +44,9 @@ public void describe(StreamApplicationDescriptor appDesc) { KafkaSystemDescriptor ksd = new KafkaSystemDescriptor("kafka"); KafkaInputDescriptor isd = ksd.getInputDescriptor(PAGE_VIEWS, serde); final MessageStream pageViews = appDesc.getInputStream(isd); - final MessageStream output = pageViews.flatMap(new FlatmapTimerFn()); + final MessageStream output = pageViews.flatMap(new FlatmapScheduledFn()); - MessageStreamAssert.that("Output from timer function should container all complete messages", output, serde) + MessageStreamAssert.that("Output from scheduling function should container all complete messages", output, serde) .containsInAnyOrder( Arrays.asList( new PageView("v1-complete", "p1", "u1"), @@ -56,14 +56,15 @@ public void describe(StreamApplicationDescriptor appDesc) { )); } - private static class FlatmapTimerFn implements FlatMapFunction, TimerFunction { + private static class FlatmapScheduledFn + implements FlatMapFunction, ScheduledFunction { private transient List pageViews; - private transient TimerRegistry timerRegistry; + private transient Scheduler scheduler; @Override - public void registerTimer(TimerRegistry timerRegistry) { - this.timerRegistry = timerRegistry; + public void schedule(Scheduler scheduler) { + this.scheduler = scheduler; this.pageViews = new ArrayList<>(); } @@ -75,13 +76,13 @@ public Collection apply(PageView message) { if (pageViews.size() == 2) { //got all messages for this task final long time = System.currentTimeMillis() + 100; - timerRegistry.register("CompleteTimer", time); + scheduler.schedule("CompleteScheduler", time); } return Collections.emptyList(); } @Override - public Collection onTimer(String key, long time) { + public Collection onCallback(String key, long time) { return pageViews; } }