diff --git a/src/main/java/io/streamnative/kop/utils/timer/SystemTimer.java b/src/main/java/io/streamnative/kop/utils/timer/SystemTimer.java new file mode 100644 index 0000000000..9056db31fc --- /dev/null +++ b/src/main/java/io/streamnative/kop/utils/timer/SystemTimer.java @@ -0,0 +1,181 @@ +/** + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.streamnative.kop.utils.timer; + +import com.google.common.util.concurrent.ThreadFactoryBuilder; +import io.streamnative.kop.utils.timer.TimerTaskList.TimerTaskEntry; +import java.util.Objects; +import java.util.concurrent.DelayQueue; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantReadWriteLock; +import java.util.function.Consumer; +import javax.annotation.concurrent.ThreadSafe; +import lombok.SneakyThrows; +import lombok.extern.slf4j.Slf4j; +import org.apache.kafka.common.utils.Time; + +/** + * A system timer implementation. + */ +@Slf4j +@ThreadSafe +public class SystemTimer implements Timer { + + /** + * Create a system timer builder. + * + * @return a system timer builder. + */ + public static Builder builder() { + return new Builder(); + } + + /** + * Builder to build a system timer. + */ + public static class Builder { + + private String executorName; + private long tickMs = 1; + private int wheelSize = 20; + private long startMs = Time.SYSTEM.hiResClockMs(); + + private Builder() {} + + public Builder executorName(String executorName) { + this.executorName = executorName; + return this; + } + + public Builder tickMs(long tickMs) { + this.tickMs = tickMs; + return this; + } + + public Builder wheelSize(int wheelSize) { + this.wheelSize = wheelSize; + return this; + } + + public Builder startMs(long startMs) { + this.startMs = startMs; + return this; + } + + public SystemTimer build() { + Objects.requireNonNull(executorName, "No executor name is provided"); + + return new SystemTimer( + executorName, + tickMs, + wheelSize, + startMs + ); + } + + } + + private final ExecutorService taskExecutor; + private final DelayQueue delayQueue; + private final AtomicInteger taskCounter; + private final TimingWheel timingWheel; + + // Locks used to protect data structures while ticking + private final ReentrantReadWriteLock readWriteLock; + private final Lock readLock; + private final Lock writeLock; + private final Consumer reinsert; + + private SystemTimer(String executorName, + long tickMs, + int wheelSize, + long startMs) { + this.taskExecutor = Executors.newFixedThreadPool( + 1, new ThreadFactoryBuilder() + .setDaemon(false) + .setNameFormat("system-timer-%d") + .build() + ); + this.delayQueue = new DelayQueue(); + this.taskCounter = new AtomicInteger(0); + this.timingWheel = new TimingWheel( + tickMs, + wheelSize, + startMs, + taskCounter, + delayQueue + ); + this.readWriteLock = new ReentrantReadWriteLock(); + this.readLock = readWriteLock.readLock(); + this.writeLock = readWriteLock.writeLock(); + this.reinsert = timerTaskEntry -> addTimerTaskEntry(timerTaskEntry); + } + + @Override + public void add(TimerTask timerTask) { + readLock.lock(); + try { + addTimerTaskEntry(new TimerTaskEntry( + timerTask, timerTask.delayMs + Time.SYSTEM.hiResClockMs() + )); + } finally { + readLock.unlock(); + } + } + + private void addTimerTaskEntry(TimerTaskEntry timerTaskEntry) { + if (!timingWheel.add(timerTaskEntry)) { + // Already expired or cancelled + if (!timerTaskEntry.cancelled()) { + taskExecutor.submit(timerTaskEntry.timerTask()); + } + } + } + + @SneakyThrows + @Override + public boolean advanceClock(long timeoutMs) { + TimerTaskList bucket = delayQueue.poll(timeoutMs, TimeUnit.MILLISECONDS); + if (null != bucket) { + writeLock.lock(); + try { + while (null != bucket) { + timingWheel.advanceClock(bucket.getExpiration()); + bucket.flush(reinsert); + bucket = delayQueue.poll(); + } + } finally { + writeLock.unlock(); + } + return true; + } else { + return false; + } + } + + @Override + public int size() { + return taskCounter.get(); + } + + @Override + public void shutdown() { + taskExecutor.shutdown(); + } + +} diff --git a/src/main/java/io/streamnative/kop/utils/timer/Timer.java b/src/main/java/io/streamnative/kop/utils/timer/Timer.java new file mode 100644 index 0000000000..8f17321449 --- /dev/null +++ b/src/main/java/io/streamnative/kop/utils/timer/Timer.java @@ -0,0 +1,51 @@ +/** + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.streamnative.kop.utils.timer; + +/** + * The timer interface to execute delayed operations. + */ +public interface Timer { + + /** + * Add a new task to this executor. It will be executed after the task's delay + * (beginning from the time of submission) + * + * @param timerTask the task to add + */ + void add(TimerTask timerTask); + + /** + * Advance the internal clock, executing any tasks whose expiration has been + * reached within the duration of the passed timeout. + * + * @param timeoutMs + * @return whether or not any tasks were executed + */ + boolean advanceClock(long timeoutMs); + + /** + * Get the number of tasks pending execution. + * + * @return the number of tasks + */ + int size(); + + /** + * Shutdown the timer service, leaving pending tasks unexecuted. + */ + void shutdown(); + + +} diff --git a/src/main/java/io/streamnative/kop/utils/timer/TimerTask.java b/src/main/java/io/streamnative/kop/utils/timer/TimerTask.java new file mode 100644 index 0000000000..0adbc766be --- /dev/null +++ b/src/main/java/io/streamnative/kop/utils/timer/TimerTask.java @@ -0,0 +1,50 @@ +/** + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.streamnative.kop.utils.timer; + +import io.streamnative.kop.utils.timer.TimerTaskList.TimerTaskEntry; + +/** + * Timer task. + */ +public abstract class TimerTask implements Runnable { + + protected final long delayMs; + private TimerTaskEntry timerTaskEntry = null; + + protected TimerTask(long delayMs) { + this.delayMs = delayMs; + } + + public synchronized void cancel() { + if (null != timerTaskEntry) { + timerTaskEntry.remove(); + timerTaskEntry = null; + } + } + + synchronized void setTimerTaskEntry(TimerTaskEntry entry) { + // if this timerTask is already held by an existing timer task entry, + // we will remove such an entry first. + if (null != timerTaskEntry && timerTaskEntry != entry) { + timerTaskEntry.remove(); + } + timerTaskEntry = entry; + } + + synchronized TimerTaskEntry getTimerTaskEntry() { + return timerTaskEntry; + } + +} diff --git a/src/main/java/io/streamnative/kop/utils/timer/TimerTaskList.java b/src/main/java/io/streamnative/kop/utils/timer/TimerTaskList.java new file mode 100644 index 0000000000..1f6451883d --- /dev/null +++ b/src/main/java/io/streamnative/kop/utils/timer/TimerTaskList.java @@ -0,0 +1,209 @@ +/** + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.streamnative.kop.utils.timer; + +import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; +import java.util.concurrent.Delayed; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; +import java.util.function.Consumer; +import javax.annotation.concurrent.ThreadSafe; +import lombok.Getter; +import lombok.experimental.Accessors; +import lombok.extern.slf4j.Slf4j; +import org.apache.kafka.common.utils.Time; + +/** + * The timer task list is a java implementation of Kafka implementation. + */ +@SuppressFBWarnings({ + "EQ_COMPARETO_USE_OBJECT_EQUALS", + "HE_EQUALS_USE_HASHCODE" +}) +@Slf4j +@ThreadSafe +public class TimerTaskList implements Delayed { + + private final AtomicInteger taskCounter; + private final AtomicLong expiration; + + // TimerTaskList forms a doubly linked cyclic list using a dummy root entry + // root.next points to the head + // root.prev points to the tail + private final TimerTaskEntry root; + + public TimerTaskList(AtomicInteger taskCounter) { + this.taskCounter = taskCounter; + this.root = new TimerTaskEntry(null, -1); + this.root.next = root; + this.root.prev = root; + this.expiration = new AtomicLong(-1L); + } + + // Set the bucket's expiration time + // Returns true if the expiration time is changed + public boolean setExpiration(long expirationMs) { + return expiration.getAndSet(expirationMs) != expirationMs; + } + + // Get the bucket's expiration time + public long getExpiration() { + return expiration.get(); + } + + public synchronized void forEach(Consumer f) { + TimerTaskEntry entry = root.next; + while (entry != root) { + final TimerTaskEntry nextEntry = entry.next; + if (!entry.cancelled()) { + f.accept(entry.timerTask); + } + entry = nextEntry; + } + } + + // add a timer task entry to this list + public void add(TimerTaskEntry timerTaskEntry) { + boolean done = false; + while (!done) { + // Remove the timer task entry if it is already in any other list + // We do this outside of the sync block below to avoid deadlocking. + // We may retry until timerTaskEntry.list becomes null. + timerTaskEntry.remove(); + + synchronized (this) { + synchronized (timerTaskEntry) { + if (timerTaskEntry.list == null) { + // put the timer task entry to the end of the list. (root.prev points to the tail entry) + TimerTaskEntry tail = root.prev; + timerTaskEntry.next = root; + timerTaskEntry.prev = tail; + timerTaskEntry.list = this; + tail.next = timerTaskEntry; + root.prev = timerTaskEntry; + taskCounter.incrementAndGet(); + done = true; + } + } + } + } + } + + // Remove the specified timer task entry from this list + public void remove(TimerTaskEntry timerTaskEntry) { + synchronized (this) { + synchronized (timerTaskEntry) { + if (timerTaskEntry.list == this) { + timerTaskEntry.next.prev = timerTaskEntry.prev; + timerTaskEntry.prev.next = timerTaskEntry.next; + timerTaskEntry.next = null; + timerTaskEntry.prev = null; + timerTaskEntry.list = null; + taskCounter.decrementAndGet(); + } + } + } + } + + // Remove all task entries and apply the supplied function to each of them + public synchronized void flush(Consumer f) { + TimerTaskEntry head = root.next; + while (head != root) { + remove(head); + f.accept(head); + head = root.next; + } + expiration.set(-1L); + } + + public long getDelay(TimeUnit unit) { + return unit.convert(Math.max(getExpiration() - Time.SYSTEM.hiResClockMs(), 0), TimeUnit.MILLISECONDS); + } + + @Override + public int compareTo(Delayed o) { + TimerTaskList other = (TimerTaskList) o; + + if (getExpiration() < other.getExpiration()) { + return -1; + } else if (getExpiration() > other.getExpiration()) { + return 1; + } else { + return 0; + } + } + + /** + * A timer task entry in the timer task list. + */ + @Accessors(fluent = true) + protected static class TimerTaskEntry implements Comparable { + + @Getter + private final TimerTask timerTask; + @Getter + private final long expirationMs; + private volatile TimerTaskList list = null; + private TimerTaskEntry next = null; + private TimerTaskEntry prev = null; + + public TimerTaskEntry(TimerTask timerTask, + long expirationMs) { + this.timerTask = timerTask; + this.expirationMs = expirationMs; + // if this timerTask is already held by an existing timer task entry, + // setTimerTaskEntry will remove it. + if (null != timerTask) { + timerTask.setTimerTaskEntry(this); + } + } + + public boolean cancelled() { + return timerTask.getTimerTaskEntry() != this; + } + + public void remove() { + TimerTaskList currentList = list; + // If remove is called when another thread is moving the entry from a task entry list to another, + // this may fail to remove the entry due to the change of value of list. Thus, we retry until the + // list becomes null. In a rare case, this thread sees null and exits the loop, but the other thread + // insert the entry to another list later. + while (currentList != null) { + currentList.remove(this); + currentList = list; + } + } + + @Override + public int compareTo(TimerTaskEntry o) { + return Long.compare(this.expirationMs, o.expirationMs); + } + + @Override + public boolean equals(Object obj) { + if (!(obj instanceof TimerTaskEntry)) { + return false; + } + TimerTaskEntry other = (TimerTaskEntry) obj; + return compareTo(other) == 0 + && list == other.list + && next == other.next + && prev == other.prev + && timerTask == other.timerTask; + } + } + + +} diff --git a/src/main/java/io/streamnative/kop/utils/timer/TimingWheel.java b/src/main/java/io/streamnative/kop/utils/timer/TimingWheel.java new file mode 100644 index 0000000000..051d66544b --- /dev/null +++ b/src/main/java/io/streamnative/kop/utils/timer/TimingWheel.java @@ -0,0 +1,206 @@ +/** + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.streamnative.kop.utils.timer; + +import io.streamnative.kop.utils.timer.TimerTaskList.TimerTaskEntry; +import java.util.List; +import java.util.concurrent.DelayQueue; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.stream.Collectors; +import java.util.stream.IntStream; + +/** + * Hierarchical Timing Wheels + * + *

A simple timing wheel is a circular list of buckets of timer tasks. Let u be the time unit. + * A timing wheel with size n has n buckets and can hold timer tasks in n * u time interval. + * Each bucket holds timer tasks that fall into the corresponding time range. At the beginning, + * the first bucket holds tasks for [0, u), the second bucket holds tasks for [u, 2u), …, + * the n-th bucket for [u * (n -1), u * n). Every interval of time unit u, the timer ticks and + * moved to the next bucket then expire all timer tasks in it. So, the timer never insert a task + * into the bucket for the current time since it is already expired. The timer immediately runs + * the expired task. The emptied bucket is then available for the next round, so if the current + * bucket is for the time t, it becomes the bucket for [t + u * n, t + (n + 1) * u) after a tick. + * A timing wheel has O(1) cost for insert/delete (start-timer/stop-timer) whereas priority queue + * based timers, such as java.util.concurrent.DelayQueue and java.util.Timer, have O(log n) + * insert/delete cost. + * + *

A major drawback of a simple timing wheel is that it assumes that a timer request is within + * the time interval of n * u from the current time. If a timer request is out of this interval, + * it is an overflow. A hierarchical timing wheel deals with such overflows. It is a hierarchically + * organized timing wheels. The lowest level has the finest time resolution. As moving up the + * hierarchy, time resolutions become coarser. If the resolution of a wheel at one level is u and + * the size is n, the resolution of the next level should be n * u. At each level overflows are + * delegated to the wheel in one level higher. When the wheel in the higher level ticks, it reinsert + * timer tasks to the lower level. An overflow wheel can be created on-demand. When a bucket in an + * overflow bucket expires, all tasks in it are reinserted into the timer recursively. The tasks + * are then moved to the finer grain wheels or be executed. The insert (start-timer) cost is O(m) + * where m is the number of wheels, which is usually very small compared to the number of requests + * in the system, and the delete (stop-timer) cost is still O(1). + * + *

Example + * Let's say that u is 1 and n is 3. If the start time is c, + * then the buckets at different levels are: + * + *

+ * level buckets + * 1 [c,c] [c+1,c+1] [c+2,c+2] + * 2 [c,c+2] [c+3,c+5] [c+6,c+8] + * 3 [c,c+8] [c+9,c+17] [c+18,c+26] + *

+ * + *

The bucket expiration is at the time of bucket beginning. + * So at time = c+1, buckets [c,c], [c,c+2] and [c,c+8] are expired. + * Level 1's clock moves to c+1, and [c+3,c+3] is created. + * Level 2 and level3's clock stay at c since their clocks move in unit of 3 and 9, respectively. + * So, no new buckets are created in level 2 and 3. + * + *

Note that bucket [c,c+2] in level 2 won't receive any task since that range is already covered in level 1. + * The same is true for the bucket [c,c+8] in level 3 since its range is covered in level 2. + * This is a bit wasteful, but simplifies the implementation. + * + *

+ * 1 [c+1,c+1] [c+2,c+2] [c+3,c+3] + * 2 [c,c+2] [c+3,c+5] [c+6,c+8] + * 3 [c,c+8] [c+9,c+17] [c+18,c+26] + *

+ * + *

At time = c+2, [c+1,c+1] is newly expired. + * Level 1 moves to c+2, and [c+4,c+4] is created, + * + *

+ * 1 [c+2,c+2] [c+3,c+3] [c+4,c+4] + * 2 [c,c+2] [c+3,c+5] [c+6,c+8] + * 3 [c,c+8] [c+9,c+17] [c+18,c+18] + *

+ * + *

+ * At time = c+3, [c+2,c+2] is newly expired. + * Level 2 moves to c+3, and [c+5,c+5] and [c+9,c+11] are created. + * Level 3 stay at c. + *

+ * + *

+ * 1 [c+3,c+3] [c+4,c+4] [c+5,c+5] + * 2 [c+3,c+5] [c+6,c+8] [c+9,c+11] + * 3 [c,c+8] [c+9,c+17] [c+8,c+11] + *

+ * + *

The hierarchical timing wheels works especially well when operations are completed before they time out. + * Even when everything times out, it still has advantageous when there are many items in the timer. + * Its insert cost (including reinsert) and delete cost are O(m) and O(1), respectively while priority + * queue based timers takes O(log N) for both insert and delete where N is the number of items in the queue. + * + *

This class is not thread-safe. There should not be any add calls while advanceClock is executing. + * It is caller's responsibility to enforce it. Simultaneous add calls are thread-safe. + * + *

Note: this is the implementation from Kafka. + */ +class TimingWheel { + + private final long tickMs; + private final int wheelSize; + private final long startMs; + private final AtomicInteger taskCounter; + private final DelayQueue queue; + + private final long interval; + private final List buckets; + private long currentTime; + + // overflowWheel can potentially be updated and read by two concurrent threads through add(). + // Therefore, it needs to be volatile due to the issue of Double-Checked Locking pattern with JVM + private volatile TimingWheel overflowWheel = null; + + public TimingWheel( + long tickMs, + int wheelSize, + long startMs, + AtomicInteger taskCounter, + DelayQueue queue + ) { + this.tickMs = tickMs; + this.wheelSize = wheelSize; + this.startMs = startMs; + this.taskCounter = taskCounter; + this.queue = queue; + + this.interval = tickMs * wheelSize; + this.buckets = IntStream.range(0, wheelSize) + .mapToObj(i -> new TimerTaskList(taskCounter)) + .collect(Collectors.toList()); + this.currentTime = startMs - (startMs % tickMs); // rounding down to multiple of tickMs + } + + private synchronized void addOverflowWheel() { + if (null == overflowWheel) { + overflowWheel = new TimingWheel( + interval, + wheelSize, + currentTime, + taskCounter, + queue + ); + } + } + + public boolean add(TimerTaskEntry timerTaskEntry) { + final long expiration = timerTaskEntry.expirationMs(); + + if (timerTaskEntry.cancelled()) { + // cancelled + return false; + } else if (expiration < currentTime + tickMs) { + // Already expired + return false; + } else if (expiration < currentTime + interval) { + // Put in its own bucket + final long virtualId = expiration / tickMs; + TimerTaskList bucket = buckets.get( + (int) (virtualId % (long) wheelSize) + ); + bucket.add(timerTaskEntry); + + // Set the bucket expiration time + if (bucket.setExpiration(virtualId * tickMs)) { + // The bucket needs to be enqueued because it was an expired bucket + // We only need to enqueue the bucket when its expiration time has changed, i.e. the wheel has advanced + // and the previous buckets gets reused; further calls to set the expiration within the same wheel cycle + // will pass in the same value and hence return false, thus the bucket with the same expiration will not + // be enqueued multiple times. + queue.offer(bucket); + } + return true; + } else { + // Out of the interval. Put it into the parent timer + if (null == overflowWheel) { + addOverflowWheel(); + } + return overflowWheel.add(timerTaskEntry); + } + } + + // Try to advance the clock + public void advanceClock(long timeMs) { + if (timeMs >= currentTime + tickMs) { + currentTime = timeMs - (timeMs % tickMs); + + // Try to advance the clock of the overflow wheel if present + if (null != overflowWheel) { + overflowWheel.advanceClock(currentTime); + } + } + } + +} diff --git a/src/main/java/io/streamnative/kop/utils/timer/package-info.java b/src/main/java/io/streamnative/kop/utils/timer/package-info.java new file mode 100644 index 0000000000..ae50afe3b5 --- /dev/null +++ b/src/main/java/io/streamnative/kop/utils/timer/package-info.java @@ -0,0 +1,19 @@ +/** + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +/** + * Timer related classes. + * + *

The classes under this package are ported from Kafka. + */ +package io.streamnative.kop.utils.timer; diff --git a/src/test/java/io/streamnative/kop/utils/MockTime.java b/src/test/java/io/streamnative/kop/utils/MockTime.java new file mode 100644 index 0000000000..ffc3086a9a --- /dev/null +++ b/src/test/java/io/streamnative/kop/utils/MockTime.java @@ -0,0 +1,111 @@ +/** + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.streamnative.kop.utils; + +import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; +import org.apache.kafka.common.utils.Time; + +/** + * A clock that you can manually advance by calling sleep. + */ +public class MockTime implements Time { + + /** + * Mock time listener. + */ + interface MockTimeListener { + void tick(); + } + + /** + * Listeners which are waiting for time changes. + */ + private final CopyOnWriteArrayList listeners = new CopyOnWriteArrayList<>(); + + private final long autoTickMs; + + // Values from `nanoTime` and `currentTimeMillis` are not comparable, so we store them separately to allow tests + // using this class to detect bugs where this is incorrectly assumed to be true + private final AtomicLong timeMs; + private final AtomicLong highResTimeNs; + + public MockTime() { + this(0); + } + + public MockTime(long autoTickMs) { + this(autoTickMs, System.currentTimeMillis(), System.nanoTime()); + } + + public MockTime(long autoTickMs, long currentTimeMs, long currentHighResTimeNs) { + this.timeMs = new AtomicLong(currentTimeMs); + this.highResTimeNs = new AtomicLong(currentHighResTimeNs); + this.autoTickMs = autoTickMs; + } + + public void addListener(MockTimeListener listener) { + listeners.add(listener); + } + + @Override + public long milliseconds() { + maybeSleep(autoTickMs); + return timeMs.get(); + } + + @Override + public long nanoseconds() { + maybeSleep(autoTickMs); + return highResTimeNs.get(); + } + + @Override + public long hiResClockMs() { + return TimeUnit.NANOSECONDS.toMillis(nanoseconds()); + } + + private void maybeSleep(long ms) { + if (ms != 0) { + sleep(ms); + } + } + + @Override + public void sleep(long ms) { + timeMs.addAndGet(ms); + highResTimeNs.addAndGet(TimeUnit.MILLISECONDS.toNanos(ms)); + tick(); + } + + public void setCurrentTimeMs(long newMs) { + long oldMs = timeMs.getAndSet(newMs); + + // does not allow to set to an older timestamp + if (oldMs > newMs) { + throw new IllegalArgumentException("Setting the time to " + newMs + " while current time " + + oldMs + " is newer; this is not allowed"); + } + + highResTimeNs.set(TimeUnit.MILLISECONDS.toNanos(newMs)); + tick(); + } + + private void tick() { + for (MockTimeListener listener : listeners) { + listener.tick(); + } + } +} diff --git a/src/test/java/io/streamnative/kop/utils/timer/MockTimer.java b/src/test/java/io/streamnative/kop/utils/timer/MockTimer.java new file mode 100644 index 0000000000..27d6b2c0f5 --- /dev/null +++ b/src/test/java/io/streamnative/kop/utils/timer/MockTimer.java @@ -0,0 +1,85 @@ +/** + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.streamnative.kop.utils.timer; + +import io.streamnative.kop.utils.MockTime; +import io.streamnative.kop.utils.timer.TimerTaskList.TimerTaskEntry; +import java.util.Comparator; +import java.util.PriorityQueue; + +/** + * A mock implementation of {@link Timer}. + */ +public class MockTimer implements Timer { + + private final MockTime time = new MockTime(); + private final PriorityQueue taskQueue = new PriorityQueue<>(Comparator.reverseOrder()); + + @Override + public void add(TimerTask timerTask) { + if (timerTask.delayMs <= 0) { + timerTask.run(); + } else { + synchronized (taskQueue) { + taskQueue.add( + new TimerTaskEntry( + timerTask, timerTask.delayMs + time.milliseconds())); + } + } + } + + @Override + public boolean advanceClock(long timeoutMs) { + time.sleep(timeoutMs); + + boolean executed = false; + final long now = time.milliseconds(); + boolean hasMore = true; + + while (hasMore) { + hasMore = false; + TimerTaskEntry head; + synchronized (taskQueue) { + head = taskQueue.peek(); + if (null != head && now > head.expirationMs()) { + head = taskQueue.poll(); + hasMore = !taskQueue.isEmpty(); + } else { + head = null; + } + } + if (null != head) { + if (!head.cancelled()) { + TimerTask task = head.timerTask(); + task.run(); + executed = true; + } + } + } + + return executed; + } + + @Override + public int size() { + synchronized (taskQueue) { + return taskQueue.size(); + } + } + + @Override + public void shutdown() { + // no-op + } +} diff --git a/src/test/java/io/streamnative/kop/utils/timer/TimerTaskListTest.java b/src/test/java/io/streamnative/kop/utils/timer/TimerTaskListTest.java new file mode 100644 index 0000000000..89a512ad17 --- /dev/null +++ b/src/test/java/io/streamnative/kop/utils/timer/TimerTaskListTest.java @@ -0,0 +1,108 @@ +/** + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.streamnative.kop.utils.timer; + +import static org.junit.Assert.assertEquals; + +import io.streamnative.kop.utils.timer.TimerTaskList.TimerTaskEntry; +import java.util.List; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.stream.Collectors; +import java.util.stream.IntStream; +import org.junit.Test; + +/** + * Unit test {@link TimerTaskList}. + */ +public class TimerTaskListTest { + + /** + * Test task. + */ + private static class TestTask extends TimerTask { + + protected TestTask(long delayMs) { + super(delayMs); + } + + @Override + public void run() { + + } + + } + + private int size(TimerTaskList list) { + AtomicInteger count = new AtomicInteger(0); + list.forEach(ignored -> count.incrementAndGet()); + return count.get(); + } + + @Test + public void testAll() { + AtomicInteger sharedCounter = new AtomicInteger(0); + TimerTaskList list1 = new TimerTaskList(sharedCounter); + TimerTaskList list2 = new TimerTaskList(sharedCounter); + TimerTaskList list3 = new TimerTaskList(sharedCounter); + + List tasks = IntStream.rangeClosed(1, 10).mapToObj(i -> { + TestTask task = new TestTask(0L); + list1.add(new TimerTaskEntry(task, 10L)); + assertEquals(i, sharedCounter.get()); + return task; + }).collect(Collectors.toList()); + + assertEquals(tasks.size(), sharedCounter.get()); + + // reinserting the existing tasks shouldn't change the task count. + tasks.subList(0, 4).forEach(task -> { + int prevCount = sharedCounter.get(); + // new TimerTaskEntry(task) will remove the existing entry from the list + list2.add(new TimerTaskEntry(task, 10L)); + assertEquals(prevCount, sharedCounter.get()); + }); + assertEquals(10 - 4, size(list1)); + assertEquals(4, size(list2)); + assertEquals(tasks.size(), sharedCounter.get()); + + // reinserting the existing tasks shouldn't change the task count + tasks.subList(4, 10).forEach(task -> { + int prevCount = sharedCounter.get(); + // new TimerTaskEntry(task) will remove the existing entry from the list + list3.add(new TimerTaskEntry(task, 10L)); + assertEquals(prevCount, sharedCounter.get()); + }); + assertEquals(0, size(list1)); + assertEquals(4, size(list2)); + assertEquals(6, size(list3)); + assertEquals(tasks.size(), sharedCounter.get()); + + // cancel tasks in the lists + list1.forEach(TimerTask::cancel); + assertEquals(0, size(list1)); + assertEquals(4, size(list2)); + assertEquals(6, size(list3)); + + list2.forEach(TimerTask::cancel); + assertEquals(0, size(list1)); + assertEquals(0, size(list2)); + assertEquals(6, size(list3)); + + list3.forEach(TimerTask::cancel); + assertEquals(0, size(list1)); + assertEquals(0, size(list2)); + assertEquals(0, size(list3)); + } + +} diff --git a/src/test/java/io/streamnative/kop/utils/timer/TimerTest.java b/src/test/java/io/streamnative/kop/utils/timer/TimerTest.java new file mode 100644 index 0000000000..5bbe356d11 --- /dev/null +++ b/src/test/java/io/streamnative/kop/utils/timer/TimerTest.java @@ -0,0 +1,161 @@ +/** + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.streamnative.kop.utils.timer; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.fail; + +import com.google.common.collect.Lists; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.stream.Collectors; +import java.util.stream.IntStream; +import lombok.extern.slf4j.Slf4j; +import org.apache.kafka.common.utils.Time; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +/** + * Unit test {@link Timer}. + */ +@Slf4j +public class TimerTest { + + private static class TestTask extends TimerTask { + private final int id; + private final CountDownLatch latch; + private final List output; + private final AtomicBoolean completed = new AtomicBoolean(false); + + public TestTask(long delayMs, + int id, + CountDownLatch latch, + List output) { + super(delayMs); + this.id = id; + this.latch = latch; + this.output = output; + } + + public void run() { + if (completed.compareAndSet(false, true)) { + synchronized (output) { + output.add(id); + } + latch.countDown(); + } + } + } + + private Timer timer = null; + + @Before + public void setup() { + this.timer = SystemTimer.builder() + .executorName("test") + .tickMs(1) + .wheelSize(3) + .startMs(Time.SYSTEM.hiResClockMs()) + .build(); + } + + @After + public void teardown() { + timer.shutdown(); + } + + @Test + public void testAlreadyExpiredTask() { + final List output = new ArrayList<>(); + final List latches = IntStream.range(-5, 0).mapToObj(i -> { + CountDownLatch latch = new CountDownLatch(1); + timer.add(new TestTask(i, i, latch, output)); + return latch; + }).collect(Collectors.toList()); + + timer.advanceClock(0); + + latches.forEach(latch -> { + try { + assertEquals( + "already expired tasks should run immediately", + true, + latch.await(3, TimeUnit.SECONDS)); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + fail("Should not reach here"); + } + }); + + assertEquals( + "Output of already expired tasks", + Lists.newArrayList(-5, -4, -3, -2, -1), + output + ); + } + + @Test + public void testTaskExpiration() { + final List output = new ArrayList<>(); + final List tasks = new ArrayList<>(); + final List ids = new ArrayList<>(); + final List latches = IntStream.range(0, 5).mapToObj(i -> { + CountDownLatch latch = new CountDownLatch(1); + tasks.add(new TestTask(i, i, latch, output)); + ids.add(i); + return latch; + }).collect(Collectors.toList()); + latches.addAll(IntStream.range(10, 100).mapToObj(i -> { + CountDownLatch latch = new CountDownLatch(1); + tasks.add(new TestTask(i, i, latch, output)); + tasks.add(new TestTask(i, i, latch, output)); + ids.add(i); + ids.add(i); + return latch; + }).collect(Collectors.toList())); + latches.addAll(IntStream.range(100, 500).mapToObj(i -> { + CountDownLatch latch = new CountDownLatch(1); + tasks.add(new TestTask(i, i, latch, output)); + ids.add(i); + return latch; + }).collect(Collectors.toList())); + + // randomly submit requests + tasks.forEach(task -> timer.add(task)); + + while (timer.advanceClock(2000)) {} + + latches.forEach(latch -> { + try { + latch.await(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + fail("Should not reach here"); + } + }); + + Collections.sort(ids); + assertEquals( + "output should match", + ids, + output + ); + } + +}