Skip to content
This repository was archived by the owner on Jan 24, 2024. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
181 changes: 181 additions & 0 deletions src/main/java/io/streamnative/kop/utils/timer/SystemTimer.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,181 @@
/**
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.streamnative.kop.utils.timer;

import com.google.common.util.concurrent.ThreadFactoryBuilder;
import io.streamnative.kop.utils.timer.TimerTaskList.TimerTaskEntry;
import java.util.Objects;
import java.util.concurrent.DelayQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.Consumer;
import javax.annotation.concurrent.ThreadSafe;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.common.utils.Time;

/**
* A system timer implementation.
*/
@Slf4j
@ThreadSafe
public class SystemTimer implements Timer {

/**
* Create a system timer builder.
*
* @return a system timer builder.
*/
public static Builder builder() {
return new Builder();
}

/**
* Builder to build a system timer.
*/
public static class Builder {

private String executorName;
private long tickMs = 1;
private int wheelSize = 20;
private long startMs = Time.SYSTEM.hiResClockMs();

private Builder() {}

public Builder executorName(String executorName) {
this.executorName = executorName;
return this;
}

public Builder tickMs(long tickMs) {
this.tickMs = tickMs;
return this;
}

public Builder wheelSize(int wheelSize) {
this.wheelSize = wheelSize;
return this;
}

public Builder startMs(long startMs) {
this.startMs = startMs;
return this;
}

public SystemTimer build() {
Objects.requireNonNull(executorName, "No executor name is provided");

return new SystemTimer(
executorName,
tickMs,
wheelSize,
startMs
);
}

}

private final ExecutorService taskExecutor;
private final DelayQueue<TimerTaskList> delayQueue;
private final AtomicInteger taskCounter;
private final TimingWheel timingWheel;

// Locks used to protect data structures while ticking
private final ReentrantReadWriteLock readWriteLock;
private final Lock readLock;
private final Lock writeLock;
private final Consumer<TimerTaskEntry> reinsert;

private SystemTimer(String executorName,
long tickMs,
int wheelSize,
long startMs) {
this.taskExecutor = Executors.newFixedThreadPool(
1, new ThreadFactoryBuilder()
.setDaemon(false)
.setNameFormat("system-timer-%d")
.build()
);
this.delayQueue = new DelayQueue();
this.taskCounter = new AtomicInteger(0);
this.timingWheel = new TimingWheel(
tickMs,
wheelSize,
startMs,
taskCounter,
delayQueue
);
this.readWriteLock = new ReentrantReadWriteLock();
this.readLock = readWriteLock.readLock();
this.writeLock = readWriteLock.writeLock();
this.reinsert = timerTaskEntry -> addTimerTaskEntry(timerTaskEntry);
}

@Override
public void add(TimerTask timerTask) {
readLock.lock();
try {
addTimerTaskEntry(new TimerTaskEntry(
timerTask, timerTask.delayMs + Time.SYSTEM.hiResClockMs()
));
} finally {
readLock.unlock();
}
}

private void addTimerTaskEntry(TimerTaskEntry timerTaskEntry) {
if (!timingWheel.add(timerTaskEntry)) {
// Already expired or cancelled
if (!timerTaskEntry.cancelled()) {
taskExecutor.submit(timerTaskEntry.timerTask());
}
}
}

@SneakyThrows
@Override
public boolean advanceClock(long timeoutMs) {
TimerTaskList bucket = delayQueue.poll(timeoutMs, TimeUnit.MILLISECONDS);
if (null != bucket) {
writeLock.lock();
try {
while (null != bucket) {
timingWheel.advanceClock(bucket.getExpiration());
bucket.flush(reinsert);
bucket = delayQueue.poll();
}
} finally {
writeLock.unlock();
}
return true;
} else {
return false;
}
}

@Override
public int size() {
return taskCounter.get();
}

@Override
public void shutdown() {
taskExecutor.shutdown();
}

}
51 changes: 51 additions & 0 deletions src/main/java/io/streamnative/kop/utils/timer/Timer.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
/**
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.streamnative.kop.utils.timer;

/**
* The timer interface to execute delayed operations.
*/
public interface Timer {

/**
* Add a new task to this executor. It will be executed after the task's delay
* (beginning from the time of submission)
*
* @param timerTask the task to add
*/
void add(TimerTask timerTask);

/**
* Advance the internal clock, executing any tasks whose expiration has been
* reached within the duration of the passed timeout.
*
* @param timeoutMs
* @return whether or not any tasks were executed
*/
boolean advanceClock(long timeoutMs);

/**
* Get the number of tasks pending execution.
*
* @return the number of tasks
*/
int size();

/**
* Shutdown the timer service, leaving pending tasks unexecuted.
*/
void shutdown();


}
50 changes: 50 additions & 0 deletions src/main/java/io/streamnative/kop/utils/timer/TimerTask.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
/**
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.streamnative.kop.utils.timer;

import io.streamnative.kop.utils.timer.TimerTaskList.TimerTaskEntry;

/**
* Timer task.
*/
public abstract class TimerTask implements Runnable {

protected final long delayMs;
private TimerTaskEntry timerTaskEntry = null;

protected TimerTask(long delayMs) {
this.delayMs = delayMs;
}

public synchronized void cancel() {
if (null != timerTaskEntry) {
timerTaskEntry.remove();
timerTaskEntry = null;
}
}

synchronized void setTimerTaskEntry(TimerTaskEntry entry) {
// if this timerTask is already held by an existing timer task entry,
// we will remove such an entry first.
if (null != timerTaskEntry && timerTaskEntry != entry) {
timerTaskEntry.remove();
}
timerTaskEntry = entry;
}

synchronized TimerTaskEntry getTimerTaskEntry() {
return timerTaskEntry;
}

}
Loading