From 67c3608d00fb70aa7124c0c671ba281e4dbc5a57 Mon Sep 17 00:00:00 2001 From: Cameron Lee Date: Mon, 17 Sep 2018 17:25:18 -0700 Subject: [PATCH 1/4] SAMZA-1880: Rename non-metrics classes which use Timer in their name --- .../{TimerRegistry.java => KeyScheduler.java} | 22 +++--- .../functions/SchedulingFunction.java | 67 +++++++++++++++++++ .../operators/functions/TimerFunction.java | 65 ------------------ .../SchedulingCallback.java} | 19 ++++-- .../org/apache/samza/task/TaskContext.java | 19 +++--- .../samza/container/TaskContextImpl.java | 6 +- .../samza/operators/impl/OperatorImpl.java | 26 +++---- .../operators/impl/OperatorImplGraph.java | 8 +-- .../operators/impl/WindowOperatorImpl.java | 2 +- .../operators/spec/BroadcastOperatorSpec.java | 4 +- .../operators/spec/FilterOperatorSpec.java | 6 +- .../operators/spec/FlatMapOperatorSpec.java | 6 +- .../operators/spec/InputOperatorSpec.java | 4 +- .../operators/spec/JoinOperatorSpec.java | 6 +- .../samza/operators/spec/MapOperatorSpec.java | 6 +- .../operators/spec/MergeOperatorSpec.java | 4 +- .../samza/operators/spec/OperatorSpec.java | 4 +- .../operators/spec/OutputOperatorSpec.java | 4 +- .../spec/PartitionByOperatorSpec.java | 12 ++-- .../spec/SendToTableOperatorSpec.java | 4 +- .../operators/spec/SinkOperatorSpec.java | 6 +- .../spec/StreamTableJoinOperatorSpec.java | 6 +- .../operators/spec/WindowOperatorSpec.java | 30 ++++----- .../org/apache/samza/task/AsyncRunLoop.java | 64 +++++++++--------- .../samza/task/SystemTimerScheduler.java | 9 +-- .../apache/samza/container/TaskInstance.scala | 7 +- .../operators/TestOperatorSpecGraph.java | 4 +- .../operators/impl/TestOperatorImpl.java | 4 +- .../operators/spec/OperatorSpecTestUtils.java | 10 +-- .../operators/spec/TestOperatorSpec.java | 37 +++++----- .../spec/TestPartitionByOperatorSpec.java | 26 +++---- .../spec/TestWindowOperatorSpec.java | 47 ++++++------- .../samza/task/TestSystemTimerScheduler.java | 2 +- .../sql/translator/TestProjectTranslator.java | 4 +- .../{TimerTest.java => SchedulingTest.java} | 6 +- ...stTimerApp.java => TestSchedulingApp.java} | 23 ++++--- 36 files changed, 297 insertions(+), 282 deletions(-) rename samza-api/src/main/java/org/apache/samza/operators/{TimerRegistry.java => KeyScheduler.java} (63%) create mode 100644 samza-api/src/main/java/org/apache/samza/operators/functions/SchedulingFunction.java delete mode 100644 samza-api/src/main/java/org/apache/samza/operators/functions/TimerFunction.java rename samza-api/src/main/java/org/apache/samza/{task/TimerCallback.java => scheduler/SchedulingCallback.java} (66%) rename samza-test/src/test/java/org/apache/samza/test/framework/{TimerTest.java => SchedulingTest.java} (91%) rename samza-test/src/test/java/org/apache/samza/test/framework/{TestTimerApp.java => TestSchedulingApp.java} (78%) diff --git a/samza-api/src/main/java/org/apache/samza/operators/TimerRegistry.java b/samza-api/src/main/java/org/apache/samza/operators/KeyScheduler.java similarity index 63% rename from samza-api/src/main/java/org/apache/samza/operators/TimerRegistry.java rename to samza-api/src/main/java/org/apache/samza/operators/KeyScheduler.java index 64dd4ec96d..22e87d5e6d 100644 --- a/samza-api/src/main/java/org/apache/samza/operators/TimerRegistry.java +++ b/samza-api/src/main/java/org/apache/samza/operators/KeyScheduler.java @@ -19,23 +19,25 @@ package org.apache.samza.operators; +import org.apache.samza.operators.functions.SchedulingFunction; + + /** - * Allows registering epoch-time timer callbacks from the operators. - * See {@link org.apache.samza.operators.functions.TimerFunction} for details. - * @param type of the timer key + * Manages scheduling keys to be triggered later. See {@link SchedulingFunction} for details. + * @param type of the key */ -public interface TimerRegistry { +public interface KeyScheduler { /** - * Register a epoch-time timer with key. - * @param key unique timer key - * @param timestamp epoch time when the timer will be fired, in milliseconds + * Schedule a key to be triggered at {@code timestamp}. + * @param key unique key + * @param timestamp epoch time when the key will be triggered, in milliseconds */ - void register(K key, long timestamp); + void schedule(K key, long timestamp); /** - * Delete the timer for the provided key. - * @param key key for the timer to delete + * Delete the scheduler entry for the provided key. + * @param key key to delete */ void delete(K key); } diff --git a/samza-api/src/main/java/org/apache/samza/operators/functions/SchedulingFunction.java b/samza-api/src/main/java/org/apache/samza/operators/functions/SchedulingFunction.java new file mode 100644 index 0000000000..21cb0895b7 --- /dev/null +++ b/samza-api/src/main/java/org/apache/samza/operators/functions/SchedulingFunction.java @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.samza.operators.functions; + +import org.apache.samza.operators.KeyScheduler; + +import java.util.Collection; + +/** + * Allows scheduling with key(s) and is invoked when the specified time(s) occurs. + * Key must be a unique identifier for its corresponding logic to execute, and is provided in the callback when the + * corresponding schedule time occurs. + * + *

+ * Example of a {@link FlatMapFunction} with {@link SchedulingFunction}: + *

{@code
+ *    public class ExampleSchedulingFn implements FlatMapFunction, SchedulingFunction {
+ *      public void schedulingInit(KeyScheduler keyScheduler) {
+ *        long time = System.currentTimeMillis() + 5000; // fire after 5 sec
+ *        keyScheduler.schedule("example-scheduler-logic", time);
+ *      }
+ *      public Collection apply(String s) {
+ *        ...
+ *      }
+ *      public Collection executeForKey(String key, long timestamp) {
+ *        // fired with key as "example-scheduler-logic"
+ *        ...
+ *      }
+ *    }
+ * }
+ * @param type of the key + * @param type of the output + */ +public interface SchedulingFunction { + + /** + * Initialize the function for scheduling, such as setting some initial scheduling logic or saving the + * {@code keyScheduler} for later use. + * @param keyScheduler used to specify the schedule time(s) and key(s) + */ + void schedulingInit(KeyScheduler keyScheduler); + + /** + * Returns the output from the scheduling logic corresponding to the key that was triggered. + * @param key key corresponding to the scheduling logic that got triggered + * @param timestamp schedule time that was set for the key, in milliseconds since epoch + * @return {@link Collection} of output elements + */ + Collection executeForKey(K key, long timestamp); +} diff --git a/samza-api/src/main/java/org/apache/samza/operators/functions/TimerFunction.java b/samza-api/src/main/java/org/apache/samza/operators/functions/TimerFunction.java deleted file mode 100644 index 01825c6479..0000000000 --- a/samza-api/src/main/java/org/apache/samza/operators/functions/TimerFunction.java +++ /dev/null @@ -1,65 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.samza.operators.functions; - -import org.apache.samza.operators.TimerRegistry; - -import java.util.Collection; - -/** - * Allows timer registration with a key and is invoked when the timer is fired. - * Key must be a unique identifier for this timer, and is provided in the callback when the timer fires. - * - *

- * Example of a {@link FlatMapFunction} with timer: - *

{@code
- *    public class ExampleTimerFn implements FlatMapFunction, TimerFunction {
- *      public void registerTimer(TimerRegistry timerRegistry) {
- *        long time = System.currentTimeMillis() + 5000; // fire after 5 sec
- *        timerRegistry.register("example-timer", time);
- *      }
- *      public Collection apply(String s) {
- *        ...
- *      }
- *      public Collection onTimer(String key, long timestamp) {
- *        // example-timer fired
- *        ...
- *      }
- *    }
- * }
- * @param type of the key - * @param type of the output - */ -public interface TimerFunction { - - /** - * Registers any epoch-time timers using the registry - * @param timerRegistry a keyed {@link TimerRegistry} - */ - void registerTimer(TimerRegistry timerRegistry); - - /** - * Returns the output after the timer with key fires. - * @param key timer key - * @param timestamp time of the epoch-time timer fired, in milliseconds - * @return {@link Collection} of output elements - */ - Collection onTimer(K key, long timestamp); -} diff --git a/samza-api/src/main/java/org/apache/samza/task/TimerCallback.java b/samza-api/src/main/java/org/apache/samza/scheduler/SchedulingCallback.java similarity index 66% rename from samza-api/src/main/java/org/apache/samza/task/TimerCallback.java rename to samza-api/src/main/java/org/apache/samza/scheduler/SchedulingCallback.java index 3add129c63..34a5341f0e 100644 --- a/samza-api/src/main/java/org/apache/samza/task/TimerCallback.java +++ b/samza-api/src/main/java/org/apache/samza/scheduler/SchedulingCallback.java @@ -17,18 +17,23 @@ * under the License. */ -package org.apache.samza.task; +package org.apache.samza.scheduler; + +import org.apache.samza.task.MessageCollector; +import org.apache.samza.task.TaskCoordinator; + /** - * The callback that is invoked when its corresponding timer registered via {@link TaskContext} fires. - * @param type of the timer key + * The callback that is invoked when its corresponding schedule time registered via + * {@link org.apache.samza.task.TaskContext} is reached. + * @param type of the callback key */ -public interface TimerCallback { +public interface SchedulingCallback { /** - * Invoked when the timer of key fires. - * @param key timer key + * Invoked when the corresponding schedule time is reached. + * @param key key for callback * @param collector contains the means of sending message envelopes to the output stream. * @param coordinator manages execution of tasks. */ - void onTimer(K key, MessageCollector collector, TaskCoordinator coordinator); + void execute(K key, MessageCollector collector, TaskCoordinator coordinator); } diff --git a/samza-api/src/main/java/org/apache/samza/task/TaskContext.java b/samza-api/src/main/java/org/apache/samza/task/TaskContext.java index ea2a3bca98..32b9be3aa6 100644 --- a/samza-api/src/main/java/org/apache/samza/task/TaskContext.java +++ b/samza-api/src/main/java/org/apache/samza/task/TaskContext.java @@ -24,6 +24,7 @@ import org.apache.samza.container.SamzaContainerContext; import org.apache.samza.container.TaskName; import org.apache.samza.metrics.MetricsRegistry; +import org.apache.samza.scheduler.SchedulingCallback; import org.apache.samza.system.SystemStreamPartition; import org.apache.samza.table.Table; @@ -76,21 +77,21 @@ default Object getUserContext() { } /** - * Register a keyed timer with a callback of {@link TimerCallback} in this task. + * Register a keyed callback in this task. * The callback will be invoked exclusively with any other operations for this task, * e.g. processing, windowing and commit. - * @param key timer key - * @param timestamp epoch time when the timer will be fired, in milliseconds - * @param callback callback when the timer is fired + * @param key key for the callback + * @param timestamp epoch time when the callback will be fired, in milliseconds + * @param callback callback to call when the {@code timestamp} is reached * @param type of the key */ - void registerTimer(K key, long timestamp, TimerCallback callback); + void scheduleCallback(K key, long timestamp, SchedulingCallback callback); /** - * Delete the keyed timer in this task. - * Deletion only happens if the timer hasn't been fired. Otherwise it will not interrupt. - * @param key timer key + * Delete the keyed callback in this task. + * Deletion only happens if the callback hasn't been fired. Otherwise it will not interrupt. + * @param key callback key * @param type of the key */ - void deleteTimer(K key); + void deleteScheduledCallback(K key); } diff --git a/samza-core/src/main/java/org/apache/samza/container/TaskContextImpl.java b/samza-core/src/main/java/org/apache/samza/container/TaskContextImpl.java index d65be4cbbb..05f849099d 100644 --- a/samza-core/src/main/java/org/apache/samza/container/TaskContextImpl.java +++ b/samza-core/src/main/java/org/apache/samza/container/TaskContextImpl.java @@ -31,7 +31,7 @@ import org.apache.samza.table.TableManager; import org.apache.samza.task.SystemTimerScheduler; import org.apache.samza.task.TaskContext; -import org.apache.samza.task.TimerCallback; +import org.apache.samza.scheduler.SchedulingCallback; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -134,12 +134,12 @@ public Object getUserContext() { } @Override - public void registerTimer(K key, long timestamp, TimerCallback callback) { + public void scheduleCallback(K key, long timestamp, SchedulingCallback callback) { timerScheduler.setTimer(key, timestamp, callback); } @Override - public void deleteTimer(K key) { + public void deleteScheduledCallback(K key) { timerScheduler.deleteTimer(key); } diff --git a/samza-core/src/main/java/org/apache/samza/operators/impl/OperatorImpl.java b/samza-core/src/main/java/org/apache/samza/operators/impl/OperatorImpl.java index f0c099794a..851b48f012 100644 --- a/samza-core/src/main/java/org/apache/samza/operators/impl/OperatorImpl.java +++ b/samza-core/src/main/java/org/apache/samza/operators/impl/OperatorImpl.java @@ -25,8 +25,8 @@ import org.apache.samza.container.TaskName; import org.apache.samza.job.model.ContainerModel; import org.apache.samza.job.model.TaskModel; -import org.apache.samza.operators.TimerRegistry; -import org.apache.samza.operators.functions.TimerFunction; +import org.apache.samza.operators.KeyScheduler; +import org.apache.samza.operators.functions.SchedulingFunction; import org.apache.samza.operators.functions.WatermarkFunction; import org.apache.samza.system.EndOfStreamMessage; import org.apache.samza.metrics.Counter; @@ -436,19 +436,19 @@ final long getOutputWatermark() { /** * Returns a registry which allows registering arbitrary system-clock timer with K-typed key. - * The user-defined function in the operator spec needs to implement {@link TimerFunction#onTimer(Object, long)} + * The user-defined function in the operator spec needs to implement {@link SchedulingFunction#executeForKey(Object, long)} * for timer notifications. * @param key type for the timer. - * @return an instance of {@link TimerRegistry} + * @return an instance of {@link KeyScheduler} */ - TimerRegistry createOperatorTimerRegistry() { - return new TimerRegistry() { + KeyScheduler createOperatorKeyScheduler() { + return new KeyScheduler() { @Override - public void register(K key, long time) { - taskContext.registerTimer(key, time, (k, collector, coordinator) -> { - final TimerFunction timerFn = getOperatorSpec().getTimerFn(); - if (timerFn != null) { - final Collection output = timerFn.onTimer(key, time); + public void schedule(K key, long time) { + taskContext.scheduleCallback(key, time, (k, collector, coordinator) -> { + final SchedulingFunction schedulingFn = getOperatorSpec().getSchedulingFn(); + if (schedulingFn != null) { + final Collection output = schedulingFn.executeForKey(key, time); if (!output.isEmpty()) { output.forEach(rm -> @@ -457,7 +457,7 @@ public void register(K key, long time) { } } else { throw new SamzaException( - String.format("Operator %s id %s (created at %s) must implement TimerFunction to use system timer.", + String.format("Operator %s id %s (created at %s) must implement SchedulingFunction to use system timer.", getOperatorSpec().getOpCode().name(), getOpImplId(), getOperatorSpec().getSourceLocation())); } }); @@ -465,7 +465,7 @@ public void register(K key, long time) { @Override public void delete(K key) { - taskContext.deleteTimer(key); + taskContext.deleteScheduledCallback(key); } }; } diff --git a/samza-core/src/main/java/org/apache/samza/operators/impl/OperatorImplGraph.java b/samza-core/src/main/java/org/apache/samza/operators/impl/OperatorImplGraph.java index 7f62e00742..13b1fc593e 100644 --- a/samza-core/src/main/java/org/apache/samza/operators/impl/OperatorImplGraph.java +++ b/samza-core/src/main/java/org/apache/samza/operators/impl/OperatorImplGraph.java @@ -26,7 +26,7 @@ import org.apache.samza.container.TaskContextImpl; import org.apache.samza.job.model.JobModel; import org.apache.samza.operators.KV; -import org.apache.samza.operators.TimerRegistry; +import org.apache.samza.operators.KeyScheduler; import org.apache.samza.operators.functions.JoinFunction; import org.apache.samza.operators.functions.PartialJoinFunction; import org.apache.samza.util.TimestampedValue; @@ -170,9 +170,9 @@ private OperatorImpl createAndRegisterOperatorImpl(OperatorSpec prevOperatorSpec operatorImpl.init(config, context); operatorImpl.registerInputStream(inputStream); - if (operatorSpec.getTimerFn() != null) { - final TimerRegistry timerRegistry = operatorImpl.createOperatorTimerRegistry(); - operatorSpec.getTimerFn().registerTimer(timerRegistry); + if (operatorSpec.getSchedulingFn() != null) { + final KeyScheduler keyScheduler = operatorImpl.createOperatorKeyScheduler(); + operatorSpec.getSchedulingFn().schedulingInit(keyScheduler); } // Note: The key here is opImplId, which may not equal opId for some impls (e.g. PartialJoinOperatorImpl). diff --git a/samza-core/src/main/java/org/apache/samza/operators/impl/WindowOperatorImpl.java b/samza-core/src/main/java/org/apache/samza/operators/impl/WindowOperatorImpl.java index 82dc0bf2b7..b175671e9a 100644 --- a/samza-core/src/main/java/org/apache/samza/operators/impl/WindowOperatorImpl.java +++ b/samza-core/src/main/java/org/apache/samza/operators/impl/WindowOperatorImpl.java @@ -183,7 +183,7 @@ public Collection> handleMessage( @Override public Collection> handleTimer(MessageCollector collector, TaskCoordinator coordinator) { - LOG.trace("Processing timer."); + LOG.trace("Processing time triggers"); List> results = new ArrayList<>(); List> keys = triggerScheduler.runPendingCallbacks(); diff --git a/samza-core/src/main/java/org/apache/samza/operators/spec/BroadcastOperatorSpec.java b/samza-core/src/main/java/org/apache/samza/operators/spec/BroadcastOperatorSpec.java index 2c76e600a2..abc9bee0c7 100644 --- a/samza-core/src/main/java/org/apache/samza/operators/spec/BroadcastOperatorSpec.java +++ b/samza-core/src/main/java/org/apache/samza/operators/spec/BroadcastOperatorSpec.java @@ -20,7 +20,7 @@ package org.apache.samza.operators.spec; -import org.apache.samza.operators.functions.TimerFunction; +import org.apache.samza.operators.functions.SchedulingFunction; import org.apache.samza.operators.functions.WatermarkFunction; public class BroadcastOperatorSpec extends OperatorSpec { @@ -43,7 +43,7 @@ public WatermarkFunction getWatermarkFn() { } @Override - public TimerFunction getTimerFn() { + public SchedulingFunction getSchedulingFn() { return null; } } diff --git a/samza-core/src/main/java/org/apache/samza/operators/spec/FilterOperatorSpec.java b/samza-core/src/main/java/org/apache/samza/operators/spec/FilterOperatorSpec.java index a5cdb82e5e..b26f914ee6 100644 --- a/samza-core/src/main/java/org/apache/samza/operators/spec/FilterOperatorSpec.java +++ b/samza-core/src/main/java/org/apache/samza/operators/spec/FilterOperatorSpec.java @@ -23,7 +23,7 @@ import org.apache.samza.config.Config; import org.apache.samza.operators.functions.FilterFunction; import org.apache.samza.operators.functions.FlatMapFunction; -import org.apache.samza.operators.functions.TimerFunction; +import org.apache.samza.operators.functions.SchedulingFunction; import org.apache.samza.operators.functions.WatermarkFunction; import org.apache.samza.task.TaskContext; @@ -68,7 +68,7 @@ public WatermarkFunction getWatermarkFn() { } @Override - public TimerFunction getTimerFn() { - return this.filterFn instanceof TimerFunction ? (TimerFunction) this.filterFn : null; + public SchedulingFunction getSchedulingFn() { + return this.filterFn instanceof SchedulingFunction ? (SchedulingFunction) this.filterFn : null; } } diff --git a/samza-core/src/main/java/org/apache/samza/operators/spec/FlatMapOperatorSpec.java b/samza-core/src/main/java/org/apache/samza/operators/spec/FlatMapOperatorSpec.java index a93a2215f5..d52dc0ff2a 100644 --- a/samza-core/src/main/java/org/apache/samza/operators/spec/FlatMapOperatorSpec.java +++ b/samza-core/src/main/java/org/apache/samza/operators/spec/FlatMapOperatorSpec.java @@ -19,7 +19,7 @@ package org.apache.samza.operators.spec; import org.apache.samza.operators.functions.FlatMapFunction; -import org.apache.samza.operators.functions.TimerFunction; +import org.apache.samza.operators.functions.SchedulingFunction; import org.apache.samza.operators.functions.WatermarkFunction; @@ -41,7 +41,7 @@ public WatermarkFunction getWatermarkFn() { } @Override - public TimerFunction getTimerFn() { - return this.transformFn instanceof TimerFunction ? (TimerFunction) this.transformFn : null; + public SchedulingFunction getSchedulingFn() { + return this.transformFn instanceof SchedulingFunction ? (SchedulingFunction) this.transformFn : null; } } diff --git a/samza-core/src/main/java/org/apache/samza/operators/spec/InputOperatorSpec.java b/samza-core/src/main/java/org/apache/samza/operators/spec/InputOperatorSpec.java index c49443de70..afb8266994 100644 --- a/samza-core/src/main/java/org/apache/samza/operators/spec/InputOperatorSpec.java +++ b/samza-core/src/main/java/org/apache/samza/operators/spec/InputOperatorSpec.java @@ -19,7 +19,7 @@ package org.apache.samza.operators.spec; import org.apache.samza.operators.functions.InputTransformer; -import org.apache.samza.operators.functions.TimerFunction; +import org.apache.samza.operators.functions.SchedulingFunction; import org.apache.samza.operators.functions.WatermarkFunction; import org.apache.samza.serializers.Serde; import org.apache.samza.system.IncomingMessageEnvelope; @@ -99,7 +99,7 @@ public WatermarkFunction getWatermarkFn() { } @Override - public TimerFunction getTimerFn() { + public SchedulingFunction getSchedulingFn() { return null; } } diff --git a/samza-core/src/main/java/org/apache/samza/operators/spec/JoinOperatorSpec.java b/samza-core/src/main/java/org/apache/samza/operators/spec/JoinOperatorSpec.java index 1b55784687..5bb64e44ce 100644 --- a/samza-core/src/main/java/org/apache/samza/operators/spec/JoinOperatorSpec.java +++ b/samza-core/src/main/java/org/apache/samza/operators/spec/JoinOperatorSpec.java @@ -20,7 +20,7 @@ import com.google.common.collect.ImmutableMap; import org.apache.samza.operators.functions.JoinFunction; -import org.apache.samza.operators.functions.TimerFunction; +import org.apache.samza.operators.functions.SchedulingFunction; import org.apache.samza.operators.functions.WatermarkFunction; import org.apache.samza.operators.impl.store.TimestampedValueSerde; import org.apache.samza.util.TimestampedValue; @@ -105,8 +105,8 @@ public WatermarkFunction getWatermarkFn() { } @Override - public TimerFunction getTimerFn() { - return joinFn instanceof TimerFunction ? (TimerFunction) joinFn : null; + public SchedulingFunction getSchedulingFn() { + return joinFn instanceof SchedulingFunction ? (SchedulingFunction) joinFn : null; } public OperatorSpec getLeftInputOpSpec() { diff --git a/samza-core/src/main/java/org/apache/samza/operators/spec/MapOperatorSpec.java b/samza-core/src/main/java/org/apache/samza/operators/spec/MapOperatorSpec.java index 1e2190bd27..36a55422fc 100644 --- a/samza-core/src/main/java/org/apache/samza/operators/spec/MapOperatorSpec.java +++ b/samza-core/src/main/java/org/apache/samza/operators/spec/MapOperatorSpec.java @@ -23,7 +23,7 @@ import org.apache.samza.config.Config; import org.apache.samza.operators.functions.FlatMapFunction; import org.apache.samza.operators.functions.MapFunction; -import org.apache.samza.operators.functions.TimerFunction; +import org.apache.samza.operators.functions.SchedulingFunction; import org.apache.samza.operators.functions.WatermarkFunction; import org.apache.samza.task.TaskContext; @@ -71,7 +71,7 @@ public WatermarkFunction getWatermarkFn() { } @Override - public TimerFunction getTimerFn() { - return this.mapFn instanceof TimerFunction ? (TimerFunction) this.mapFn : null; + public SchedulingFunction getSchedulingFn() { + return this.mapFn instanceof SchedulingFunction ? (SchedulingFunction) this.mapFn : null; } } diff --git a/samza-core/src/main/java/org/apache/samza/operators/spec/MergeOperatorSpec.java b/samza-core/src/main/java/org/apache/samza/operators/spec/MergeOperatorSpec.java index 987f72cfca..18dd15ea36 100644 --- a/samza-core/src/main/java/org/apache/samza/operators/spec/MergeOperatorSpec.java +++ b/samza-core/src/main/java/org/apache/samza/operators/spec/MergeOperatorSpec.java @@ -19,7 +19,7 @@ package org.apache.samza.operators.spec; import java.util.ArrayList; -import org.apache.samza.operators.functions.TimerFunction; +import org.apache.samza.operators.functions.SchedulingFunction; import org.apache.samza.operators.functions.WatermarkFunction; @@ -45,7 +45,7 @@ public WatermarkFunction getWatermarkFn() { } @Override - public TimerFunction getTimerFn() { + public SchedulingFunction getSchedulingFn() { return null; } } diff --git a/samza-core/src/main/java/org/apache/samza/operators/spec/OperatorSpec.java b/samza-core/src/main/java/org/apache/samza/operators/spec/OperatorSpec.java index 1e021f564c..6cd16cf17c 100644 --- a/samza-core/src/main/java/org/apache/samza/operators/spec/OperatorSpec.java +++ b/samza-core/src/main/java/org/apache/samza/operators/spec/OperatorSpec.java @@ -25,7 +25,7 @@ import org.apache.samza.annotation.InterfaceStability; import org.apache.samza.operators.MessageStream; import org.apache.samza.operators.MessageStreamImpl; -import org.apache.samza.operators.functions.TimerFunction; +import org.apache.samza.operators.functions.SchedulingFunction; import org.apache.samza.operators.functions.WatermarkFunction; /** @@ -143,5 +143,5 @@ public final String getSourceLocation() { abstract public WatermarkFunction getWatermarkFn(); - abstract public TimerFunction getTimerFn(); + abstract public SchedulingFunction getSchedulingFn(); } diff --git a/samza-core/src/main/java/org/apache/samza/operators/spec/OutputOperatorSpec.java b/samza-core/src/main/java/org/apache/samza/operators/spec/OutputOperatorSpec.java index 40a5c0ec06..3487b1a8f8 100644 --- a/samza-core/src/main/java/org/apache/samza/operators/spec/OutputOperatorSpec.java +++ b/samza-core/src/main/java/org/apache/samza/operators/spec/OutputOperatorSpec.java @@ -19,7 +19,7 @@ package org.apache.samza.operators.spec; -import org.apache.samza.operators.functions.TimerFunction; +import org.apache.samza.operators.functions.SchedulingFunction; import org.apache.samza.operators.functions.WatermarkFunction; /** @@ -59,7 +59,7 @@ public WatermarkFunction getWatermarkFn() { } @Override - public TimerFunction getTimerFn() { + public SchedulingFunction getSchedulingFn() { return null; } } diff --git a/samza-core/src/main/java/org/apache/samza/operators/spec/PartitionByOperatorSpec.java b/samza-core/src/main/java/org/apache/samza/operators/spec/PartitionByOperatorSpec.java index d6bf3d94a5..dee72e392e 100644 --- a/samza-core/src/main/java/org/apache/samza/operators/spec/PartitionByOperatorSpec.java +++ b/samza-core/src/main/java/org/apache/samza/operators/spec/PartitionByOperatorSpec.java @@ -20,7 +20,7 @@ import org.apache.samza.operators.KV; import org.apache.samza.operators.functions.MapFunction; -import org.apache.samza.operators.functions.TimerFunction; +import org.apache.samza.operators.functions.SchedulingFunction; import org.apache.samza.operators.functions.WatermarkFunction; import static com.google.common.base.Preconditions.checkArgument; @@ -55,10 +55,10 @@ public class PartitionByOperatorSpec extends OperatorSpec { MapFunction keyFunction, MapFunction valueFunction, String opId) { super(OpCode.PARTITION_BY, opId); - checkArgument(!(keyFunction instanceof TimerFunction || keyFunction instanceof WatermarkFunction), - "keyFunction for partitionBy should not implement TimerFunction or WatermarkFunction."); - checkArgument(!(valueFunction instanceof TimerFunction || valueFunction instanceof WatermarkFunction), - "valueFunction for partitionBy should not implement TimerFunction or WatermarkFunction."); + checkArgument(!(keyFunction instanceof SchedulingFunction || keyFunction instanceof WatermarkFunction), + "keyFunction for partitionBy should not implement SchedulingFunction or WatermarkFunction."); + checkArgument(!(valueFunction instanceof SchedulingFunction || valueFunction instanceof WatermarkFunction), + "valueFunction for partitionBy should not implement SchedulingFunction or WatermarkFunction."); this.outputStream = outputStream; this.keyFunction = keyFunction; this.valueFunction = valueFunction; @@ -86,7 +86,7 @@ public WatermarkFunction getWatermarkFn() { } @Override - public TimerFunction getTimerFn() { + public SchedulingFunction getSchedulingFn() { return null; } } diff --git a/samza-core/src/main/java/org/apache/samza/operators/spec/SendToTableOperatorSpec.java b/samza-core/src/main/java/org/apache/samza/operators/spec/SendToTableOperatorSpec.java index 22f393ece6..5a11a66ec0 100644 --- a/samza-core/src/main/java/org/apache/samza/operators/spec/SendToTableOperatorSpec.java +++ b/samza-core/src/main/java/org/apache/samza/operators/spec/SendToTableOperatorSpec.java @@ -20,7 +20,7 @@ import org.apache.samza.annotation.InterfaceStability; import org.apache.samza.operators.KV; -import org.apache.samza.operators.functions.TimerFunction; +import org.apache.samza.operators.functions.SchedulingFunction; import org.apache.samza.operators.functions.WatermarkFunction; import org.apache.samza.table.TableSpec; @@ -58,7 +58,7 @@ public WatermarkFunction getWatermarkFn() { } @Override - public TimerFunction getTimerFn() { + public SchedulingFunction getSchedulingFn() { return null; } } diff --git a/samza-core/src/main/java/org/apache/samza/operators/spec/SinkOperatorSpec.java b/samza-core/src/main/java/org/apache/samza/operators/spec/SinkOperatorSpec.java index aa0f066947..b27f8a8351 100644 --- a/samza-core/src/main/java/org/apache/samza/operators/spec/SinkOperatorSpec.java +++ b/samza-core/src/main/java/org/apache/samza/operators/spec/SinkOperatorSpec.java @@ -19,7 +19,7 @@ package org.apache.samza.operators.spec; import org.apache.samza.operators.functions.SinkFunction; -import org.apache.samza.operators.functions.TimerFunction; +import org.apache.samza.operators.functions.SchedulingFunction; import org.apache.samza.operators.functions.WatermarkFunction; @@ -57,7 +57,7 @@ public WatermarkFunction getWatermarkFn() { } @Override - public TimerFunction getTimerFn() { - return sinkFn instanceof TimerFunction ? (TimerFunction) sinkFn : null; + public SchedulingFunction getSchedulingFn() { + return sinkFn instanceof SchedulingFunction ? (SchedulingFunction) sinkFn : null; } } diff --git a/samza-core/src/main/java/org/apache/samza/operators/spec/StreamTableJoinOperatorSpec.java b/samza-core/src/main/java/org/apache/samza/operators/spec/StreamTableJoinOperatorSpec.java index c7735c6d77..28018d32f7 100644 --- a/samza-core/src/main/java/org/apache/samza/operators/spec/StreamTableJoinOperatorSpec.java +++ b/samza-core/src/main/java/org/apache/samza/operators/spec/StreamTableJoinOperatorSpec.java @@ -19,8 +19,8 @@ package org.apache.samza.operators.spec; import org.apache.samza.annotation.InterfaceStability; +import org.apache.samza.operators.functions.SchedulingFunction; import org.apache.samza.operators.functions.StreamTableJoinFunction; -import org.apache.samza.operators.functions.TimerFunction; import org.apache.samza.operators.functions.WatermarkFunction; import org.apache.samza.table.TableSpec; @@ -66,8 +66,8 @@ public WatermarkFunction getWatermarkFn() { } @Override - public TimerFunction getTimerFn() { - return joinFn instanceof TimerFunction ? (TimerFunction) joinFn : null; + public SchedulingFunction getSchedulingFn() { + return joinFn instanceof SchedulingFunction ? (SchedulingFunction) joinFn : null; } } diff --git a/samza-core/src/main/java/org/apache/samza/operators/spec/WindowOperatorSpec.java b/samza-core/src/main/java/org/apache/samza/operators/spec/WindowOperatorSpec.java index 8d1ad2989d..f4a074a224 100644 --- a/samza-core/src/main/java/org/apache/samza/operators/spec/WindowOperatorSpec.java +++ b/samza-core/src/main/java/org/apache/samza/operators/spec/WindowOperatorSpec.java @@ -20,7 +20,7 @@ package org.apache.samza.operators.spec; import org.apache.samza.operators.functions.FoldLeftFunction; -import org.apache.samza.operators.functions.TimerFunction; +import org.apache.samza.operators.functions.SchedulingFunction; import org.apache.samza.operators.functions.WatermarkFunction; import org.apache.samza.operators.impl.store.TimeSeriesKeySerde; import org.apache.samza.operators.triggers.AnyTrigger; @@ -64,14 +64,14 @@ public class WindowOperatorSpec extends OperatorSpec window, String opId) { super(OpCode.WINDOW, opId); checkArgument(window.getInitializer() == null || - !(window.getInitializer() instanceof TimerFunction || window.getInitializer() instanceof WatermarkFunction), - "A window does not accepts a user-defined TimerFunction or WatermarkFunction as the initializer."); + !(window.getInitializer() instanceof SchedulingFunction || window.getInitializer() instanceof WatermarkFunction), + "A window does not accepts a user-defined SchedulingFunction or WatermarkFunction as the initializer."); checkArgument(window.getKeyExtractor() == null || - !(window.getKeyExtractor() instanceof TimerFunction || window.getKeyExtractor() instanceof WatermarkFunction), - "A window does not accepts a user-defined TimerFunction or WatermarkFunction as the keyExtractor."); + !(window.getKeyExtractor() instanceof SchedulingFunction || window.getKeyExtractor() instanceof WatermarkFunction), + "A window does not accepts a user-defined SchedulingFunction or WatermarkFunction as the keyExtractor."); checkArgument(window.getEventTimeExtractor() == null || - !(window.getEventTimeExtractor() instanceof TimerFunction || window.getEventTimeExtractor() instanceof WatermarkFunction), - "A window does not accepts a user-defined TimerFunction or WatermarkFunction as the eventTimeExtractor."); + !(window.getEventTimeExtractor() instanceof SchedulingFunction || window.getEventTimeExtractor() instanceof WatermarkFunction), + "A window does not accepts a user-defined SchedulingFunction or WatermarkFunction as the eventTimeExtractor."); this.window = window; } @@ -88,21 +88,21 @@ public WindowInternal getWindow() { * @return the default triggering interval */ public long getDefaultTriggerMs() { - List timerTriggers = new ArrayList<>(); + List timeBasedTriggers = new ArrayList<>(); if (window.getDefaultTrigger() != null) { - timerTriggers.addAll(getTimeBasedTriggers(window.getDefaultTrigger())); + timeBasedTriggers.addAll(getTimeBasedTriggers(window.getDefaultTrigger())); } if (window.getEarlyTrigger() != null) { - timerTriggers.addAll(getTimeBasedTriggers(window.getEarlyTrigger())); + timeBasedTriggers.addAll(getTimeBasedTriggers(window.getEarlyTrigger())); } if (window.getLateTrigger() != null) { - timerTriggers.addAll(getTimeBasedTriggers(window.getLateTrigger())); + timeBasedTriggers.addAll(getTimeBasedTriggers(window.getLateTrigger())); } - LOG.info("Got {} timer triggers", timerTriggers.size()); + LOG.info("Got {} time-based triggers", timeBasedTriggers.size()); - List candidateDurations = timerTriggers.stream() + List candidateDurations = timeBasedTriggers.stream() .map(timeBasedTrigger -> timeBasedTrigger.getDuration().toMillis()) .collect(Collectors.toList()); @@ -135,9 +135,9 @@ public WatermarkFunction getWatermarkFn() { } @Override - public TimerFunction getTimerFn() { + public SchedulingFunction getSchedulingFn() { FoldLeftFunction fn = window.getFoldLeftFunction(); - return fn instanceof TimerFunction ? (TimerFunction) fn : null; + return fn instanceof SchedulingFunction ? (SchedulingFunction) fn : null; } @Override diff --git a/samza-core/src/main/java/org/apache/samza/task/AsyncRunLoop.java b/samza-core/src/main/java/org/apache/samza/task/AsyncRunLoop.java index 3b3e008460..dc98d204e5 100644 --- a/samza-core/src/main/java/org/apache/samza/task/AsyncRunLoop.java +++ b/samza-core/src/main/java/org/apache/samza/task/AsyncRunLoop.java @@ -330,7 +330,7 @@ private enum WorkerOp { COMMIT, PROCESS, END_OF_STREAM, - TIMER, + SCHEDULER, NO_OP } @@ -374,10 +374,10 @@ public void run() { }, commitMs, commitMs, TimeUnit.MILLISECONDS); } - final SystemTimerScheduler timerFactory = task.context().getTimerScheduler(); - if (timerFactory != null) { - timerFactory.registerListener(() -> { - state.needTimer(); + final SystemTimerScheduler timerScheduler = task.context().getTimerScheduler(); + if (timerScheduler != null) { + timerScheduler.registerListener(() -> { + state.needScheduler(); }); } } @@ -409,8 +409,8 @@ private void run() { case WINDOW: window(); break; - case TIMER: - timer(); + case SCHEDULER: + scheduler(); break; case COMMIT: commit(); @@ -551,8 +551,8 @@ public void run() { } } - private void timer() { - state.startTimer(); + private void scheduler() { + state.startScheduler(); Runnable timerWorker = new Runnable() { @Override public void run() { @@ -560,26 +560,26 @@ public void run() { ReadableCoordinator coordinator = new ReadableCoordinator(task.taskName()); long startTime = clock.nanoTime(); - task.timer(coordinator); + task.scheduler(coordinator); containerMetrics.timerNs().update(clock.nanoTime() - startTime); coordinatorRequests.update(coordinator); - state.doneTimer(); + state.doneScheduler(); } catch (Throwable t) { - log.error("Task {} timer failed", task.taskName(), t); + log.error("Task {} scheduler failed", task.taskName(), t); abort(t); } finally { - log.trace("Task {} timer completed", task.taskName()); + log.trace("Task {} scheduler completed", task.taskName()); resume(); } } }; if (threadPool != null) { - log.trace("Task {} timer runs on the thread pool", task.taskName()); + log.trace("Task {} scheduler runs on the thread pool", task.taskName()); threadPool.submit(timerWorker); } else { - log.trace("Task {} timer runs on the run loop thread", task.taskName()); + log.trace("Task {} scheduler runs on the run loop thread", task.taskName()); timerWorker.run(); } } @@ -655,12 +655,12 @@ public void onFailure(TaskCallback callback, Throwable t) { private final class AsyncTaskState { private volatile boolean needWindow = false; private volatile boolean needCommit = false; - private volatile boolean needTimer = false; + private volatile boolean needScheduler = false; private volatile boolean complete = false; private volatile boolean endOfStream = false; private volatile boolean windowInFlight = false; private volatile boolean commitInFlight = false; - private volatile boolean timerInFlight = false; + private volatile boolean schedulerInFlight = false; private final AtomicInteger messagesInFlight = new AtomicInteger(0); private final ArrayDeque pendingEnvelopeQueue; @@ -706,28 +706,28 @@ private boolean isReady() { needCommit = true; } - boolean opInFlight = windowInFlight || commitInFlight || timerInFlight; + boolean opInFlight = windowInFlight || commitInFlight || schedulerInFlight; /* * A task is ready to commit, when task.commit(needCommit) is requested either by user or commit thread * and either of the following conditions are true. - * a) When process, window, commit and timer are not in progress. + * a) When process, window, commit and scheduler are not in progress. * b) When task.async.commit is true and window, commit are not in progress. */ if (needCommit) { return (messagesInFlight.get() == 0 || isAsyncCommitEnabled) && !opInFlight; - } else if (needWindow || needTimer || endOfStream) { + } else if (needWindow || needScheduler || endOfStream) { /* - * A task is ready for window, timer or end-of-stream operation. + * A task is ready for window, scheduler or end-of-stream operation. */ return messagesInFlight.get() == 0 && !opInFlight; } else { /* * A task is ready to process new message, when number of task.process calls in progress < task.max.concurrency * and either of the following conditions are true. - * a) When window, commit and timer are not in progress. - * b) When task.async.commit is true and window and timer are not in progress. + * a) When window, commit and scheduler are not in progress. + * b) When task.async.commit is true and window and scheduler are not in progress. */ - return messagesInFlight.get() < maxConcurrency && !windowInFlight && !timerInFlight && (isAsyncCommitEnabled || !commitInFlight); + return messagesInFlight.get() < maxConcurrency && !windowInFlight && !schedulerInFlight && (isAsyncCommitEnabled || !commitInFlight); } } @@ -741,7 +741,7 @@ private WorkerOp nextOp() { if (isReady()) { if (needCommit) return WorkerOp.COMMIT; else if (needWindow) return WorkerOp.WINDOW; - else if (needTimer) return WorkerOp.TIMER; + else if (needScheduler) return WorkerOp.SCHEDULER; else if (endOfStream && pendingEnvelopeQueue.isEmpty()) return WorkerOp.END_OF_STREAM; else if (!pendingEnvelopeQueue.isEmpty()) return WorkerOp.PROCESS; } @@ -756,8 +756,8 @@ private void needCommit() { needCommit = true; } - private void needTimer() { - needTimer = true; + private void needScheduler() { + needScheduler = true; } private void startWindow() { @@ -775,9 +775,9 @@ private void startProcess() { taskMetrics.messagesInFlight().set(count); } - private void startTimer() { - needTimer = false; - timerInFlight = true; + private void startScheduler() { + needScheduler = false; + schedulerInFlight = true; } private void doneCommit() { @@ -793,8 +793,8 @@ private void doneProcess() { taskMetrics.messagesInFlight().set(count); } - private void doneTimer() { - timerInFlight = false; + private void doneScheduler() { + schedulerInFlight = false; } /** diff --git a/samza-core/src/main/java/org/apache/samza/task/SystemTimerScheduler.java b/samza-core/src/main/java/org/apache/samza/task/SystemTimerScheduler.java index aa9792bdf2..eba9e583e6 100644 --- a/samza-core/src/main/java/org/apache/samza/task/SystemTimerScheduler.java +++ b/samza-core/src/main/java/org/apache/samza/task/SystemTimerScheduler.java @@ -25,6 +25,7 @@ import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; +import org.apache.samza.scheduler.SchedulingCallback; import static com.google.common.base.Preconditions.checkState; @@ -46,7 +47,7 @@ public interface TimerListener { private final ScheduledExecutorService executor; private final Map scheduledFutures = new ConcurrentHashMap<>(); - private final Map, TimerCallback> readyTimers = new ConcurrentHashMap<>(); + private final Map, SchedulingCallback> readyTimers = new ConcurrentHashMap<>(); private TimerListener timerListener; public static SystemTimerScheduler create(ScheduledExecutorService executor) { @@ -57,7 +58,7 @@ private SystemTimerScheduler(ScheduledExecutorService executor) { this.executor = executor; } - public void setTimer(K key, long timestamp, TimerCallback callback) { + public void setTimer(K key, long timestamp, SchedulingCallback callback) { checkState(!scheduledFutures.containsKey(key), String.format("Duplicate key %s registration for the same timer", key)); @@ -84,8 +85,8 @@ void registerListener(TimerListener listener) { timerListener = listener; } - public Map, TimerCallback> removeReadyTimers() { - final Map, TimerCallback> timers = new TreeMap<>(readyTimers); + public Map, SchedulingCallback> removeReadyTimers() { + final Map, SchedulingCallback> timers = new TreeMap<>(readyTimers); readyTimers.keySet().removeAll(timers.keySet()); return timers; } diff --git a/samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala b/samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala index d85f10f272..a3448fcece 100644 --- a/samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala +++ b/samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala @@ -28,6 +28,7 @@ import org.apache.samza.config.Config import org.apache.samza.config.StreamConfig.Config2Stream import org.apache.samza.job.model.JobModel import org.apache.samza.metrics.MetricsReporter +import org.apache.samza.scheduler.SchedulingCallback import org.apache.samza.storage.kv.KeyValueStore import org.apache.samza.storage.{TaskSideInputStorageManager, TaskStorageManager} import org.apache.samza.system._ @@ -221,12 +222,12 @@ class TaskInstance( } } - def timer(coordinator: ReadableCoordinator) { - trace("Timer for taskName: %s" format taskName) + def scheduler(coordinator: ReadableCoordinator) { + trace("Scheduler for taskName: %s" format taskName) exceptionHandler.maybeHandle { context.getTimerScheduler.removeReadyTimers().entrySet().foreach { entry => - entry.getValue.asInstanceOf[TimerCallback[Any]].onTimer(entry.getKey.getKey, collector, coordinator) + entry.getValue.asInstanceOf[SchedulingCallback[Any]].execute(entry.getKey.getKey, collector, coordinator) } } } diff --git a/samza-core/src/test/java/org/apache/samza/operators/TestOperatorSpecGraph.java b/samza-core/src/test/java/org/apache/samza/operators/TestOperatorSpecGraph.java index a5b15b8b1e..0cf8b1d382 100644 --- a/samza-core/src/test/java/org/apache/samza/operators/TestOperatorSpecGraph.java +++ b/samza-core/src/test/java/org/apache/samza/operators/TestOperatorSpecGraph.java @@ -28,7 +28,7 @@ import java.util.Set; import org.apache.samza.SamzaException; import org.apache.samza.application.StreamApplicationDescriptorImpl; -import org.apache.samza.operators.functions.TimerFunction; +import org.apache.samza.operators.functions.SchedulingFunction; import org.apache.samza.operators.functions.WatermarkFunction; import org.apache.samza.operators.spec.InputOperatorSpec; import org.apache.samza.operators.spec.OperatorSpec; @@ -177,7 +177,7 @@ public WatermarkFunction getWatermarkFn() { } @Override - public TimerFunction getTimerFn() { + public SchedulingFunction getSchedulingFn() { return null; } } diff --git a/samza-core/src/test/java/org/apache/samza/operators/impl/TestOperatorImpl.java b/samza-core/src/test/java/org/apache/samza/operators/impl/TestOperatorImpl.java index 249ff09f74..91e574ea32 100644 --- a/samza-core/src/test/java/org/apache/samza/operators/impl/TestOperatorImpl.java +++ b/samza-core/src/test/java/org/apache/samza/operators/impl/TestOperatorImpl.java @@ -28,7 +28,7 @@ import org.apache.samza.metrics.MetricsRegistryMap; import org.apache.samza.metrics.ReadableMetricsRegistry; import org.apache.samza.metrics.Timer; -import org.apache.samza.operators.functions.TimerFunction; +import org.apache.samza.operators.functions.SchedulingFunction; import org.apache.samza.operators.functions.WatermarkFunction; import org.apache.samza.operators.spec.OperatorSpec; import org.apache.samza.task.MessageCollector; @@ -220,7 +220,7 @@ public WatermarkFunction getWatermarkFn() { } @Override - public TimerFunction getTimerFn() { + public SchedulingFunction getSchedulingFn() { return null; } } diff --git a/samza-core/src/test/java/org/apache/samza/operators/spec/OperatorSpecTestUtils.java b/samza-core/src/test/java/org/apache/samza/operators/spec/OperatorSpecTestUtils.java index 7704a5b034..9ca3f832af 100644 --- a/samza-core/src/test/java/org/apache/samza/operators/spec/OperatorSpecTestUtils.java +++ b/samza-core/src/test/java/org/apache/samza/operators/spec/OperatorSpecTestUtils.java @@ -28,7 +28,7 @@ import java.util.Map; import org.apache.samza.operators.OperatorSpecGraph; import org.apache.samza.operators.TableImpl; -import org.apache.samza.operators.functions.TimerFunction; +import org.apache.samza.operators.functions.SchedulingFunction; import org.apache.samza.operators.functions.WatermarkFunction; import org.apache.samza.serializers.SerializableSerde; import org.apache.samza.table.TableSpec; @@ -72,7 +72,7 @@ private static void assertClonedOpSpec(OperatorSpec oOpSpec, OperatorSpec nOpSpe assertNotEquals(oOpSpec, nOpSpec); assertEquals(oOpSpec.getOpId(), nOpSpec.getOpId()); assertEquals(oOpSpec.getOpCode(), nOpSpec.getOpCode()); - assertTimerFnsNotEqual(oOpSpec.getTimerFn(), nOpSpec.getTimerFn()); + assertSchedulingFnsNotEqual(oOpSpec.getSchedulingFn(), nOpSpec.getSchedulingFn()); assertWatermarkFnNotEqual(nOpSpec.getWatermarkFn(), nOpSpec.getWatermarkFn()); assertAllOperators(oOpSpec.getRegisteredOperatorSpecs(), nOpSpec.getRegisteredOperatorSpecs()); } @@ -84,11 +84,11 @@ private static void assertWatermarkFnNotEqual(WatermarkFunction watermarkFn, Wat assertNotEquals(watermarkFn, watermarkFn1); } - private static void assertTimerFnsNotEqual(TimerFunction timerFn, TimerFunction timerFn1) { - if (timerFn == timerFn1 && timerFn == null) { + private static void assertSchedulingFnsNotEqual(SchedulingFunction schedulingFn, SchedulingFunction schedulingFn1) { + if (schedulingFn == schedulingFn1 && schedulingFn == null) { return; } - assertNotEquals(timerFn, timerFn1); + assertNotEquals(schedulingFn, schedulingFn1); } private static void assertClonedTables(Map originalTables, Map clonedTables) { diff --git a/samza-core/src/test/java/org/apache/samza/operators/spec/TestOperatorSpec.java b/samza-core/src/test/java/org/apache/samza/operators/spec/TestOperatorSpec.java index a9ccd12ad6..be851b91b1 100644 --- a/samza-core/src/test/java/org/apache/samza/operators/spec/TestOperatorSpec.java +++ b/samza-core/src/test/java/org/apache/samza/operators/spec/TestOperatorSpec.java @@ -24,7 +24,7 @@ import java.util.List; import org.apache.samza.config.MapConfig; import org.apache.samza.operators.KV; -import org.apache.samza.operators.TimerRegistry; +import org.apache.samza.operators.KeyScheduler; import org.apache.samza.operators.data.TestMessageEnvelope; import org.apache.samza.operators.data.TestOutputMessageEnvelope; import org.apache.samza.operators.functions.FilterFunction; @@ -33,7 +33,7 @@ import org.apache.samza.operators.functions.MapFunction; import org.apache.samza.operators.functions.SinkFunction; import org.apache.samza.operators.functions.StreamTableJoinFunction; -import org.apache.samza.operators.functions.TimerFunction; +import org.apache.samza.operators.functions.SchedulingFunction; import org.apache.samza.operators.functions.WatermarkFunction; import org.apache.samza.serializers.JsonSerdeV2; import org.apache.samza.serializers.KVSerde; @@ -74,7 +74,8 @@ public TestOutputMessageEnvelope apply(TestMessageEnvelope m) { } } - private static class MapWithTimerFn implements MapFunction, TimerFunction { + private static class MapWithSchedulingFn implements MapFunction, + SchedulingFunction { @Override public TestOutputMessageEnvelope apply(TestMessageEnvelope m) { @@ -82,12 +83,12 @@ public TestOutputMessageEnvelope apply(TestMessageEnvelope m) { } @Override - public void registerTimer(TimerRegistry timerRegistry) { + public void schedulingInit(KeyScheduler keyScheduler) { } @Override - public Collection onTimer(String key, long timestamp) { + public Collection executeForKey(String key, long timestamp) { return null; } } @@ -164,8 +165,8 @@ public void testStreamOperatorSpecWithFlatMap() { assertTrue(cloneOperatorSpec.getTransformFn() instanceof FlatMapFunction); assertNull(streamOperatorSpec.getWatermarkFn()); assertNull(cloneOperatorSpec.getWatermarkFn()); - assertNull(streamOperatorSpec.getTimerFn()); - assertNull(cloneOperatorSpec.getTimerFn()); + assertNull(streamOperatorSpec.getSchedulingFn()); + assertNull(cloneOperatorSpec.getSchedulingFn()); } @Test @@ -187,8 +188,8 @@ public void testStreamOperatorSpecWithMap() { assertNotEquals(userFn, clonedUserFn); assertNull(streamOperatorSpec.getWatermarkFn()); assertNull(cloneOperatorSpec.getWatermarkFn()); - assertNull(streamOperatorSpec.getTimerFn()); - assertNull(cloneOperatorSpec.getTimerFn()); + assertNull(streamOperatorSpec.getSchedulingFn()); + assertNull(cloneOperatorSpec.getSchedulingFn()); } @Test @@ -209,8 +210,8 @@ public void testStreamOperatorSpecWithFilter() { assertNotEquals(userFn, clonedUserFn); assertNull(streamOperatorSpec.getWatermarkFn()); assertNull(cloneOperatorSpec.getWatermarkFn()); - assertNull(streamOperatorSpec.getTimerFn()); - assertNull(cloneOperatorSpec.getTimerFn()); + assertNull(streamOperatorSpec.getSchedulingFn()); + assertNull(cloneOperatorSpec.getSchedulingFn()); } @Test @@ -360,13 +361,13 @@ public void testMapStreamOperatorSpecWithWatermark() { assertEquals(streamOperatorSpec.getWatermarkFn(), testMapFn); assertNotNull(cloneOperatorSpec.getWatermarkFn()); assertNotEquals(cloneOperatorSpec.getTransformFn(), cloneOperatorSpec.getWatermarkFn()); - assertNull(streamOperatorSpec.getTimerFn()); - assertNull(cloneOperatorSpec.getTimerFn()); + assertNull(streamOperatorSpec.getSchedulingFn()); + assertNull(cloneOperatorSpec.getSchedulingFn()); } @Test - public void testMapStreamOperatorSpecWithTimer() { - MapWithTimerFn testMapFn = new MapWithTimerFn(); + public void testMapStreamOperatorSpecWithSchedulingFunction() { + MapWithSchedulingFn testMapFn = new MapWithSchedulingFn(); StreamOperatorSpec streamOperatorSpec = OperatorSpecs.createMapOperatorSpec(testMapFn, "op0"); @@ -378,9 +379,9 @@ public void testMapStreamOperatorSpecWithTimer() { assertNull(streamOperatorSpec.getWatermarkFn()); assertNull(cloneOperatorSpec.getWatermarkFn()); assertNotEquals(cloneOperatorSpec.getTransformFn(), cloneOperatorSpec.getWatermarkFn()); - assertEquals(streamOperatorSpec.getTimerFn(), testMapFn); - assertNotNull(cloneOperatorSpec.getTimerFn()); - assertNotEquals(streamOperatorSpec.getTimerFn(), cloneOperatorSpec.getTimerFn()); + assertEquals(streamOperatorSpec.getSchedulingFn(), testMapFn); + assertNotNull(cloneOperatorSpec.getSchedulingFn()); + assertNotEquals(streamOperatorSpec.getSchedulingFn(), cloneOperatorSpec.getSchedulingFn()); } @Test diff --git a/samza-core/src/test/java/org/apache/samza/operators/spec/TestPartitionByOperatorSpec.java b/samza-core/src/test/java/org/apache/samza/operators/spec/TestPartitionByOperatorSpec.java index db7079c8b3..adeeb6df74 100644 --- a/samza-core/src/test/java/org/apache/samza/operators/spec/TestPartitionByOperatorSpec.java +++ b/samza-core/src/test/java/org/apache/samza/operators/spec/TestPartitionByOperatorSpec.java @@ -23,13 +23,13 @@ import org.apache.samza.application.StreamApplicationDescriptorImpl; import org.apache.samza.config.Config; import org.apache.samza.config.JobConfig; +import org.apache.samza.operators.KeyScheduler; import org.apache.samza.operators.MessageStream; import org.apache.samza.operators.OperatorSpecGraph; -import org.apache.samza.operators.TimerRegistry; import org.apache.samza.operators.descriptors.GenericInputDescriptor; import org.apache.samza.operators.descriptors.GenericSystemDescriptor; import org.apache.samza.operators.functions.MapFunction; -import org.apache.samza.operators.functions.TimerFunction; +import org.apache.samza.operators.functions.SchedulingFunction; import org.apache.samza.operators.functions.WatermarkFunction; import org.apache.samza.serializers.KVSerde; import org.apache.samza.serializers.NoOpSerde; @@ -58,7 +58,7 @@ public class TestPartitionByOperatorSpec { private final String testJobId = "1"; private final String testRepartitionedStreamName = "parByKey"; - class TimerMapFn implements MapFunction, TimerFunction { + class SchedulingMapFn implements MapFunction, SchedulingFunction { @Override public String apply(Object message) { @@ -66,12 +66,12 @@ public String apply(Object message) { } @Override - public void registerTimer(TimerRegistry timerRegistry) { + public void schedulingInit(KeyScheduler keyScheduler) { } @Override - public Collection onTimer(String key, long timestamp) { + public Collection executeForKey(String key, long timestamp) { return null; } } @@ -117,7 +117,7 @@ public void testPartitionBy() { assertTrue(inputOpSpec.getKeySerde() instanceof NoOpSerde); assertTrue(inputOpSpec.getValueSerde() instanceof NoOpSerde); assertTrue(inputOpSpec.isKeyed()); - assertNull(inputOpSpec.getTimerFn()); + assertNull(inputOpSpec.getSchedulingFn()); assertNull(inputOpSpec.getWatermarkFn()); InputOperatorSpec originInputSpec = inputOpSpecs.get(testinputDescriptor.getStreamId()); assertTrue(originInputSpec.getRegisteredOperatorSpecs().toArray()[0] instanceof PartitionByOperatorSpec); @@ -126,7 +126,7 @@ public void testPartitionBy() { assertEquals(reparOpSpec.getKeyFunction(), keyFn); assertEquals(reparOpSpec.getValueFunction(), valueFn); assertEquals(reparOpSpec.getOutputStream().getStreamId(), reparOpSpec.getOpId()); - assertNull(reparOpSpec.getTimerFn()); + assertNull(reparOpSpec.getSchedulingFn()); assertNull(reparOpSpec.getWatermarkFn()); } @@ -144,7 +144,7 @@ public void testPartitionByWithNoSerde() { assertNull(inputOpSpec.getKeySerde()); assertNull(inputOpSpec.getValueSerde()); assertTrue(inputOpSpec.isKeyed()); - assertNull(inputOpSpec.getTimerFn()); + assertNull(inputOpSpec.getSchedulingFn()); assertNull(inputOpSpec.getWatermarkFn()); InputOperatorSpec originInputSpec = streamAppDesc.getInputOperators().get(testinputDescriptor.getStreamId()); assertTrue(originInputSpec.getRegisteredOperatorSpecs().toArray()[0] instanceof PartitionByOperatorSpec); @@ -153,7 +153,7 @@ public void testPartitionByWithNoSerde() { assertEquals(reparOpSpec.getKeyFunction(), keyFn); assertEquals(reparOpSpec.getValueFunction(), valueFn); assertEquals(reparOpSpec.getOutputStream().getStreamId(), reparOpSpec.getOpId()); - assertNull(reparOpSpec.getTimerFn()); + assertNull(reparOpSpec.getSchedulingFn()); assertNull(reparOpSpec.getWatermarkFn()); } @@ -169,8 +169,8 @@ public void testCopy() { } @Test(expected = IllegalArgumentException.class) - public void testTimerFunctionAsKeyFn() { - TimerMapFn keyFn = new TimerMapFn(); + public void testSchedulingFunctionAsKeyFn() { + SchedulingMapFn keyFn = new SchedulingMapFn(); new StreamApplicationDescriptorImpl(appDesc -> { MessageStream inputStream = appDesc.getInputStream(testinputDescriptor); inputStream.partitionBy(keyFn, m -> m, "parByKey"); @@ -187,8 +187,8 @@ public void testWatermarkFunctionAsKeyFn() { } @Test(expected = IllegalArgumentException.class) - public void testTimerFunctionAsValueFn() { - TimerMapFn valueFn = new TimerMapFn(); + public void testSchedulingFunctionAsValueFn() { + SchedulingMapFn valueFn = new SchedulingMapFn(); new StreamApplicationDescriptorImpl(appDesc -> { MessageStream inputStream = appDesc.getInputStream(testinputDescriptor); inputStream.partitionBy(m -> m.toString(), valueFn, "parByKey"); diff --git a/samza-core/src/test/java/org/apache/samza/operators/spec/TestWindowOperatorSpec.java b/samza-core/src/test/java/org/apache/samza/operators/spec/TestWindowOperatorSpec.java index 0a2214bfcf..32fb490157 100644 --- a/samza-core/src/test/java/org/apache/samza/operators/spec/TestWindowOperatorSpec.java +++ b/samza-core/src/test/java/org/apache/samza/operators/spec/TestWindowOperatorSpec.java @@ -19,8 +19,8 @@ package org.apache.samza.operators.spec; -import org.apache.samza.operators.TimerRegistry; -import org.apache.samza.operators.functions.TimerFunction; +import org.apache.samza.operators.KeyScheduler; +import org.apache.samza.operators.functions.SchedulingFunction; import org.apache.samza.operators.functions.WatermarkFunction; import org.apache.samza.serializers.Serde; import org.apache.samza.operators.functions.FoldLeftFunction; @@ -90,8 +90,8 @@ public void testTriggerIntervalWithSingleTimeTrigger() { } @Test(expected = IllegalArgumentException.class) - public void testIllegalTimerFunctionAsInitializer() { - class TimedSupplierFunction implements SupplierFunction, TimerFunction { + public void testIllegalSchedulingFunctionAsInitializer() { + class TimedSupplierFunction implements SupplierFunction, SchedulingFunction { @Override public Collection get() { @@ -99,12 +99,12 @@ public Collection get() { } @Override - public void registerTimer(TimerRegistry timerRegistry) { + public void schedulingInit(KeyScheduler keyScheduler) { } @Override - public Collection onTimer(Object key, long timestamp) { + public Collection executeForKey(Object key, long timestamp) { return null; } } @@ -138,8 +138,8 @@ public Long getOutputWatermark() { } @Test(expected = IllegalArgumentException.class) - public void testIllegalTimerFunctionAsKeyFn() { - class TimerMapFunction implements MapFunction, TimerFunction { + public void testIllegalSchedulingFunctionAsKeyFn() { + class SchedulingMapFunction implements MapFunction, SchedulingFunction { @Override public Object apply(Object message) { @@ -147,16 +147,16 @@ public Object apply(Object message) { } @Override - public void registerTimer(TimerRegistry timerRegistry) { + public void schedulingInit(KeyScheduler keyScheduler) { } @Override - public Collection onTimer(Object key, long timestamp) { + public Collection executeForKey(Object key, long timestamp) { return null; } } - keyFn = new TimerMapFunction(); + keyFn = new SchedulingMapFunction(); getWindowOperatorSpec("w0"); } @@ -186,8 +186,8 @@ public Long getOutputWatermark() { } @Test(expected = IllegalArgumentException.class) - public void testIllegalTimerFunctionAsEventTimeFn() { - class TimerMapFunction implements MapFunction, TimerFunction { + public void testIllegalSchedulingFunctionAsEventTimeFn() { + class SchedulingMapFunction implements MapFunction, SchedulingFunction { @Override public Long apply(Object message) { @@ -195,16 +195,16 @@ public Long apply(Object message) { } @Override - public void registerTimer(TimerRegistry timerRegistry) { + public void schedulingInit(KeyScheduler keyScheduler) { } @Override - public Collection onTimer(Object key, long timestamp) { + public Collection executeForKey(Object key, long timestamp) { return null; } } - timeFn = new TimerMapFunction(); + timeFn = new SchedulingMapFunction(); getWindowOperatorSpec("w0"); } @@ -234,8 +234,9 @@ public Long getOutputWatermark() { } @Test - public void testTimerFunctionAsFoldLeftFn() { - class TimerFoldLeftFunction implements FoldLeftFunction, TimerFunction { + public void testSchedulingFunctionAsFoldLeftFn() { + class SchedulingFoldLeftFunction + implements FoldLeftFunction, SchedulingFunction { @Override public Collection apply(Object message, Collection oldValue) { @@ -244,19 +245,19 @@ public Collection apply(Object message, Collection oldValue) { } @Override - public void registerTimer(TimerRegistry timerRegistry) { + public void schedulingInit(KeyScheduler keyScheduler) { } @Override - public Collection onTimer(Object key, long timestamp) { + public Collection executeForKey(Object key, long timestamp) { return null; } } - foldFn = new TimerFoldLeftFunction(); + foldFn = new SchedulingFoldLeftFunction(); WindowOperatorSpec windowSpec = getWindowOperatorSpec("w0"); - assertEquals(windowSpec.getTimerFn(), foldFn); + assertEquals(windowSpec.getSchedulingFn(), foldFn); assertNull(windowSpec.getWatermarkFn()); } @@ -284,7 +285,7 @@ public Collection apply(Object message, Collection oldValue) { foldFn = new WatermarkFoldLeftFunction(); WindowOperatorSpec windowSpec = getWindowOperatorSpec("w0"); assertEquals(windowSpec.getWatermarkFn(), foldFn); - assertNull(windowSpec.getTimerFn()); + assertNull(windowSpec.getSchedulingFn()); } @Test diff --git a/samza-core/src/test/java/org/apache/samza/task/TestSystemTimerScheduler.java b/samza-core/src/test/java/org/apache/samza/task/TestSystemTimerScheduler.java index dd08121728..038ff27680 100644 --- a/samza-core/src/test/java/org/apache/samza/task/TestSystemTimerScheduler.java +++ b/samza-core/src/test/java/org/apache/samza/task/TestSystemTimerScheduler.java @@ -51,7 +51,7 @@ private ScheduledExecutorService createExecutorService() { private void fireTimers(SystemTimerScheduler factory) { factory.removeReadyTimers().entrySet().forEach(entry -> { - entry.getValue().onTimer(entry.getKey().getKey(), mock(MessageCollector.class), mock(TaskCoordinator.class)); + entry.getValue().execute(entry.getKey().getKey(), mock(MessageCollector.class), mock(TaskCoordinator.class)); }); } diff --git a/samza-sql/src/test/java/org/apache/samza/sql/translator/TestProjectTranslator.java b/samza-sql/src/test/java/org/apache/samza/sql/translator/TestProjectTranslator.java index 1acfc4795c..8b17f59a9c 100644 --- a/samza-sql/src/test/java/org/apache/samza/sql/translator/TestProjectTranslator.java +++ b/samza-sql/src/test/java/org/apache/samza/sql/translator/TestProjectTranslator.java @@ -39,7 +39,7 @@ import org.apache.samza.operators.MessageStream; import org.apache.samza.operators.MessageStreamImpl; import org.apache.samza.operators.functions.MapFunction; -import org.apache.samza.operators.functions.TimerFunction; +import org.apache.samza.operators.functions.SchedulingFunction; import org.apache.samza.operators.functions.WatermarkFunction; import org.apache.samza.operators.spec.OperatorSpec; import org.apache.samza.operators.spec.StreamOperatorSpec; @@ -192,7 +192,7 @@ public WatermarkFunction getWatermarkFn() { } @Override - public TimerFunction getTimerFn() { + public SchedulingFunction getSchedulingFn() { return null; } }; diff --git a/samza-test/src/test/java/org/apache/samza/test/framework/TimerTest.java b/samza-test/src/test/java/org/apache/samza/test/framework/SchedulingTest.java similarity index 91% rename from samza-test/src/test/java/org/apache/samza/test/framework/TimerTest.java rename to samza-test/src/test/java/org/apache/samza/test/framework/SchedulingTest.java index d4e0e14f02..658492a7a8 100644 --- a/samza-test/src/test/java/org/apache/samza/test/framework/TimerTest.java +++ b/samza-test/src/test/java/org/apache/samza/test/framework/SchedulingTest.java @@ -26,9 +26,9 @@ import org.junit.Before; import org.junit.Test; -import static org.apache.samza.test.framework.TestTimerApp.*; +import static org.apache.samza.test.framework.TestSchedulingApp.*; -public class TimerTest extends StreamApplicationIntegrationTestHarness { +public class SchedulingTest extends StreamApplicationIntegrationTestHarness { @Before public void setup() { @@ -55,6 +55,6 @@ public void testJob() throws InterruptedException { configs.put(JobCoordinatorConfig.JOB_COORDINATION_UTILS_FACTORY, "org.apache.samza.standalone.PassthroughCoordinationUtilsFactory"); configs.put(JobConfig.PROCESSOR_ID(), "0"); - runApplication(new TestTimerApp(), "TimerTest", configs); + runApplication(new TestSchedulingApp(), "SchedulingTest", configs); } } diff --git a/samza-test/src/test/java/org/apache/samza/test/framework/TestTimerApp.java b/samza-test/src/test/java/org/apache/samza/test/framework/TestSchedulingApp.java similarity index 78% rename from samza-test/src/test/java/org/apache/samza/test/framework/TestTimerApp.java rename to samza-test/src/test/java/org/apache/samza/test/framework/TestSchedulingApp.java index e72a965d26..8a4f2764ff 100644 --- a/samza-test/src/test/java/org/apache/samza/test/framework/TestTimerApp.java +++ b/samza-test/src/test/java/org/apache/samza/test/framework/TestSchedulingApp.java @@ -26,16 +26,16 @@ import java.util.List; import org.apache.samza.application.StreamApplication; import org.apache.samza.application.StreamApplicationDescriptor; +import org.apache.samza.operators.KeyScheduler; import org.apache.samza.operators.MessageStream; -import org.apache.samza.operators.TimerRegistry; import org.apache.samza.operators.functions.FlatMapFunction; -import org.apache.samza.operators.functions.TimerFunction; +import org.apache.samza.operators.functions.SchedulingFunction; import org.apache.samza.serializers.JsonSerdeV2; import org.apache.samza.system.kafka.KafkaInputDescriptor; import org.apache.samza.system.kafka.KafkaSystemDescriptor; import org.apache.samza.test.operator.data.PageView; -public class TestTimerApp implements StreamApplication { +public class TestSchedulingApp implements StreamApplication { public static final String PAGE_VIEWS = "page-views"; @Override @@ -44,9 +44,9 @@ public void describe(StreamApplicationDescriptor appDesc) { KafkaSystemDescriptor ksd = new KafkaSystemDescriptor("kafka"); KafkaInputDescriptor isd = ksd.getInputDescriptor(PAGE_VIEWS, serde); final MessageStream pageViews = appDesc.getInputStream(isd); - final MessageStream output = pageViews.flatMap(new FlatmapTimerFn()); + final MessageStream output = pageViews.flatMap(new FlatmapSchedulingFn()); - MessageStreamAssert.that("Output from timer function should container all complete messages", output, serde) + MessageStreamAssert.that("Output from scheduling function should container all complete messages", output, serde) .containsInAnyOrder( Arrays.asList( new PageView("v1-complete", "p1", "u1"), @@ -56,14 +56,15 @@ public void describe(StreamApplicationDescriptor appDesc) { )); } - private static class FlatmapTimerFn implements FlatMapFunction, TimerFunction { + private static class FlatmapSchedulingFn + implements FlatMapFunction, SchedulingFunction { private transient List pageViews; - private transient TimerRegistry timerRegistry; + private transient KeyScheduler keyScheduler; @Override - public void registerTimer(TimerRegistry timerRegistry) { - this.timerRegistry = timerRegistry; + public void schedulingInit(KeyScheduler keyScheduler) { + this.keyScheduler = keyScheduler; this.pageViews = new ArrayList<>(); } @@ -75,13 +76,13 @@ public Collection apply(PageView message) { if (pageViews.size() == 2) { //got all messages for this task final long time = System.currentTimeMillis() + 100; - timerRegistry.register("CompleteTimer", time); + keyScheduler.schedule("CompleteScheduler", time); } return Collections.emptyList(); } @Override - public Collection onTimer(String key, long time) { + public Collection executeForKey(String key, long time) { return pageViews; } } From db4fad6debe101f5103537f7c942011505ea2829 Mon Sep 17 00:00:00 2001 From: Cameron Lee Date: Mon, 17 Sep 2018 17:46:18 -0700 Subject: [PATCH 2/4] minor style check update --- .../main/java/org/apache/samza/operators/KeyScheduler.java | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/samza-api/src/main/java/org/apache/samza/operators/KeyScheduler.java b/samza-api/src/main/java/org/apache/samza/operators/KeyScheduler.java index 22e87d5e6d..be8dae2b79 100644 --- a/samza-api/src/main/java/org/apache/samza/operators/KeyScheduler.java +++ b/samza-api/src/main/java/org/apache/samza/operators/KeyScheduler.java @@ -19,11 +19,9 @@ package org.apache.samza.operators; -import org.apache.samza.operators.functions.SchedulingFunction; - - /** - * Manages scheduling keys to be triggered later. See {@link SchedulingFunction} for details. + * Manages scheduling keys to be triggered later. See {@link org.apache.samza.operators.functions.SchedulingFunction} + * for details. * @param type of the key */ public interface KeyScheduler { From b63e032c2cbb8e1eadbc7bd6acb30bd431272d54 Mon Sep 17 00:00:00 2001 From: Cameron Lee Date: Tue, 18 Sep 2018 13:44:29 -0700 Subject: [PATCH 3/4] simplify terminology: rename Scheduling to Scheduled, rename KeyScheduler to Scheduler --- .../{KeyScheduler.java => Scheduler.java} | 4 +- ...ngFunction.java => ScheduledFunction.java} | 23 ++++----- ...ngCallback.java => ScheduledCallback.java} | 4 +- .../org/apache/samza/task/TaskContext.java | 4 +- .../samza/container/TaskContextImpl.java | 4 +- .../samza/operators/impl/OperatorImpl.java | 20 ++++---- .../operators/impl/OperatorImplGraph.java | 8 ++-- .../operators/spec/BroadcastOperatorSpec.java | 4 +- .../operators/spec/FilterOperatorSpec.java | 6 +-- .../operators/spec/FlatMapOperatorSpec.java | 6 +-- .../operators/spec/InputOperatorSpec.java | 4 +- .../operators/spec/JoinOperatorSpec.java | 6 +-- .../samza/operators/spec/MapOperatorSpec.java | 6 +-- .../operators/spec/MergeOperatorSpec.java | 4 +- .../samza/operators/spec/OperatorSpec.java | 4 +- .../operators/spec/OutputOperatorSpec.java | 4 +- .../spec/PartitionByOperatorSpec.java | 12 ++--- .../spec/SendToTableOperatorSpec.java | 4 +- .../operators/spec/SinkOperatorSpec.java | 6 +-- .../spec/StreamTableJoinOperatorSpec.java | 6 +-- .../operators/spec/WindowOperatorSpec.java | 18 +++---- .../samza/task/SystemTimerScheduler.java | 10 ++-- .../apache/samza/container/TaskInstance.scala | 4 +- .../operators/TestOperatorSpecGraph.java | 4 +- .../operators/impl/TestOperatorImpl.java | 4 +- .../operators/spec/OperatorSpecTestUtils.java | 10 ++-- .../operators/spec/TestOperatorSpec.java | 38 +++++++-------- .../spec/TestPartitionByOperatorSpec.java | 26 +++++----- .../spec/TestWindowOperatorSpec.java | 48 +++++++++---------- .../samza/task/TestSystemTimerScheduler.java | 2 +- .../sql/translator/TestProjectTranslator.java | 4 +- .../test/framework/TestSchedulingApp.java | 20 ++++---- 32 files changed, 164 insertions(+), 163 deletions(-) rename samza-api/src/main/java/org/apache/samza/operators/{KeyScheduler.java => Scheduler.java} (93%) rename samza-api/src/main/java/org/apache/samza/operators/functions/{SchedulingFunction.java => ScheduledFunction.java} (72%) rename samza-api/src/main/java/org/apache/samza/scheduler/{SchedulingCallback.java => ScheduledCallback.java} (91%) diff --git a/samza-api/src/main/java/org/apache/samza/operators/KeyScheduler.java b/samza-api/src/main/java/org/apache/samza/operators/Scheduler.java similarity index 93% rename from samza-api/src/main/java/org/apache/samza/operators/KeyScheduler.java rename to samza-api/src/main/java/org/apache/samza/operators/Scheduler.java index be8dae2b79..05bea38bca 100644 --- a/samza-api/src/main/java/org/apache/samza/operators/KeyScheduler.java +++ b/samza-api/src/main/java/org/apache/samza/operators/Scheduler.java @@ -20,11 +20,11 @@ package org.apache.samza.operators; /** - * Manages scheduling keys to be triggered later. See {@link org.apache.samza.operators.functions.SchedulingFunction} + * Manages scheduling keys to be triggered later. See {@link org.apache.samza.operators.functions.ScheduledFunction} * for details. * @param type of the key */ -public interface KeyScheduler { +public interface Scheduler { /** * Schedule a key to be triggered at {@code timestamp}. diff --git a/samza-api/src/main/java/org/apache/samza/operators/functions/SchedulingFunction.java b/samza-api/src/main/java/org/apache/samza/operators/functions/ScheduledFunction.java similarity index 72% rename from samza-api/src/main/java/org/apache/samza/operators/functions/SchedulingFunction.java rename to samza-api/src/main/java/org/apache/samza/operators/functions/ScheduledFunction.java index 21cb0895b7..b75b335a07 100644 --- a/samza-api/src/main/java/org/apache/samza/operators/functions/SchedulingFunction.java +++ b/samza-api/src/main/java/org/apache/samza/operators/functions/ScheduledFunction.java @@ -19,27 +19,28 @@ package org.apache.samza.operators.functions; -import org.apache.samza.operators.KeyScheduler; +import org.apache.samza.operators.Scheduler; import java.util.Collection; + /** * Allows scheduling with key(s) and is invoked when the specified time(s) occurs. * Key must be a unique identifier for its corresponding logic to execute, and is provided in the callback when the * corresponding schedule time occurs. * *

- * Example of a {@link FlatMapFunction} with {@link SchedulingFunction}: + * Example of a {@link FlatMapFunction} with {@link ScheduledFunction}: *

{@code
- *    public class ExampleSchedulingFn implements FlatMapFunction, SchedulingFunction {
- *      public void schedulingInit(KeyScheduler keyScheduler) {
+ *    public class ExampleScheduledFn implements FlatMapFunction, ScheduledFunction {
+ *      public void schedule(Scheduler scheduler) {
  *        long time = System.currentTimeMillis() + 5000; // fire after 5 sec
- *        keyScheduler.schedule("example-scheduler-logic", time);
+ *        scheduler.schedule("example-scheduler-logic", time);
  *      }
  *      public Collection apply(String s) {
  *        ...
  *      }
- *      public Collection executeForKey(String key, long timestamp) {
+ *      public Collection onCallback(String key, long timestamp) {
  *        // fired with key as "example-scheduler-logic"
  *        ...
  *      }
@@ -48,14 +49,14 @@
  * @param  type of the key
  * @param  type of the output
  */
-public interface SchedulingFunction {
+public interface ScheduledFunction {
 
   /**
    * Initialize the function for scheduling, such as setting some initial scheduling logic or saving the
-   * {@code keyScheduler} for later use.
-   * @param keyScheduler used to specify the schedule time(s) and key(s)
+   * {@code scheduler} for later use.
+   * @param scheduler used to specify the schedule time(s) and key(s)
    */
-  void schedulingInit(KeyScheduler keyScheduler);
+  void schedule(Scheduler scheduler);
 
   /**
    * Returns the output from the scheduling logic corresponding to the key that was triggered.
@@ -63,5 +64,5 @@ public interface SchedulingFunction {
    * @param timestamp schedule time that was set for the key, in milliseconds since epoch
    * @return {@link Collection} of output elements
    */
-  Collection executeForKey(K key, long timestamp);
+  Collection onCallback(K key, long timestamp);
 }
diff --git a/samza-api/src/main/java/org/apache/samza/scheduler/SchedulingCallback.java b/samza-api/src/main/java/org/apache/samza/scheduler/ScheduledCallback.java
similarity index 91%
rename from samza-api/src/main/java/org/apache/samza/scheduler/SchedulingCallback.java
rename to samza-api/src/main/java/org/apache/samza/scheduler/ScheduledCallback.java
index 34a5341f0e..87454228a5 100644
--- a/samza-api/src/main/java/org/apache/samza/scheduler/SchedulingCallback.java
+++ b/samza-api/src/main/java/org/apache/samza/scheduler/ScheduledCallback.java
@@ -28,12 +28,12 @@
  * {@link org.apache.samza.task.TaskContext} is reached.
  * @param  type of the callback key
  */
-public interface SchedulingCallback {
+public interface ScheduledCallback {
   /**
    * Invoked when the corresponding schedule time is reached.
    * @param key key for callback
    * @param collector contains the means of sending message envelopes to the output stream.
    * @param coordinator manages execution of tasks.
    */
-  void execute(K key, MessageCollector collector, TaskCoordinator coordinator);
+  void onCallback(K key, MessageCollector collector, TaskCoordinator coordinator);
 }
diff --git a/samza-api/src/main/java/org/apache/samza/task/TaskContext.java b/samza-api/src/main/java/org/apache/samza/task/TaskContext.java
index 32b9be3aa6..f142d5ca8f 100644
--- a/samza-api/src/main/java/org/apache/samza/task/TaskContext.java
+++ b/samza-api/src/main/java/org/apache/samza/task/TaskContext.java
@@ -24,7 +24,7 @@
 import org.apache.samza.container.SamzaContainerContext;
 import org.apache.samza.container.TaskName;
 import org.apache.samza.metrics.MetricsRegistry;
-import org.apache.samza.scheduler.SchedulingCallback;
+import org.apache.samza.scheduler.ScheduledCallback;
 import org.apache.samza.system.SystemStreamPartition;
 import org.apache.samza.table.Table;
 
@@ -85,7 +85,7 @@ default Object getUserContext() {
    * @param callback callback to call when the {@code timestamp} is reached
    * @param  type of the key
    */
-   void scheduleCallback(K key, long timestamp, SchedulingCallback callback);
+   void scheduleCallback(K key, long timestamp, ScheduledCallback callback);
 
   /**
    * Delete the keyed callback in this task.
diff --git a/samza-core/src/main/java/org/apache/samza/container/TaskContextImpl.java b/samza-core/src/main/java/org/apache/samza/container/TaskContextImpl.java
index 05f849099d..f1d265a72a 100644
--- a/samza-core/src/main/java/org/apache/samza/container/TaskContextImpl.java
+++ b/samza-core/src/main/java/org/apache/samza/container/TaskContextImpl.java
@@ -31,7 +31,7 @@
 import org.apache.samza.table.TableManager;
 import org.apache.samza.task.SystemTimerScheduler;
 import org.apache.samza.task.TaskContext;
-import org.apache.samza.scheduler.SchedulingCallback;
+import org.apache.samza.scheduler.ScheduledCallback;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -134,7 +134,7 @@ public Object getUserContext() {
   }
 
   @Override
-  public  void scheduleCallback(K key, long timestamp, SchedulingCallback callback) {
+  public  void scheduleCallback(K key, long timestamp, ScheduledCallback callback) {
     timerScheduler.setTimer(key, timestamp, callback);
   }
 
diff --git a/samza-core/src/main/java/org/apache/samza/operators/impl/OperatorImpl.java b/samza-core/src/main/java/org/apache/samza/operators/impl/OperatorImpl.java
index 851b48f012..64db79150a 100644
--- a/samza-core/src/main/java/org/apache/samza/operators/impl/OperatorImpl.java
+++ b/samza-core/src/main/java/org/apache/samza/operators/impl/OperatorImpl.java
@@ -25,8 +25,8 @@
 import org.apache.samza.container.TaskName;
 import org.apache.samza.job.model.ContainerModel;
 import org.apache.samza.job.model.TaskModel;
-import org.apache.samza.operators.KeyScheduler;
-import org.apache.samza.operators.functions.SchedulingFunction;
+import org.apache.samza.operators.Scheduler;
+import org.apache.samza.operators.functions.ScheduledFunction;
 import org.apache.samza.operators.functions.WatermarkFunction;
 import org.apache.samza.system.EndOfStreamMessage;
 import org.apache.samza.metrics.Counter;
@@ -436,19 +436,19 @@ final long getOutputWatermark() {
 
   /**
    * Returns a registry which allows registering arbitrary system-clock timer with K-typed key.
-   * The user-defined function in the operator spec needs to implement {@link SchedulingFunction#executeForKey(Object, long)}
+   * The user-defined function in the operator spec needs to implement {@link ScheduledFunction#onCallback(Object, long)}
    * for timer notifications.
    * @param  key type for the timer.
-   * @return an instance of {@link KeyScheduler}
+   * @return an instance of {@link Scheduler}
    */
-   KeyScheduler createOperatorKeyScheduler() {
-    return new KeyScheduler() {
+   Scheduler createOperatorScheduler() {
+    return new Scheduler() {
       @Override
       public void schedule(K key, long time) {
         taskContext.scheduleCallback(key, time, (k, collector, coordinator) -> {
-            final SchedulingFunction schedulingFn = getOperatorSpec().getSchedulingFn();
-            if (schedulingFn != null) {
-              final Collection output = schedulingFn.executeForKey(key, time);
+            final ScheduledFunction scheduledFn = getOperatorSpec().getScheduledFn();
+            if (scheduledFn != null) {
+              final Collection output = scheduledFn.onCallback(key, time);
 
               if (!output.isEmpty()) {
                 output.forEach(rm ->
@@ -457,7 +457,7 @@ public void schedule(K key, long time) {
               }
             } else {
               throw new SamzaException(
-                  String.format("Operator %s id %s (created at %s) must implement SchedulingFunction to use system timer.",
+                  String.format("Operator %s id %s (created at %s) must implement ScheduledFunction to use system timer.",
                       getOperatorSpec().getOpCode().name(), getOpImplId(), getOperatorSpec().getSourceLocation()));
             }
           });
diff --git a/samza-core/src/main/java/org/apache/samza/operators/impl/OperatorImplGraph.java b/samza-core/src/main/java/org/apache/samza/operators/impl/OperatorImplGraph.java
index 13b1fc593e..6c78ae4e51 100644
--- a/samza-core/src/main/java/org/apache/samza/operators/impl/OperatorImplGraph.java
+++ b/samza-core/src/main/java/org/apache/samza/operators/impl/OperatorImplGraph.java
@@ -26,7 +26,7 @@
 import org.apache.samza.container.TaskContextImpl;
 import org.apache.samza.job.model.JobModel;
 import org.apache.samza.operators.KV;
-import org.apache.samza.operators.KeyScheduler;
+import org.apache.samza.operators.Scheduler;
 import org.apache.samza.operators.functions.JoinFunction;
 import org.apache.samza.operators.functions.PartialJoinFunction;
 import org.apache.samza.util.TimestampedValue;
@@ -170,9 +170,9 @@ private OperatorImpl createAndRegisterOperatorImpl(OperatorSpec prevOperatorSpec
       operatorImpl.init(config, context);
       operatorImpl.registerInputStream(inputStream);
 
-      if (operatorSpec.getSchedulingFn() != null) {
-        final KeyScheduler keyScheduler = operatorImpl.createOperatorKeyScheduler();
-        operatorSpec.getSchedulingFn().schedulingInit(keyScheduler);
+      if (operatorSpec.getScheduledFn() != null) {
+        final Scheduler scheduler = operatorImpl.createOperatorScheduler();
+        operatorSpec.getScheduledFn().schedule(scheduler);
       }
 
       // Note: The key here is opImplId, which may not equal opId for some impls (e.g. PartialJoinOperatorImpl).
diff --git a/samza-core/src/main/java/org/apache/samza/operators/spec/BroadcastOperatorSpec.java b/samza-core/src/main/java/org/apache/samza/operators/spec/BroadcastOperatorSpec.java
index abc9bee0c7..fb6515aa76 100644
--- a/samza-core/src/main/java/org/apache/samza/operators/spec/BroadcastOperatorSpec.java
+++ b/samza-core/src/main/java/org/apache/samza/operators/spec/BroadcastOperatorSpec.java
@@ -20,7 +20,7 @@
 
 package org.apache.samza.operators.spec;
 
-import org.apache.samza.operators.functions.SchedulingFunction;
+import org.apache.samza.operators.functions.ScheduledFunction;
 import org.apache.samza.operators.functions.WatermarkFunction;
 
 public class BroadcastOperatorSpec extends OperatorSpec {
@@ -43,7 +43,7 @@ public WatermarkFunction getWatermarkFn() {
   }
 
   @Override
-  public SchedulingFunction getSchedulingFn() {
+  public ScheduledFunction getScheduledFn() {
     return null;
   }
 }
diff --git a/samza-core/src/main/java/org/apache/samza/operators/spec/FilterOperatorSpec.java b/samza-core/src/main/java/org/apache/samza/operators/spec/FilterOperatorSpec.java
index b26f914ee6..4e640dcf0f 100644
--- a/samza-core/src/main/java/org/apache/samza/operators/spec/FilterOperatorSpec.java
+++ b/samza-core/src/main/java/org/apache/samza/operators/spec/FilterOperatorSpec.java
@@ -23,7 +23,7 @@
 import org.apache.samza.config.Config;
 import org.apache.samza.operators.functions.FilterFunction;
 import org.apache.samza.operators.functions.FlatMapFunction;
-import org.apache.samza.operators.functions.SchedulingFunction;
+import org.apache.samza.operators.functions.ScheduledFunction;
 import org.apache.samza.operators.functions.WatermarkFunction;
 import org.apache.samza.task.TaskContext;
 
@@ -68,7 +68,7 @@ public WatermarkFunction getWatermarkFn() {
   }
 
   @Override
-  public SchedulingFunction getSchedulingFn() {
-    return this.filterFn instanceof SchedulingFunction ? (SchedulingFunction) this.filterFn : null;
+  public ScheduledFunction getScheduledFn() {
+    return this.filterFn instanceof ScheduledFunction ? (ScheduledFunction) this.filterFn : null;
   }
 }
diff --git a/samza-core/src/main/java/org/apache/samza/operators/spec/FlatMapOperatorSpec.java b/samza-core/src/main/java/org/apache/samza/operators/spec/FlatMapOperatorSpec.java
index d52dc0ff2a..160f432da1 100644
--- a/samza-core/src/main/java/org/apache/samza/operators/spec/FlatMapOperatorSpec.java
+++ b/samza-core/src/main/java/org/apache/samza/operators/spec/FlatMapOperatorSpec.java
@@ -19,7 +19,7 @@
 package org.apache.samza.operators.spec;
 
 import org.apache.samza.operators.functions.FlatMapFunction;
-import org.apache.samza.operators.functions.SchedulingFunction;
+import org.apache.samza.operators.functions.ScheduledFunction;
 import org.apache.samza.operators.functions.WatermarkFunction;
 
 
@@ -41,7 +41,7 @@ public WatermarkFunction getWatermarkFn() {
   }
 
   @Override
-  public SchedulingFunction getSchedulingFn() {
-    return this.transformFn instanceof SchedulingFunction ? (SchedulingFunction) this.transformFn : null;
+  public ScheduledFunction getScheduledFn() {
+    return this.transformFn instanceof ScheduledFunction ? (ScheduledFunction) this.transformFn : null;
   }
 }
diff --git a/samza-core/src/main/java/org/apache/samza/operators/spec/InputOperatorSpec.java b/samza-core/src/main/java/org/apache/samza/operators/spec/InputOperatorSpec.java
index afb8266994..1af4806dd7 100644
--- a/samza-core/src/main/java/org/apache/samza/operators/spec/InputOperatorSpec.java
+++ b/samza-core/src/main/java/org/apache/samza/operators/spec/InputOperatorSpec.java
@@ -19,7 +19,7 @@
 package org.apache.samza.operators.spec;
 
 import org.apache.samza.operators.functions.InputTransformer;
-import org.apache.samza.operators.functions.SchedulingFunction;
+import org.apache.samza.operators.functions.ScheduledFunction;
 import org.apache.samza.operators.functions.WatermarkFunction;
 import org.apache.samza.serializers.Serde;
 import org.apache.samza.system.IncomingMessageEnvelope;
@@ -99,7 +99,7 @@ public WatermarkFunction getWatermarkFn() {
   }
 
   @Override
-  public SchedulingFunction getSchedulingFn() {
+  public ScheduledFunction getScheduledFn() {
     return null;
   }
 }
diff --git a/samza-core/src/main/java/org/apache/samza/operators/spec/JoinOperatorSpec.java b/samza-core/src/main/java/org/apache/samza/operators/spec/JoinOperatorSpec.java
index 5bb64e44ce..bb6ed59c5c 100644
--- a/samza-core/src/main/java/org/apache/samza/operators/spec/JoinOperatorSpec.java
+++ b/samza-core/src/main/java/org/apache/samza/operators/spec/JoinOperatorSpec.java
@@ -20,7 +20,7 @@
 
 import com.google.common.collect.ImmutableMap;
 import org.apache.samza.operators.functions.JoinFunction;
-import org.apache.samza.operators.functions.SchedulingFunction;
+import org.apache.samza.operators.functions.ScheduledFunction;
 import org.apache.samza.operators.functions.WatermarkFunction;
 import org.apache.samza.operators.impl.store.TimestampedValueSerde;
 import org.apache.samza.util.TimestampedValue;
@@ -105,8 +105,8 @@ public WatermarkFunction getWatermarkFn() {
   }
 
   @Override
-  public SchedulingFunction getSchedulingFn() {
-    return joinFn instanceof SchedulingFunction ? (SchedulingFunction) joinFn : null;
+  public ScheduledFunction getScheduledFn() {
+    return joinFn instanceof ScheduledFunction ? (ScheduledFunction) joinFn : null;
   }
 
   public OperatorSpec getLeftInputOpSpec() {
diff --git a/samza-core/src/main/java/org/apache/samza/operators/spec/MapOperatorSpec.java b/samza-core/src/main/java/org/apache/samza/operators/spec/MapOperatorSpec.java
index 36a55422fc..6ce522f687 100644
--- a/samza-core/src/main/java/org/apache/samza/operators/spec/MapOperatorSpec.java
+++ b/samza-core/src/main/java/org/apache/samza/operators/spec/MapOperatorSpec.java
@@ -23,7 +23,7 @@
 import org.apache.samza.config.Config;
 import org.apache.samza.operators.functions.FlatMapFunction;
 import org.apache.samza.operators.functions.MapFunction;
-import org.apache.samza.operators.functions.SchedulingFunction;
+import org.apache.samza.operators.functions.ScheduledFunction;
 import org.apache.samza.operators.functions.WatermarkFunction;
 import org.apache.samza.task.TaskContext;
 
@@ -71,7 +71,7 @@ public WatermarkFunction getWatermarkFn() {
   }
 
   @Override
-  public SchedulingFunction getSchedulingFn() {
-    return this.mapFn instanceof SchedulingFunction ? (SchedulingFunction) this.mapFn : null;
+  public ScheduledFunction getScheduledFn() {
+    return this.mapFn instanceof ScheduledFunction ? (ScheduledFunction) this.mapFn : null;
   }
 }
diff --git a/samza-core/src/main/java/org/apache/samza/operators/spec/MergeOperatorSpec.java b/samza-core/src/main/java/org/apache/samza/operators/spec/MergeOperatorSpec.java
index 18dd15ea36..3685c5f02a 100644
--- a/samza-core/src/main/java/org/apache/samza/operators/spec/MergeOperatorSpec.java
+++ b/samza-core/src/main/java/org/apache/samza/operators/spec/MergeOperatorSpec.java
@@ -19,7 +19,7 @@
 package org.apache.samza.operators.spec;
 
 import java.util.ArrayList;
-import org.apache.samza.operators.functions.SchedulingFunction;
+import org.apache.samza.operators.functions.ScheduledFunction;
 import org.apache.samza.operators.functions.WatermarkFunction;
 
 
@@ -45,7 +45,7 @@ public WatermarkFunction getWatermarkFn() {
   }
 
   @Override
-  public SchedulingFunction getSchedulingFn() {
+  public ScheduledFunction getScheduledFn() {
     return null;
   }
 }
diff --git a/samza-core/src/main/java/org/apache/samza/operators/spec/OperatorSpec.java b/samza-core/src/main/java/org/apache/samza/operators/spec/OperatorSpec.java
index 6cd16cf17c..0442f7c8b7 100644
--- a/samza-core/src/main/java/org/apache/samza/operators/spec/OperatorSpec.java
+++ b/samza-core/src/main/java/org/apache/samza/operators/spec/OperatorSpec.java
@@ -25,7 +25,7 @@
 import org.apache.samza.annotation.InterfaceStability;
 import org.apache.samza.operators.MessageStream;
 import org.apache.samza.operators.MessageStreamImpl;
-import org.apache.samza.operators.functions.SchedulingFunction;
+import org.apache.samza.operators.functions.ScheduledFunction;
 import org.apache.samza.operators.functions.WatermarkFunction;
 
 /**
@@ -143,5 +143,5 @@ public final String getSourceLocation() {
 
   abstract public WatermarkFunction getWatermarkFn();
 
-  abstract public SchedulingFunction getSchedulingFn();
+  abstract public ScheduledFunction getScheduledFn();
 }
diff --git a/samza-core/src/main/java/org/apache/samza/operators/spec/OutputOperatorSpec.java b/samza-core/src/main/java/org/apache/samza/operators/spec/OutputOperatorSpec.java
index 3487b1a8f8..d6238b8fcf 100644
--- a/samza-core/src/main/java/org/apache/samza/operators/spec/OutputOperatorSpec.java
+++ b/samza-core/src/main/java/org/apache/samza/operators/spec/OutputOperatorSpec.java
@@ -19,7 +19,7 @@
 package org.apache.samza.operators.spec;
 
 
-import org.apache.samza.operators.functions.SchedulingFunction;
+import org.apache.samza.operators.functions.ScheduledFunction;
 import org.apache.samza.operators.functions.WatermarkFunction;
 
 /**
@@ -59,7 +59,7 @@ public WatermarkFunction getWatermarkFn() {
   }
 
   @Override
-  public SchedulingFunction getSchedulingFn() {
+  public ScheduledFunction getScheduledFn() {
     return null;
   }
 }
diff --git a/samza-core/src/main/java/org/apache/samza/operators/spec/PartitionByOperatorSpec.java b/samza-core/src/main/java/org/apache/samza/operators/spec/PartitionByOperatorSpec.java
index dee72e392e..069c8673be 100644
--- a/samza-core/src/main/java/org/apache/samza/operators/spec/PartitionByOperatorSpec.java
+++ b/samza-core/src/main/java/org/apache/samza/operators/spec/PartitionByOperatorSpec.java
@@ -20,7 +20,7 @@
 
 import org.apache.samza.operators.KV;
 import org.apache.samza.operators.functions.MapFunction;
-import org.apache.samza.operators.functions.SchedulingFunction;
+import org.apache.samza.operators.functions.ScheduledFunction;
 import org.apache.samza.operators.functions.WatermarkFunction;
 
 import static com.google.common.base.Preconditions.checkArgument;
@@ -55,10 +55,10 @@ public class PartitionByOperatorSpec extends OperatorSpec {
       MapFunction keyFunction,
       MapFunction valueFunction, String opId) {
     super(OpCode.PARTITION_BY, opId);
-    checkArgument(!(keyFunction instanceof SchedulingFunction || keyFunction instanceof WatermarkFunction),
-        "keyFunction for partitionBy should not implement SchedulingFunction or WatermarkFunction.");
-    checkArgument(!(valueFunction instanceof SchedulingFunction || valueFunction instanceof WatermarkFunction),
-        "valueFunction for partitionBy should not implement SchedulingFunction or WatermarkFunction.");
+    checkArgument(!(keyFunction instanceof ScheduledFunction || keyFunction instanceof WatermarkFunction),
+        "keyFunction for partitionBy should not implement ScheduledFunction or WatermarkFunction.");
+    checkArgument(!(valueFunction instanceof ScheduledFunction || valueFunction instanceof WatermarkFunction),
+        "valueFunction for partitionBy should not implement ScheduledFunction or WatermarkFunction.");
     this.outputStream = outputStream;
     this.keyFunction = keyFunction;
     this.valueFunction = valueFunction;
@@ -86,7 +86,7 @@ public WatermarkFunction getWatermarkFn() {
   }
 
   @Override
-  public SchedulingFunction getSchedulingFn() {
+  public ScheduledFunction getScheduledFn() {
     return null;
   }
 }
diff --git a/samza-core/src/main/java/org/apache/samza/operators/spec/SendToTableOperatorSpec.java b/samza-core/src/main/java/org/apache/samza/operators/spec/SendToTableOperatorSpec.java
index 5a11a66ec0..bf032a2f46 100644
--- a/samza-core/src/main/java/org/apache/samza/operators/spec/SendToTableOperatorSpec.java
+++ b/samza-core/src/main/java/org/apache/samza/operators/spec/SendToTableOperatorSpec.java
@@ -20,7 +20,7 @@
 
 import org.apache.samza.annotation.InterfaceStability;
 import org.apache.samza.operators.KV;
-import org.apache.samza.operators.functions.SchedulingFunction;
+import org.apache.samza.operators.functions.ScheduledFunction;
 import org.apache.samza.operators.functions.WatermarkFunction;
 import org.apache.samza.table.TableSpec;
 
@@ -58,7 +58,7 @@ public WatermarkFunction getWatermarkFn() {
   }
 
   @Override
-  public SchedulingFunction getSchedulingFn() {
+  public ScheduledFunction getScheduledFn() {
     return null;
   }
 }
diff --git a/samza-core/src/main/java/org/apache/samza/operators/spec/SinkOperatorSpec.java b/samza-core/src/main/java/org/apache/samza/operators/spec/SinkOperatorSpec.java
index b27f8a8351..91e2775eb4 100644
--- a/samza-core/src/main/java/org/apache/samza/operators/spec/SinkOperatorSpec.java
+++ b/samza-core/src/main/java/org/apache/samza/operators/spec/SinkOperatorSpec.java
@@ -19,7 +19,7 @@
 package org.apache.samza.operators.spec;
 
 import org.apache.samza.operators.functions.SinkFunction;
-import org.apache.samza.operators.functions.SchedulingFunction;
+import org.apache.samza.operators.functions.ScheduledFunction;
 import org.apache.samza.operators.functions.WatermarkFunction;
 
 
@@ -57,7 +57,7 @@ public WatermarkFunction getWatermarkFn() {
   }
 
   @Override
-  public SchedulingFunction getSchedulingFn() {
-    return sinkFn instanceof SchedulingFunction ? (SchedulingFunction) sinkFn : null;
+  public ScheduledFunction getScheduledFn() {
+    return sinkFn instanceof ScheduledFunction ? (ScheduledFunction) sinkFn : null;
   }
 }
diff --git a/samza-core/src/main/java/org/apache/samza/operators/spec/StreamTableJoinOperatorSpec.java b/samza-core/src/main/java/org/apache/samza/operators/spec/StreamTableJoinOperatorSpec.java
index 28018d32f7..1849c640a6 100644
--- a/samza-core/src/main/java/org/apache/samza/operators/spec/StreamTableJoinOperatorSpec.java
+++ b/samza-core/src/main/java/org/apache/samza/operators/spec/StreamTableJoinOperatorSpec.java
@@ -19,7 +19,7 @@
 package org.apache.samza.operators.spec;
 
 import org.apache.samza.annotation.InterfaceStability;
-import org.apache.samza.operators.functions.SchedulingFunction;
+import org.apache.samza.operators.functions.ScheduledFunction;
 import org.apache.samza.operators.functions.StreamTableJoinFunction;
 import org.apache.samza.operators.functions.WatermarkFunction;
 import org.apache.samza.table.TableSpec;
@@ -66,8 +66,8 @@ public WatermarkFunction getWatermarkFn() {
   }
 
   @Override
-  public SchedulingFunction getSchedulingFn() {
-    return joinFn instanceof SchedulingFunction ? (SchedulingFunction) joinFn : null;
+  public ScheduledFunction getScheduledFn() {
+    return joinFn instanceof ScheduledFunction ? (ScheduledFunction) joinFn : null;
   }
 
 }
diff --git a/samza-core/src/main/java/org/apache/samza/operators/spec/WindowOperatorSpec.java b/samza-core/src/main/java/org/apache/samza/operators/spec/WindowOperatorSpec.java
index f4a074a224..ede16a5594 100644
--- a/samza-core/src/main/java/org/apache/samza/operators/spec/WindowOperatorSpec.java
+++ b/samza-core/src/main/java/org/apache/samza/operators/spec/WindowOperatorSpec.java
@@ -20,7 +20,7 @@
 package org.apache.samza.operators.spec;
 
 import org.apache.samza.operators.functions.FoldLeftFunction;
-import org.apache.samza.operators.functions.SchedulingFunction;
+import org.apache.samza.operators.functions.ScheduledFunction;
 import org.apache.samza.operators.functions.WatermarkFunction;
 import org.apache.samza.operators.impl.store.TimeSeriesKeySerde;
 import org.apache.samza.operators.triggers.AnyTrigger;
@@ -64,14 +64,14 @@ public class WindowOperatorSpec extends OperatorSpec window, String opId) {
     super(OpCode.WINDOW, opId);
     checkArgument(window.getInitializer() == null ||
-        !(window.getInitializer() instanceof SchedulingFunction || window.getInitializer() instanceof WatermarkFunction),
-        "A window does not accepts a user-defined SchedulingFunction or WatermarkFunction as the initializer.");
+        !(window.getInitializer() instanceof ScheduledFunction || window.getInitializer() instanceof WatermarkFunction),
+        "A window does not accepts a user-defined ScheduledFunction or WatermarkFunction as the initializer.");
     checkArgument(window.getKeyExtractor() == null ||
-        !(window.getKeyExtractor() instanceof SchedulingFunction || window.getKeyExtractor() instanceof WatermarkFunction),
-        "A window does not accepts a user-defined SchedulingFunction or WatermarkFunction as the keyExtractor.");
+        !(window.getKeyExtractor() instanceof ScheduledFunction || window.getKeyExtractor() instanceof WatermarkFunction),
+        "A window does not accepts a user-defined ScheduledFunction or WatermarkFunction as the keyExtractor.");
     checkArgument(window.getEventTimeExtractor() == null ||
-        !(window.getEventTimeExtractor() instanceof SchedulingFunction || window.getEventTimeExtractor() instanceof WatermarkFunction),
-        "A window does not accepts a user-defined SchedulingFunction or WatermarkFunction as the eventTimeExtractor.");
+        !(window.getEventTimeExtractor() instanceof ScheduledFunction || window.getEventTimeExtractor() instanceof WatermarkFunction),
+        "A window does not accepts a user-defined ScheduledFunction or WatermarkFunction as the eventTimeExtractor.");
     this.window = window;
   }
 
@@ -135,9 +135,9 @@ public WatermarkFunction getWatermarkFn() {
   }
 
   @Override
-  public SchedulingFunction getSchedulingFn() {
+  public ScheduledFunction getScheduledFn() {
     FoldLeftFunction fn = window.getFoldLeftFunction();
-    return fn instanceof SchedulingFunction ? (SchedulingFunction) fn : null;
+    return fn instanceof ScheduledFunction ? (ScheduledFunction) fn : null;
   }
 
   @Override
diff --git a/samza-core/src/main/java/org/apache/samza/task/SystemTimerScheduler.java b/samza-core/src/main/java/org/apache/samza/task/SystemTimerScheduler.java
index eba9e583e6..f89175e190 100644
--- a/samza-core/src/main/java/org/apache/samza/task/SystemTimerScheduler.java
+++ b/samza-core/src/main/java/org/apache/samza/task/SystemTimerScheduler.java
@@ -25,7 +25,7 @@
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.TimeUnit;
-import org.apache.samza.scheduler.SchedulingCallback;
+import org.apache.samza.scheduler.ScheduledCallback;
 
 import static com.google.common.base.Preconditions.checkState;
 
@@ -47,7 +47,7 @@ public interface TimerListener {
 
   private final ScheduledExecutorService executor;
   private final Map scheduledFutures = new ConcurrentHashMap<>();
-  private final Map, SchedulingCallback> readyTimers = new ConcurrentHashMap<>();
+  private final Map, ScheduledCallback> readyTimers = new ConcurrentHashMap<>();
   private TimerListener timerListener;
 
   public static SystemTimerScheduler create(ScheduledExecutorService executor) {
@@ -58,7 +58,7 @@ private SystemTimerScheduler(ScheduledExecutorService executor) {
     this.executor = executor;
   }
 
-  public  void setTimer(K key, long timestamp, SchedulingCallback callback) {
+  public  void setTimer(K key, long timestamp, ScheduledCallback callback) {
     checkState(!scheduledFutures.containsKey(key),
         String.format("Duplicate key %s registration for the same timer", key));
 
@@ -85,8 +85,8 @@ void registerListener(TimerListener listener) {
     timerListener = listener;
   }
 
-  public Map, SchedulingCallback> removeReadyTimers() {
-    final Map, SchedulingCallback> timers = new TreeMap<>(readyTimers);
+  public Map, ScheduledCallback> removeReadyTimers() {
+    final Map, ScheduledCallback> timers = new TreeMap<>(readyTimers);
     readyTimers.keySet().removeAll(timers.keySet());
     return timers;
   }
diff --git a/samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala b/samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala
index a3448fcece..9f4fd1716f 100644
--- a/samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala
+++ b/samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala
@@ -28,7 +28,7 @@ import org.apache.samza.config.Config
 import org.apache.samza.config.StreamConfig.Config2Stream
 import org.apache.samza.job.model.JobModel
 import org.apache.samza.metrics.MetricsReporter
-import org.apache.samza.scheduler.SchedulingCallback
+import org.apache.samza.scheduler.ScheduledCallback
 import org.apache.samza.storage.kv.KeyValueStore
 import org.apache.samza.storage.{TaskSideInputStorageManager, TaskStorageManager}
 import org.apache.samza.system._
@@ -227,7 +227,7 @@ class TaskInstance(
 
     exceptionHandler.maybeHandle {
       context.getTimerScheduler.removeReadyTimers().entrySet().foreach { entry =>
-        entry.getValue.asInstanceOf[SchedulingCallback[Any]].execute(entry.getKey.getKey, collector, coordinator)
+        entry.getValue.asInstanceOf[ScheduledCallback[Any]].onCallback(entry.getKey.getKey, collector, coordinator)
       }
     }
   }
diff --git a/samza-core/src/test/java/org/apache/samza/operators/TestOperatorSpecGraph.java b/samza-core/src/test/java/org/apache/samza/operators/TestOperatorSpecGraph.java
index 0cf8b1d382..71f4b733b5 100644
--- a/samza-core/src/test/java/org/apache/samza/operators/TestOperatorSpecGraph.java
+++ b/samza-core/src/test/java/org/apache/samza/operators/TestOperatorSpecGraph.java
@@ -28,7 +28,7 @@
 import java.util.Set;
 import org.apache.samza.SamzaException;
 import org.apache.samza.application.StreamApplicationDescriptorImpl;
-import org.apache.samza.operators.functions.SchedulingFunction;
+import org.apache.samza.operators.functions.ScheduledFunction;
 import org.apache.samza.operators.functions.WatermarkFunction;
 import org.apache.samza.operators.spec.InputOperatorSpec;
 import org.apache.samza.operators.spec.OperatorSpec;
@@ -177,7 +177,7 @@ public WatermarkFunction getWatermarkFn() {
     }
 
     @Override
-    public SchedulingFunction getSchedulingFn() {
+    public ScheduledFunction getScheduledFn() {
       return null;
     }
   }
diff --git a/samza-core/src/test/java/org/apache/samza/operators/impl/TestOperatorImpl.java b/samza-core/src/test/java/org/apache/samza/operators/impl/TestOperatorImpl.java
index 91e574ea32..6d12d99497 100644
--- a/samza-core/src/test/java/org/apache/samza/operators/impl/TestOperatorImpl.java
+++ b/samza-core/src/test/java/org/apache/samza/operators/impl/TestOperatorImpl.java
@@ -28,7 +28,7 @@
 import org.apache.samza.metrics.MetricsRegistryMap;
 import org.apache.samza.metrics.ReadableMetricsRegistry;
 import org.apache.samza.metrics.Timer;
-import org.apache.samza.operators.functions.SchedulingFunction;
+import org.apache.samza.operators.functions.ScheduledFunction;
 import org.apache.samza.operators.functions.WatermarkFunction;
 import org.apache.samza.operators.spec.OperatorSpec;
 import org.apache.samza.task.MessageCollector;
@@ -220,7 +220,7 @@ public WatermarkFunction getWatermarkFn() {
     }
 
     @Override
-    public SchedulingFunction getSchedulingFn() {
+    public ScheduledFunction getScheduledFn() {
       return null;
     }
   }
diff --git a/samza-core/src/test/java/org/apache/samza/operators/spec/OperatorSpecTestUtils.java b/samza-core/src/test/java/org/apache/samza/operators/spec/OperatorSpecTestUtils.java
index 9ca3f832af..b9a27677cd 100644
--- a/samza-core/src/test/java/org/apache/samza/operators/spec/OperatorSpecTestUtils.java
+++ b/samza-core/src/test/java/org/apache/samza/operators/spec/OperatorSpecTestUtils.java
@@ -28,7 +28,7 @@
 import java.util.Map;
 import org.apache.samza.operators.OperatorSpecGraph;
 import org.apache.samza.operators.TableImpl;
-import org.apache.samza.operators.functions.SchedulingFunction;
+import org.apache.samza.operators.functions.ScheduledFunction;
 import org.apache.samza.operators.functions.WatermarkFunction;
 import org.apache.samza.serializers.SerializableSerde;
 import org.apache.samza.table.TableSpec;
@@ -72,7 +72,7 @@ private static void assertClonedOpSpec(OperatorSpec oOpSpec, OperatorSpec nOpSpe
     assertNotEquals(oOpSpec, nOpSpec);
     assertEquals(oOpSpec.getOpId(), nOpSpec.getOpId());
     assertEquals(oOpSpec.getOpCode(), nOpSpec.getOpCode());
-    assertSchedulingFnsNotEqual(oOpSpec.getSchedulingFn(), nOpSpec.getSchedulingFn());
+    assertScheduledFnsNotEqual(oOpSpec.getScheduledFn(), nOpSpec.getScheduledFn());
     assertWatermarkFnNotEqual(nOpSpec.getWatermarkFn(), nOpSpec.getWatermarkFn());
     assertAllOperators(oOpSpec.getRegisteredOperatorSpecs(), nOpSpec.getRegisteredOperatorSpecs());
   }
@@ -84,11 +84,11 @@ private static void assertWatermarkFnNotEqual(WatermarkFunction watermarkFn, Wat
     assertNotEquals(watermarkFn, watermarkFn1);
   }
 
-  private static void assertSchedulingFnsNotEqual(SchedulingFunction schedulingFn, SchedulingFunction schedulingFn1) {
-    if (schedulingFn == schedulingFn1 && schedulingFn == null) {
+  private static void assertScheduledFnsNotEqual(ScheduledFunction scheduledFn, ScheduledFunction scheduledFn1) {
+    if (scheduledFn == scheduledFn1 && scheduledFn == null) {
       return;
     }
-    assertNotEquals(schedulingFn, schedulingFn1);
+    assertNotEquals(scheduledFn, scheduledFn1);
   }
 
   private static void assertClonedTables(Map originalTables, Map clonedTables) {
diff --git a/samza-core/src/test/java/org/apache/samza/operators/spec/TestOperatorSpec.java b/samza-core/src/test/java/org/apache/samza/operators/spec/TestOperatorSpec.java
index be851b91b1..860e630314 100644
--- a/samza-core/src/test/java/org/apache/samza/operators/spec/TestOperatorSpec.java
+++ b/samza-core/src/test/java/org/apache/samza/operators/spec/TestOperatorSpec.java
@@ -24,7 +24,7 @@
 import java.util.List;
 import org.apache.samza.config.MapConfig;
 import org.apache.samza.operators.KV;
-import org.apache.samza.operators.KeyScheduler;
+import org.apache.samza.operators.Scheduler;
 import org.apache.samza.operators.data.TestMessageEnvelope;
 import org.apache.samza.operators.data.TestOutputMessageEnvelope;
 import org.apache.samza.operators.functions.FilterFunction;
@@ -33,7 +33,7 @@
 import org.apache.samza.operators.functions.MapFunction;
 import org.apache.samza.operators.functions.SinkFunction;
 import org.apache.samza.operators.functions.StreamTableJoinFunction;
-import org.apache.samza.operators.functions.SchedulingFunction;
+import org.apache.samza.operators.functions.ScheduledFunction;
 import org.apache.samza.operators.functions.WatermarkFunction;
 import org.apache.samza.serializers.JsonSerdeV2;
 import org.apache.samza.serializers.KVSerde;
@@ -74,8 +74,8 @@ public TestOutputMessageEnvelope apply(TestMessageEnvelope m) {
     }
   }
 
-  private static class MapWithSchedulingFn implements MapFunction,
-                                                      SchedulingFunction {
+  private static class MapWithScheduledFn implements MapFunction,
+                                                     ScheduledFunction {
 
     @Override
     public TestOutputMessageEnvelope apply(TestMessageEnvelope m) {
@@ -83,12 +83,12 @@ public TestOutputMessageEnvelope apply(TestMessageEnvelope m) {
     }
 
     @Override
-    public void schedulingInit(KeyScheduler keyScheduler) {
+    public void schedule(Scheduler scheduler) {
 
     }
 
     @Override
-    public Collection executeForKey(String key, long timestamp) {
+    public Collection onCallback(String key, long timestamp) {
       return null;
     }
   }
@@ -165,8 +165,8 @@ public void testStreamOperatorSpecWithFlatMap() {
     assertTrue(cloneOperatorSpec.getTransformFn() instanceof FlatMapFunction);
     assertNull(streamOperatorSpec.getWatermarkFn());
     assertNull(cloneOperatorSpec.getWatermarkFn());
-    assertNull(streamOperatorSpec.getSchedulingFn());
-    assertNull(cloneOperatorSpec.getSchedulingFn());
+    assertNull(streamOperatorSpec.getScheduledFn());
+    assertNull(cloneOperatorSpec.getScheduledFn());
   }
 
   @Test
@@ -188,8 +188,8 @@ public void testStreamOperatorSpecWithMap() {
     assertNotEquals(userFn, clonedUserFn);
     assertNull(streamOperatorSpec.getWatermarkFn());
     assertNull(cloneOperatorSpec.getWatermarkFn());
-    assertNull(streamOperatorSpec.getSchedulingFn());
-    assertNull(cloneOperatorSpec.getSchedulingFn());
+    assertNull(streamOperatorSpec.getScheduledFn());
+    assertNull(cloneOperatorSpec.getScheduledFn());
   }
 
   @Test
@@ -210,8 +210,8 @@ public void testStreamOperatorSpecWithFilter() {
     assertNotEquals(userFn, clonedUserFn);
     assertNull(streamOperatorSpec.getWatermarkFn());
     assertNull(cloneOperatorSpec.getWatermarkFn());
-    assertNull(streamOperatorSpec.getSchedulingFn());
-    assertNull(cloneOperatorSpec.getSchedulingFn());
+    assertNull(streamOperatorSpec.getScheduledFn());
+    assertNull(cloneOperatorSpec.getScheduledFn());
   }
 
   @Test
@@ -361,13 +361,13 @@ public void testMapStreamOperatorSpecWithWatermark() {
     assertEquals(streamOperatorSpec.getWatermarkFn(), testMapFn);
     assertNotNull(cloneOperatorSpec.getWatermarkFn());
     assertNotEquals(cloneOperatorSpec.getTransformFn(), cloneOperatorSpec.getWatermarkFn());
-    assertNull(streamOperatorSpec.getSchedulingFn());
-    assertNull(cloneOperatorSpec.getSchedulingFn());
+    assertNull(streamOperatorSpec.getScheduledFn());
+    assertNull(cloneOperatorSpec.getScheduledFn());
   }
 
   @Test
-  public void testMapStreamOperatorSpecWithSchedulingFunction() {
-    MapWithSchedulingFn testMapFn = new MapWithSchedulingFn();
+  public void testMapStreamOperatorSpecWithScheduledFunction() {
+    MapWithScheduledFn testMapFn = new MapWithScheduledFn();
 
     StreamOperatorSpec streamOperatorSpec =
         OperatorSpecs.createMapOperatorSpec(testMapFn, "op0");
@@ -379,9 +379,9 @@ public void testMapStreamOperatorSpecWithSchedulingFunction() {
     assertNull(streamOperatorSpec.getWatermarkFn());
     assertNull(cloneOperatorSpec.getWatermarkFn());
     assertNotEquals(cloneOperatorSpec.getTransformFn(), cloneOperatorSpec.getWatermarkFn());
-    assertEquals(streamOperatorSpec.getSchedulingFn(), testMapFn);
-    assertNotNull(cloneOperatorSpec.getSchedulingFn());
-    assertNotEquals(streamOperatorSpec.getSchedulingFn(), cloneOperatorSpec.getSchedulingFn());
+    assertEquals(streamOperatorSpec.getScheduledFn(), testMapFn);
+    assertNotNull(cloneOperatorSpec.getScheduledFn());
+    assertNotEquals(streamOperatorSpec.getScheduledFn(), cloneOperatorSpec.getScheduledFn());
   }
 
   @Test
diff --git a/samza-core/src/test/java/org/apache/samza/operators/spec/TestPartitionByOperatorSpec.java b/samza-core/src/test/java/org/apache/samza/operators/spec/TestPartitionByOperatorSpec.java
index adeeb6df74..1d98580654 100644
--- a/samza-core/src/test/java/org/apache/samza/operators/spec/TestPartitionByOperatorSpec.java
+++ b/samza-core/src/test/java/org/apache/samza/operators/spec/TestPartitionByOperatorSpec.java
@@ -23,13 +23,13 @@
 import org.apache.samza.application.StreamApplicationDescriptorImpl;
 import org.apache.samza.config.Config;
 import org.apache.samza.config.JobConfig;
-import org.apache.samza.operators.KeyScheduler;
+import org.apache.samza.operators.Scheduler;
 import org.apache.samza.operators.MessageStream;
 import org.apache.samza.operators.OperatorSpecGraph;
 import org.apache.samza.operators.descriptors.GenericInputDescriptor;
 import org.apache.samza.operators.descriptors.GenericSystemDescriptor;
 import org.apache.samza.operators.functions.MapFunction;
-import org.apache.samza.operators.functions.SchedulingFunction;
+import org.apache.samza.operators.functions.ScheduledFunction;
 import org.apache.samza.operators.functions.WatermarkFunction;
 import org.apache.samza.serializers.KVSerde;
 import org.apache.samza.serializers.NoOpSerde;
@@ -58,7 +58,7 @@ public class TestPartitionByOperatorSpec {
   private final String testJobId = "1";
   private final String testRepartitionedStreamName = "parByKey";
 
-  class SchedulingMapFn implements MapFunction, SchedulingFunction {
+  class ScheduledMapFn implements MapFunction, ScheduledFunction {
 
     @Override
     public String apply(Object message) {
@@ -66,12 +66,12 @@ public String apply(Object message) {
     }
 
     @Override
-    public void schedulingInit(KeyScheduler keyScheduler) {
+    public void schedule(Scheduler scheduler) {
 
     }
 
     @Override
-    public Collection executeForKey(String key, long timestamp) {
+    public Collection onCallback(String key, long timestamp) {
       return null;
     }
   }
@@ -117,7 +117,7 @@ public void testPartitionBy() {
     assertTrue(inputOpSpec.getKeySerde() instanceof NoOpSerde);
     assertTrue(inputOpSpec.getValueSerde() instanceof NoOpSerde);
     assertTrue(inputOpSpec.isKeyed());
-    assertNull(inputOpSpec.getSchedulingFn());
+    assertNull(inputOpSpec.getScheduledFn());
     assertNull(inputOpSpec.getWatermarkFn());
     InputOperatorSpec originInputSpec = inputOpSpecs.get(testinputDescriptor.getStreamId());
     assertTrue(originInputSpec.getRegisteredOperatorSpecs().toArray()[0] instanceof PartitionByOperatorSpec);
@@ -126,7 +126,7 @@ public void testPartitionBy() {
     assertEquals(reparOpSpec.getKeyFunction(), keyFn);
     assertEquals(reparOpSpec.getValueFunction(), valueFn);
     assertEquals(reparOpSpec.getOutputStream().getStreamId(), reparOpSpec.getOpId());
-    assertNull(reparOpSpec.getSchedulingFn());
+    assertNull(reparOpSpec.getScheduledFn());
     assertNull(reparOpSpec.getWatermarkFn());
   }
 
@@ -144,7 +144,7 @@ public void testPartitionByWithNoSerde() {
     assertNull(inputOpSpec.getKeySerde());
     assertNull(inputOpSpec.getValueSerde());
     assertTrue(inputOpSpec.isKeyed());
-    assertNull(inputOpSpec.getSchedulingFn());
+    assertNull(inputOpSpec.getScheduledFn());
     assertNull(inputOpSpec.getWatermarkFn());
     InputOperatorSpec originInputSpec = streamAppDesc.getInputOperators().get(testinputDescriptor.getStreamId());
     assertTrue(originInputSpec.getRegisteredOperatorSpecs().toArray()[0] instanceof PartitionByOperatorSpec);
@@ -153,7 +153,7 @@ public void testPartitionByWithNoSerde() {
     assertEquals(reparOpSpec.getKeyFunction(), keyFn);
     assertEquals(reparOpSpec.getValueFunction(), valueFn);
     assertEquals(reparOpSpec.getOutputStream().getStreamId(), reparOpSpec.getOpId());
-    assertNull(reparOpSpec.getSchedulingFn());
+    assertNull(reparOpSpec.getScheduledFn());
     assertNull(reparOpSpec.getWatermarkFn());
   }
 
@@ -169,8 +169,8 @@ public void testCopy() {
   }
 
   @Test(expected = IllegalArgumentException.class)
-  public void testSchedulingFunctionAsKeyFn() {
-    SchedulingMapFn keyFn = new SchedulingMapFn();
+  public void testScheduledFunctionAsKeyFn() {
+    ScheduledMapFn keyFn = new ScheduledMapFn();
     new StreamApplicationDescriptorImpl(appDesc -> {
         MessageStream inputStream = appDesc.getInputStream(testinputDescriptor);
         inputStream.partitionBy(keyFn, m -> m, "parByKey");
@@ -187,8 +187,8 @@ public void testWatermarkFunctionAsKeyFn() {
   }
 
   @Test(expected = IllegalArgumentException.class)
-  public void testSchedulingFunctionAsValueFn() {
-    SchedulingMapFn valueFn = new SchedulingMapFn();
+  public void testScheduledFunctionAsValueFn() {
+    ScheduledMapFn valueFn = new ScheduledMapFn();
     new StreamApplicationDescriptorImpl(appDesc -> {
         MessageStream inputStream = appDesc.getInputStream(testinputDescriptor);
         inputStream.partitionBy(m -> m.toString(), valueFn, "parByKey");
diff --git a/samza-core/src/test/java/org/apache/samza/operators/spec/TestWindowOperatorSpec.java b/samza-core/src/test/java/org/apache/samza/operators/spec/TestWindowOperatorSpec.java
index 32fb490157..41973b2294 100644
--- a/samza-core/src/test/java/org/apache/samza/operators/spec/TestWindowOperatorSpec.java
+++ b/samza-core/src/test/java/org/apache/samza/operators/spec/TestWindowOperatorSpec.java
@@ -19,8 +19,8 @@
 
 package org.apache.samza.operators.spec;
 
-import org.apache.samza.operators.KeyScheduler;
-import org.apache.samza.operators.functions.SchedulingFunction;
+import org.apache.samza.operators.Scheduler;
+import org.apache.samza.operators.functions.ScheduledFunction;
 import org.apache.samza.operators.functions.WatermarkFunction;
 import org.apache.samza.serializers.Serde;
 import org.apache.samza.operators.functions.FoldLeftFunction;
@@ -90,8 +90,8 @@ public void testTriggerIntervalWithSingleTimeTrigger() {
   }
 
   @Test(expected = IllegalArgumentException.class)
-  public void testIllegalSchedulingFunctionAsInitializer() {
-    class TimedSupplierFunction implements SupplierFunction, SchedulingFunction {
+  public void testIllegalScheduledFunctionAsInitializer() {
+    class TimedSupplierFunction implements SupplierFunction, ScheduledFunction {
 
       @Override
       public Collection get() {
@@ -99,12 +99,12 @@ public Collection get() {
       }
 
       @Override
-      public void schedulingInit(KeyScheduler keyScheduler) {
+      public void schedule(Scheduler scheduler) {
 
       }
 
       @Override
-      public Collection executeForKey(Object key, long timestamp) {
+      public Collection onCallback(Object key, long timestamp) {
         return null;
       }
     }
@@ -138,8 +138,8 @@ public Long getOutputWatermark() {
   }
 
   @Test(expected = IllegalArgumentException.class)
-  public void testIllegalSchedulingFunctionAsKeyFn() {
-    class SchedulingMapFunction implements MapFunction, SchedulingFunction {
+  public void testIllegalScheduledFunctionAsKeyFn() {
+    class ScheduledMapFunction implements MapFunction, ScheduledFunction {
 
       @Override
       public Object apply(Object message) {
@@ -147,16 +147,16 @@ public Object apply(Object message) {
       }
 
       @Override
-      public void schedulingInit(KeyScheduler keyScheduler) {
+      public void schedule(Scheduler scheduler) {
 
       }
 
       @Override
-      public Collection executeForKey(Object key, long timestamp) {
+      public Collection onCallback(Object key, long timestamp) {
         return null;
       }
     }
-    keyFn = new SchedulingMapFunction();
+    keyFn = new ScheduledMapFunction();
 
     getWindowOperatorSpec("w0");
   }
@@ -186,8 +186,8 @@ public Long getOutputWatermark() {
   }
 
   @Test(expected = IllegalArgumentException.class)
-  public void testIllegalSchedulingFunctionAsEventTimeFn() {
-    class SchedulingMapFunction implements MapFunction, SchedulingFunction {
+  public void testIllegalScheduledFunctionAsEventTimeFn() {
+    class ScheduledMapFunction implements MapFunction, ScheduledFunction {
 
       @Override
       public Long apply(Object message) {
@@ -195,16 +195,16 @@ public Long apply(Object message) {
       }
 
       @Override
-      public void schedulingInit(KeyScheduler keyScheduler) {
+      public void schedule(Scheduler scheduler) {
 
       }
 
       @Override
-      public Collection executeForKey(Object key, long timestamp) {
+      public Collection onCallback(Object key, long timestamp) {
         return null;
       }
     }
-    timeFn = new SchedulingMapFunction();
+    timeFn = new ScheduledMapFunction();
 
     getWindowOperatorSpec("w0");
   }
@@ -234,9 +234,9 @@ public Long getOutputWatermark() {
   }
 
   @Test
-  public void testSchedulingFunctionAsFoldLeftFn() {
-    class SchedulingFoldLeftFunction
-        implements FoldLeftFunction, SchedulingFunction {
+  public void testScheduledFunctionAsFoldLeftFn() {
+    class ScheduledFoldLeftFunction
+        implements FoldLeftFunction, ScheduledFunction {
 
       @Override
       public Collection apply(Object message, Collection oldValue) {
@@ -245,19 +245,19 @@ public Collection apply(Object message, Collection oldValue) {
       }
 
       @Override
-      public void schedulingInit(KeyScheduler keyScheduler) {
+      public void schedule(Scheduler scheduler) {
 
       }
 
       @Override
-      public Collection executeForKey(Object key, long timestamp) {
+      public Collection onCallback(Object key, long timestamp) {
         return null;
       }
     }
 
-    foldFn = new SchedulingFoldLeftFunction();
+    foldFn = new ScheduledFoldLeftFunction();
     WindowOperatorSpec windowSpec = getWindowOperatorSpec("w0");
-    assertEquals(windowSpec.getSchedulingFn(), foldFn);
+    assertEquals(windowSpec.getScheduledFn(), foldFn);
     assertNull(windowSpec.getWatermarkFn());
   }
 
@@ -285,7 +285,7 @@ public Collection apply(Object message, Collection oldValue) {
     foldFn = new WatermarkFoldLeftFunction();
     WindowOperatorSpec windowSpec = getWindowOperatorSpec("w0");
     assertEquals(windowSpec.getWatermarkFn(), foldFn);
-    assertNull(windowSpec.getSchedulingFn());
+    assertNull(windowSpec.getScheduledFn());
   }
 
   @Test
diff --git a/samza-core/src/test/java/org/apache/samza/task/TestSystemTimerScheduler.java b/samza-core/src/test/java/org/apache/samza/task/TestSystemTimerScheduler.java
index 038ff27680..7e6324d62a 100644
--- a/samza-core/src/test/java/org/apache/samza/task/TestSystemTimerScheduler.java
+++ b/samza-core/src/test/java/org/apache/samza/task/TestSystemTimerScheduler.java
@@ -51,7 +51,7 @@ private ScheduledExecutorService createExecutorService() {
 
   private void fireTimers(SystemTimerScheduler factory) {
     factory.removeReadyTimers().entrySet().forEach(entry -> {
-        entry.getValue().execute(entry.getKey().getKey(), mock(MessageCollector.class), mock(TaskCoordinator.class));
+        entry.getValue().onCallback(entry.getKey().getKey(), mock(MessageCollector.class), mock(TaskCoordinator.class));
       });
   }
 
diff --git a/samza-sql/src/test/java/org/apache/samza/sql/translator/TestProjectTranslator.java b/samza-sql/src/test/java/org/apache/samza/sql/translator/TestProjectTranslator.java
index 8b17f59a9c..3046c1fa49 100644
--- a/samza-sql/src/test/java/org/apache/samza/sql/translator/TestProjectTranslator.java
+++ b/samza-sql/src/test/java/org/apache/samza/sql/translator/TestProjectTranslator.java
@@ -39,7 +39,7 @@
 import org.apache.samza.operators.MessageStream;
 import org.apache.samza.operators.MessageStreamImpl;
 import org.apache.samza.operators.functions.MapFunction;
-import org.apache.samza.operators.functions.SchedulingFunction;
+import org.apache.samza.operators.functions.ScheduledFunction;
 import org.apache.samza.operators.functions.WatermarkFunction;
 import org.apache.samza.operators.spec.OperatorSpec;
 import org.apache.samza.operators.spec.StreamOperatorSpec;
@@ -192,7 +192,7 @@ public WatermarkFunction getWatermarkFn() {
       }
 
       @Override
-      public SchedulingFunction getSchedulingFn() {
+      public ScheduledFunction getScheduledFn() {
         return null;
       }
     };
diff --git a/samza-test/src/test/java/org/apache/samza/test/framework/TestSchedulingApp.java b/samza-test/src/test/java/org/apache/samza/test/framework/TestSchedulingApp.java
index 8a4f2764ff..db78e8cb21 100644
--- a/samza-test/src/test/java/org/apache/samza/test/framework/TestSchedulingApp.java
+++ b/samza-test/src/test/java/org/apache/samza/test/framework/TestSchedulingApp.java
@@ -26,10 +26,10 @@
 import java.util.List;
 import org.apache.samza.application.StreamApplication;
 import org.apache.samza.application.StreamApplicationDescriptor;
-import org.apache.samza.operators.KeyScheduler;
+import org.apache.samza.operators.Scheduler;
 import org.apache.samza.operators.MessageStream;
 import org.apache.samza.operators.functions.FlatMapFunction;
-import org.apache.samza.operators.functions.SchedulingFunction;
+import org.apache.samza.operators.functions.ScheduledFunction;
 import org.apache.samza.serializers.JsonSerdeV2;
 import org.apache.samza.system.kafka.KafkaInputDescriptor;
 import org.apache.samza.system.kafka.KafkaSystemDescriptor;
@@ -44,7 +44,7 @@ public void describe(StreamApplicationDescriptor appDesc) {
     KafkaSystemDescriptor ksd = new KafkaSystemDescriptor("kafka");
     KafkaInputDescriptor isd = ksd.getInputDescriptor(PAGE_VIEWS, serde);
     final MessageStream pageViews = appDesc.getInputStream(isd);
-    final MessageStream output = pageViews.flatMap(new FlatmapSchedulingFn());
+    final MessageStream output = pageViews.flatMap(new FlatmapScheduledFn());
 
     MessageStreamAssert.that("Output from scheduling function should container all complete messages", output, serde)
         .containsInAnyOrder(
@@ -56,15 +56,15 @@ public void describe(StreamApplicationDescriptor appDesc) {
             ));
   }
 
-  private static class FlatmapSchedulingFn
-      implements FlatMapFunction, SchedulingFunction {
+  private static class FlatmapScheduledFn
+      implements FlatMapFunction, ScheduledFunction {
 
     private transient List pageViews;
-    private transient KeyScheduler keyScheduler;
+    private transient Scheduler scheduler;
 
     @Override
-    public void schedulingInit(KeyScheduler keyScheduler) {
-      this.keyScheduler = keyScheduler;
+    public void schedule(Scheduler scheduler) {
+      this.scheduler = scheduler;
       this.pageViews = new ArrayList<>();
     }
 
@@ -76,13 +76,13 @@ public Collection apply(PageView message) {
       if (pageViews.size() == 2) {
         //got all messages for this task
         final long time = System.currentTimeMillis() + 100;
-        keyScheduler.schedule("CompleteScheduler", time);
+        scheduler.schedule("CompleteScheduler", time);
       }
       return Collections.emptyList();
     }
 
     @Override
-    public Collection executeForKey(String key, long time) {
+    public Collection onCallback(String key, long time) {
       return pageViews;
     }
   }

From e7abf9f467f8e03815228314c89dbee1c731379b Mon Sep 17 00:00:00 2001
From: Cameron Lee 
Date: Thu, 20 Sep 2018 14:49:42 -0700
Subject: [PATCH 4/4] doc updates, SystemTimerScheduler renamed to
 EpochTimeScheduler

---
 .../org/apache/samza/operators/Scheduler.java | 14 ++++++-------
 .../functions/ScheduledFunction.java          | 21 ++++++++++++-------
 .../org/apache/samza/task/TaskContext.java    |  4 ++--
 .../samza/container/TaskContextImpl.java      |  8 +++----
 .../org/apache/samza/task/AsyncRunLoop.java   |  6 +++---
 ...Scheduler.java => EpochTimeScheduler.java} |  8 +++----
 ...duler.java => TestEpochTimeScheduler.java} | 16 +++++++-------
 7 files changed, 40 insertions(+), 37 deletions(-)
 rename samza-core/src/main/java/org/apache/samza/task/{SystemTimerScheduler.java => EpochTimeScheduler.java} (95%)
 rename samza-core/src/test/java/org/apache/samza/task/{TestSystemTimerScheduler.java => TestEpochTimeScheduler.java} (89%)

diff --git a/samza-api/src/main/java/org/apache/samza/operators/Scheduler.java b/samza-api/src/main/java/org/apache/samza/operators/Scheduler.java
index 05bea38bca..77148f0a92 100644
--- a/samza-api/src/main/java/org/apache/samza/operators/Scheduler.java
+++ b/samza-api/src/main/java/org/apache/samza/operators/Scheduler.java
@@ -20,21 +20,19 @@
 package org.apache.samza.operators;
 
 /**
- * Manages scheduling keys to be triggered later. See {@link org.apache.samza.operators.functions.ScheduledFunction}
- * for details.
- * @param  type of the key
+ * Allows scheduling {@link org.apache.samza.operators.functions.ScheduledFunction} callbacks to be invoked later.
+ * @param  type of the key to schedule
  */
 public interface Scheduler {
-
   /**
-   * Schedule a key to be triggered at {@code timestamp}.
-   * @param key unique key
-   * @param timestamp epoch time when the key will be triggered, in milliseconds
+   * Schedule a callback for the {@code key} to be invoked at {@code timestamp}.
+   * @param key unique key associated with the callback to schedule
+   * @param timestamp epoch time when the callback for the key will be invoked, in milliseconds
    */
   void schedule(K key, long timestamp);
 
   /**
-   * Delete the scheduler entry for the provided key.
+   * Delete the scheduled callback for the provided {@code key}.
    * @param key key to delete
    */
   void delete(K key);
diff --git a/samza-api/src/main/java/org/apache/samza/operators/functions/ScheduledFunction.java b/samza-api/src/main/java/org/apache/samza/operators/functions/ScheduledFunction.java
index b75b335a07..952948c515 100644
--- a/samza-api/src/main/java/org/apache/samza/operators/functions/ScheduledFunction.java
+++ b/samza-api/src/main/java/org/apache/samza/operators/functions/ScheduledFunction.java
@@ -25,7 +25,7 @@
 
 
 /**
- * Allows scheduling with key(s) and is invoked when the specified time(s) occurs.
+ * Allows scheduling a callback for a specific epoch-time.
  * Key must be a unique identifier for its corresponding logic to execute, and is provided in the callback when the
  * corresponding schedule time occurs.
  *
@@ -33,16 +33,23 @@
  * Example of a {@link FlatMapFunction} with {@link ScheduledFunction}:
  * 
{@code
  *    public class ExampleScheduledFn implements FlatMapFunction, ScheduledFunction {
+ *      // for recurring callbacks, keep track of the scheduler from "schedule"
+ *      private Scheduler scheduler;
+ *
  *      public void schedule(Scheduler scheduler) {
+ *        // save the scheduler for recurring callbacks
+ *        this.scheduler = scheduler;
  *        long time = System.currentTimeMillis() + 5000; // fire after 5 sec
- *        scheduler.schedule("example-scheduler-logic", time);
+ *        scheduler.schedule("do-delayed-logic", time);
  *      }
  *      public Collection apply(String s) {
  *        ...
  *      }
  *      public Collection onCallback(String key, long timestamp) {
- *        // fired with key as "example-scheduler-logic"
+ *        // do some logic for key "do-delayed-logic"
  *        ...
+ *        // for recurring callbacks, call the saved scheduler again
+ *        this.scheduler.schedule("example-process", System.currentTimeMillis() + 5000);
  *      }
  *    }
  * }
@@ -50,18 +57,16 @@ * @param type of the output */ public interface ScheduledFunction { - /** - * Initialize the function for scheduling, such as setting some initial scheduling logic or saving the - * {@code scheduler} for later use. + * Allows scheduling the initial callback(s) and saving the {@code scheduler} for later use for recurring callbacks. * @param scheduler used to specify the schedule time(s) and key(s) */ void schedule(Scheduler scheduler); /** * Returns the output from the scheduling logic corresponding to the key that was triggered. - * @param key key corresponding to the scheduling logic that got triggered - * @param timestamp schedule time that was set for the key, in milliseconds since epoch + * @param key key corresponding to the callback that got invoked + * @param timestamp schedule time that was set for the callback for the key, in milliseconds since epoch * @return {@link Collection} of output elements */ Collection onCallback(K key, long timestamp); diff --git a/samza-api/src/main/java/org/apache/samza/task/TaskContext.java b/samza-api/src/main/java/org/apache/samza/task/TaskContext.java index f142d5ca8f..eccedba3f2 100644 --- a/samza-api/src/main/java/org/apache/samza/task/TaskContext.java +++ b/samza-api/src/main/java/org/apache/samza/task/TaskContext.java @@ -77,7 +77,7 @@ default Object getUserContext() { } /** - * Register a keyed callback in this task. + * Schedule the {@code callback} for the provided {@code key} to be invoked at epoch-time {@code timestamp}. * The callback will be invoked exclusively with any other operations for this task, * e.g. processing, windowing and commit. * @param key key for the callback @@ -88,7 +88,7 @@ default Object getUserContext() { void scheduleCallback(K key, long timestamp, ScheduledCallback callback); /** - * Delete the keyed callback in this task. + * Delete the scheduled {@code callback} for the {@code key}. * Deletion only happens if the callback hasn't been fired. Otherwise it will not interrupt. * @param key callback key * @param type of the key diff --git a/samza-core/src/main/java/org/apache/samza/container/TaskContextImpl.java b/samza-core/src/main/java/org/apache/samza/container/TaskContextImpl.java index f1d265a72a..bea6373baf 100644 --- a/samza-core/src/main/java/org/apache/samza/container/TaskContextImpl.java +++ b/samza-core/src/main/java/org/apache/samza/container/TaskContextImpl.java @@ -29,7 +29,7 @@ import org.apache.samza.system.SystemStreamPartition; import org.apache.samza.table.Table; import org.apache.samza.table.TableManager; -import org.apache.samza.task.SystemTimerScheduler; +import org.apache.samza.task.EpochTimeScheduler; import org.apache.samza.task.TaskContext; import org.apache.samza.scheduler.ScheduledCallback; import org.slf4j.Logger; @@ -53,7 +53,7 @@ public class TaskContextImpl implements TaskContext { private final JobModel jobModel; private final StreamMetadataCache streamMetadataCache; private final Map objectRegistry = new HashMap<>(); - private final SystemTimerScheduler timerScheduler; + private final EpochTimeScheduler timerScheduler; private Object userContext = null; @@ -76,7 +76,7 @@ public TaskContextImpl(TaskName taskName, this.tableManager = tableManager; this.jobModel = jobModel; this.streamMetadataCache = streamMetadataCache; - this.timerScheduler = SystemTimerScheduler.create(timerExecutor); + this.timerScheduler = EpochTimeScheduler.create(timerExecutor); } @Override @@ -159,7 +159,7 @@ public StreamMetadataCache getStreamMetadataCache() { return streamMetadataCache; } - public SystemTimerScheduler getTimerScheduler() { + public EpochTimeScheduler getTimerScheduler() { return timerScheduler; } } diff --git a/samza-core/src/main/java/org/apache/samza/task/AsyncRunLoop.java b/samza-core/src/main/java/org/apache/samza/task/AsyncRunLoop.java index dc98d204e5..111869c2ec 100644 --- a/samza-core/src/main/java/org/apache/samza/task/AsyncRunLoop.java +++ b/samza-core/src/main/java/org/apache/samza/task/AsyncRunLoop.java @@ -374,9 +374,9 @@ public void run() { }, commitMs, commitMs, TimeUnit.MILLISECONDS); } - final SystemTimerScheduler timerScheduler = task.context().getTimerScheduler(); - if (timerScheduler != null) { - timerScheduler.registerListener(() -> { + final EpochTimeScheduler epochTimeScheduler = task.context().getTimerScheduler(); + if (epochTimeScheduler != null) { + epochTimeScheduler.registerListener(() -> { state.needScheduler(); }); } diff --git a/samza-core/src/main/java/org/apache/samza/task/SystemTimerScheduler.java b/samza-core/src/main/java/org/apache/samza/task/EpochTimeScheduler.java similarity index 95% rename from samza-core/src/main/java/org/apache/samza/task/SystemTimerScheduler.java rename to samza-core/src/main/java/org/apache/samza/task/EpochTimeScheduler.java index f89175e190..3f50d9a477 100644 --- a/samza-core/src/main/java/org/apache/samza/task/SystemTimerScheduler.java +++ b/samza-core/src/main/java/org/apache/samza/task/EpochTimeScheduler.java @@ -36,7 +36,7 @@ * 2) keeps track of the timers created and timers that are ready. * 3) triggers listener whenever a timer fires. */ -public class SystemTimerScheduler { +public class EpochTimeScheduler { /** * For run loop to listen to timer firing so it can schedule the callbacks. @@ -50,11 +50,11 @@ public interface TimerListener { private final Map, ScheduledCallback> readyTimers = new ConcurrentHashMap<>(); private TimerListener timerListener; - public static SystemTimerScheduler create(ScheduledExecutorService executor) { - return new SystemTimerScheduler(executor); + public static EpochTimeScheduler create(ScheduledExecutorService executor) { + return new EpochTimeScheduler(executor); } - private SystemTimerScheduler(ScheduledExecutorService executor) { + private EpochTimeScheduler(ScheduledExecutorService executor) { this.executor = executor; } diff --git a/samza-core/src/test/java/org/apache/samza/task/TestSystemTimerScheduler.java b/samza-core/src/test/java/org/apache/samza/task/TestEpochTimeScheduler.java similarity index 89% rename from samza-core/src/test/java/org/apache/samza/task/TestSystemTimerScheduler.java rename to samza-core/src/test/java/org/apache/samza/task/TestEpochTimeScheduler.java index 7e6324d62a..e0da2e95b9 100644 --- a/samza-core/src/test/java/org/apache/samza/task/TestSystemTimerScheduler.java +++ b/samza-core/src/test/java/org/apache/samza/task/TestEpochTimeScheduler.java @@ -36,7 +36,7 @@ import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; -public class TestSystemTimerScheduler { +public class TestEpochTimeScheduler { private ScheduledExecutorService createExecutorService() { ScheduledExecutorService service = mock(ScheduledExecutorService.class); @@ -49,7 +49,7 @@ private ScheduledExecutorService createExecutorService() { return service; } - private void fireTimers(SystemTimerScheduler factory) { + private void fireTimers(EpochTimeScheduler factory) { factory.removeReadyTimers().entrySet().forEach(entry -> { entry.getValue().onCallback(entry.getKey().getKey(), mock(MessageCollector.class), mock(TaskCoordinator.class)); }); @@ -57,7 +57,7 @@ private void fireTimers(SystemTimerScheduler factory) { @Test public void testSingleTimer() { - SystemTimerScheduler scheduler = SystemTimerScheduler.create(createExecutorService()); + EpochTimeScheduler scheduler = EpochTimeScheduler.create(createExecutorService()); List results = new ArrayList<>(); scheduler.setTimer("single-timer", 1, (key, collector, coordinator) -> { results.add(key); @@ -71,7 +71,7 @@ public void testSingleTimer() { @Test public void testMultipleTimers() { - SystemTimerScheduler scheduler = SystemTimerScheduler.create(createExecutorService()); + EpochTimeScheduler scheduler = EpochTimeScheduler.create(createExecutorService()); List results = new ArrayList<>(); scheduler.setTimer("multiple-timer-3", 3, (key, collector, coordinator) -> { results.add(key + ":3"); @@ -97,7 +97,7 @@ public void testMultipleKeys() { Object key2 = new Object(); List results = new ArrayList<>(); - SystemTimerScheduler scheduler = SystemTimerScheduler.create(createExecutorService()); + EpochTimeScheduler scheduler = EpochTimeScheduler.create(createExecutorService()); scheduler.setTimer(key1, 2, (key, collector, coordinator) -> { assertEquals(key, key1); results.add("key1:2"); @@ -120,7 +120,7 @@ public void testMultipleKeyTypes() { Long key2 = Long.MAX_VALUE; List results = new ArrayList<>(); - SystemTimerScheduler scheduler = SystemTimerScheduler.create(createExecutorService()); + EpochTimeScheduler scheduler = EpochTimeScheduler.create(createExecutorService()); scheduler.setTimer(key1, 1, (key, collector, coordinator) -> { assertEquals(key, key1); results.add("key:1"); @@ -144,7 +144,7 @@ public void testRemoveTimer() { when(future.cancel(anyBoolean())).thenReturn(true); when(service.schedule((Runnable) anyObject(), anyLong(), anyObject())).thenAnswer(invocation -> future); - SystemTimerScheduler scheduler = SystemTimerScheduler.create(service); + EpochTimeScheduler scheduler = EpochTimeScheduler.create(service); List results = new ArrayList<>(); scheduler.setTimer("timer", 1, (key, collector, coordinator) -> { results.add(key); @@ -160,7 +160,7 @@ public void testRemoveTimer() { @Test public void testTimerListener() { - SystemTimerScheduler scheduler = SystemTimerScheduler.create(createExecutorService()); + EpochTimeScheduler scheduler = EpochTimeScheduler.create(createExecutorService()); List results = new ArrayList<>(); scheduler.registerListener(() -> { results.add("timer-listener");