45 changes: 35 additions & 10 deletions core/sched.cc
@@ -278,6 +278,10 @@ void cpu::reschedule_from_interrupt(bool called_from_yield,
}
thread* p = thread::current();

if (p->_realtime.has_slice()) {
p->_realtime._run_time += interval;
}

const auto p_status = p->_detached_state->st.load();
assert(p_status != thread::status::queued);

@@ -297,8 +301,18 @@ void cpu::reschedule_from_interrupt(bool called_from_yield,
} else if (!called_from_yield) {
auto &t = *runqueue.begin();
if (p->_realtime._priority > 0) {
// Only switch to a higher-priority realtime thread
if (t._realtime._priority <= p->_realtime._priority) {
// If the threads have the same realtime priority, then only reschedule
// if the currently executed thread has exceeded its time slice (if any).
if (t._realtime._priority == p->_realtime._priority &&
((!p->_realtime.has_slice() || p->_realtime.has_remaining()))) {
Collaborator

So all this means the current thread p keeps running if its _time_slice is 0 (it runs 'forever' until it yields or waits) OR if there is still time left to run per its _time_slice, right?

Contributor Author

[…] should stay running if _time_slice is 0 (run 'forever' until yields or waits) OR there is still time left to run per its _time_slice, right?

Yes. p->_realtime.has_slice() checks whether the thread has a time slice (i.e. _time_slice != 0), and p->_realtime.has_remaining() checks whether there is still time remaining on the slice (if it has one).

Note that even if the thread has exceeded its time slice, it may still be selected to run again if there is no thread with a higher priority. Hence the priority comparison in the if condition.
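
The decision described in this thread can be written as a small standalone predicate. This is only a sketch for discussion: `rt_state` and `keeps_cpu` are hypothetical names that mirror the `has_slice()` / `has_remaining()` helpers from this PR, and it covers only the timer-interrupt path for a running thread with a real-time priority above 0, not the actual `cpu::reschedule_from_interrupt()` code.

```cpp
#include <cassert>
#include <chrono>

// Hypothetical stand-in for the PR's per-thread real-time bookkeeping.
struct rt_state {
    using duration = std::chrono::nanoseconds;
    unsigned priority = 0;
    duration time_slice = duration::zero();
    duration run_time = duration::zero();

    // A slice only matters for real-time threads (priority > 0).
    bool has_slice() const {
        return priority > 0 && time_slice != duration::zero();
    }
    bool has_remaining() const {
        return time_slice > run_time;
    }
};

// Returns true if the running real-time thread `cur` keeps the CPU when
// `next` is at the head of the runqueue (preemption check, not a yield).
inline bool keeps_cpu(const rt_state& cur, const rt_state& next)
{
    assert(cur.priority > 0); // only meaningful for real-time threads
    if (next.priority == cur.priority) {
        // Equal priority: stay unless a slice exists and is used up.
        return !cur.has_slice() || cur.has_remaining();
    }
    // Never give way to a lower priority; always give way to a higher one.
    return next.priority < cur.priority;
}
```

With this shape, a thread whose slice has expired still keeps the CPU when the only other candidate has a lower priority, which is the case the note above points out.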

#ifdef __aarch64__
return switch_data;
#else
return;
#endif
// Otherwise, don't switch to a lower-priority realtime thread,
// no matter how much time slice was used by the running thread.
} else if (t._realtime._priority < p->_realtime._priority) {
#ifdef __aarch64__
return switch_data;
#else
@@ -338,15 +352,27 @@ void cpu::reschedule_from_interrupt(bool called_from_yield,
p->_detached_state->st.store(thread::status::queued);

if (!called_from_yield) {
// POSIX requires that if a real-time thread doesn't yield but
// rather is preempted by a higher-priority thread, it be
// reinserted into the runqueue first, not last, among its equals.
enqueue_first_equal(*p);
if (p->_realtime.has_slice() && !p->_realtime.has_remaining()) {
// The real-time thread has exceeded its time slice, enqueue
// it at the end among its equals and reset the time slice.
enqueue(*p);
p->_realtime.reset_slice();
} else {
// POSIX requires that if a real-time thread doesn't yield but
Collaborator

So this means that if the current thread p's _time_slice is 0 OR p still has some remaining time to run, we will call enqueue_first_equal(). Is this correct?

Contributor

Yes, I think it's correct. If we got here, it means p was preempted. If it still has remaining time, it means it was preempted by a higher-priority realtime thread, but once that higher-priority thread no longer wants to run, this thread p should continue running and continue its current time slice. The documentation says: "A SCHED_RR thread that has been preempted by a higher priority thread and subsequently resumes execution as a running thread will complete the unexpired portion of its round-robin time quantum." It should be the first one in its priority group to run (hence enqueue_first_equal()), just like when no time slices existed.
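
To make the two reinsertion positions concrete, here is a minimal sketch using a plain vector ordered by descending priority. It is an assumption-laden model, not OSv's runqueue: `entry`, `runqueue`, `enqueue_last_equal` and `order` are hypothetical names, and only the relative position among equal-priority entries is meant to match what is discussed above.

```cpp
#include <algorithm>
#include <cassert>
#include <string>
#include <utility>
#include <vector>

// Hypothetical runqueue entry; the real scheduler stores threads.
struct entry {
    std::string name;
    unsigned priority;
};

// Highest priority first; the front is the next thread to run.
using runqueue = std::vector<entry>;

// Slice used up: go behind every entry of equal priority.
inline void enqueue_last_equal(runqueue& rq, entry e)
{
    auto pos = std::find_if(rq.begin(), rq.end(),
        [&](const entry& o) { return o.priority < e.priority; });
    rq.insert(pos, std::move(e));
}

// Preempted with slice time left (or no slice): go ahead of equals.
inline void enqueue_first_equal(runqueue& rq, entry e)
{
    auto pos = std::find_if(rq.begin(), rq.end(),
        [&](const entry& o) { return o.priority <= e.priority; });
    rq.insert(pos, std::move(e));
}

// Helper for inspection: names in run order, separated by spaces.
inline std::string order(const runqueue& rq)
{
    std::string s;
    for (const auto& e : rq) {
        if (!s.empty()) {
            s += ' ';
        }
        s += e.name;
    }
    return s;
}
```

Starting from a queue holding `a` and `b` at priority 1, `enqueue_first_equal` on `p` (also priority 1) gives the order `p a b`, while `enqueue_last_equal` gives `a b p`.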

// rather is preempted by a higher-priority thread, it be
// reinserted into the runqueue first, not last, among its equals.
enqueue_first_equal(*p);
}
}

trace_sched_preempt();
p->stat_preemptions.incr();
} else {
// p is no longer running; if it has a realtime slice, reset it.
if (p->_realtime.has_slice()) {
p->_realtime.reset_slice();
}
Comment on lines +371 to +374
Contributor Author

It is not entirely clear to me if the slice should be reset if the thread is no longer runnable (e.g. because of blocking I/O). POSIX does not explicitly describe when the slice should be reset.

Contributor

Yes, I'm also not sure, but I think this if is right. I think the idea of the time slice is to make sure that a single thread in its priority group never runs for more than 1ms (for example) without letting other threads in its group run. But if the thread blocks or yields voluntarily (I believe this if covers both cases, right?), then it gives some other thread a chance to run, and it too has a chance to run for a whole time slice, so it's only fair that this thread's time slice is reset to zero. I think.
I tried searching to see whether anybody has raised this question, and couldn't find such a discussion.

Contributor Author

Unless I am missing something, Linux only seems to reset the time slice once it expires, not when the thread is no longer runnable. AFAIK this is the only place in the rt scheduler where it is reset: https://github.com/torvalds/linux/blob/v6.16/kernel/sched/rt.c#L2580-L2583
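
The difference between the two reset rules discussed in this thread can be shown with a small accounting model. This is a sketch under assumptions: `slice_account`, `reset_policy` and the method names are hypothetical, `on_block_or_yield` models the behaviour of this PR, and `on_expiry_only` models the Linux behaviour as it is described in the comment above rather than as verified against the kernel.

```cpp
#include <cassert>
#include <chrono>

using duration = std::chrono::microseconds;

// Hypothetical names for the two reset rules under discussion.
enum class reset_policy { on_block_or_yield, on_expiry_only };

struct slice_account {
    reset_policy policy;
    duration time_slice;
    duration run_time = duration::zero();

    // Charge CPU time to the current slice.
    void ran_for(duration d) { run_time += d; }

    bool expired() const { return run_time >= time_slice; }

    duration remaining() const {
        assert(!expired());
        return time_slice - run_time;
    }

    // The thread blocked or yielded before the slice ran out.
    void blocked_or_yielded() {
        if (policy == reset_policy::on_block_or_yield) {
            run_time = duration::zero();
        }
    }

    // The slice ran out; both rules start a fresh slice here.
    void requeued_after_expiry() {
        assert(expired());
        run_time = duration::zero();
    }
};
```

For a 1000 µs slice, a thread that runs 600 µs and then blocks comes back with a full 1000 µs under `on_block_or_yield`, but with only 400 µs under `on_expiry_only`.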


// p is no longer running, so we'll switch to a different thread.
// Return the runtime p borrowed for hysteresis.
p->_runtime.hysteresis_run_stop();
@@ -394,6 +420,9 @@ void cpu::reschedule_from_interrupt(bool called_from_yield,
} else {
preemption_timer.set_with_irq_disabled(now + preempt_after);
}
} else if (n->_realtime.has_slice()) {
assert(n->_realtime.has_remaining());
preemption_timer.set_with_irq_disabled(now + n->_realtime.remaining());
}

if (app_thread.load(std::memory_order_relaxed) != n->_app) { // don't write into a cache line if it can be avoided
@@ -922,10 +951,6 @@ unsigned thread::realtime_priority() const

void thread::set_realtime_time_slice(thread_realtime::duration time_slice)
{
if (time_slice > 0) {
WARN_ONCE("set_realtime_time_slice() used but real-time time slices"
" not yet supported");
}
_realtime._time_slice = time_slice;
}

27 changes: 27 additions & 0 deletions include/osv/sched.hh
@@ -350,6 +350,31 @@ public:
using duration = thread_runtime::duration;
unsigned _priority = 0;
duration _time_slice = duration::zero();

// Total time this thread ran since starting this slice.
duration _run_time = duration::zero();

void reset_slice() {
_run_time = duration::zero();
}

bool has_slice() const {
// If the priority is 0 we don't schedule the thread according
// to SCHED_RR and therefore also don't respect the time slice.
if (_priority <= 0) {
return false;
}

return _time_slice != duration::zero();
}

bool has_remaining() const {
return _time_slice > _run_time;
}

duration remaining() const {
return _time_slice - _run_time;
}
};

// "tau" controls the length of the history we consider for scheduling,
@@ -694,6 +719,8 @@ public:
* With time_slice == 0, the real-time scheduling policy matches POSIX's
* "SCHED_FIFO" policy. With time_slice > 0, it matches POSIX's "SCHED_RR"
* policy.
*
* Note: The time_slice should be set before the thread is started.
*/
void set_realtime_time_slice(thread_realtime::duration time_slice);
/**
3 changes: 2 additions & 1 deletion modules/tests/Makefile
@@ -158,7 +158,8 @@ tests := tst-pthread.so misc-ramdisk.so tst-vblk.so tst-bsd-evh.so \
tst-netlink.so misc-zfs-io.so misc-zfs-arc.so tst-pthread-create.so \
misc-futex-perf.so misc-syscall-perf.so tst-brk.so tst-reloc.so \
misc-vdso-perf.so tst-string-utils.so tst-elf-circular-reloc.so \
lib-circular-reloc1.so lib-circular-reloc2.so tst-rwlock.so
lib-circular-reloc1.so lib-circular-reloc2.so tst-rwlock.so \
tst-thread-realtime.so
# tst-f128.so \


161 changes: 161 additions & 0 deletions tests/tst-thread-realtime.cc
@@ -0,0 +1,161 @@
#include <atomic>
#include <chrono>
#include <iostream>
#include <string>
#include <vector>

#include <osv/elf.hh>
#include <osv/sched.hh>

static std::string name = "tst-thr-wrk";
static int tests = 0, fails = 0;

static void report(bool ok, std::string msg)
{
++tests;
fails += !ok;
std::cout << (ok ? "PASS" : "FAIL") << ": " << msg << "\n";
}

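// Busy work for the worker threads. Despite its name, this computes
// the n-th Fibonacci number recursively.
static int fac(int n)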
{
if (n == 0) {
return 0;
} else if (n == 1) {
return 1;
} else {
return fac(n - 1) + fac(n - 2);
}
}

static bool preempts_equalized()
{
constexpr int slice = 250;
constexpr int num_threads = 5;
constexpr int preempts = 20;
std::atomic<bool> stop_threads(false);
sched::thread *threads[num_threads];

sched::cpu *c = sched::cpu::current();
for (int i = 0; i < num_threads; i++) {
threads[i] = sched::thread::make([&]{
while (!stop_threads.load()) {
fac(10);
}
}, sched::thread::attr().name(name));

threads[i]->pin(c);
threads[i]->set_realtime_priority(1);
threads[i]->set_realtime_time_slice(std::chrono::milliseconds(slice));
threads[i]->start();
}

auto runtime = std::chrono::milliseconds((slice * num_threads) * preempts);
sched::thread::sleep(runtime);
stop_threads = true;

std::vector<long> num_preempts(num_threads);
for (int i = 0; i < num_threads; i++) {
num_preempts[i] = threads[i]->stat_preemptions.get();
}

bool ok = true;

// Fuzzy comparison with `num_preempts` which allows for some
// divergence from the expected number of preempts to account
// for scheduler overhead.
constexpr int slack = preempts * 0.10; // 10% slack
for (auto n : num_preempts) {
if (n < preempts - slack || n > preempts + slack) {
ok = false;
break;
}
}

for (int i = 0; i < num_threads; i++) {
delete threads[i];
}

return ok;
}

static bool priority_precedence()
{
constexpr int slice = 100000000;

std::atomic<bool> high_prio_stop(false);
std::atomic<bool> high_prio_yielded(false);
std::atomic<bool> low_prio_enqueued(false);

sched::thread *main_thread = sched::thread::current();
sched::thread *high_prio = sched::thread::make([&]{
while (!high_prio_stop.load()) {
fac(10);

if (low_prio_enqueued && !high_prio_yielded) {
sched::thread::yield();
main_thread->wake_with([&] { high_prio_yielded.store(true); });
}
}
});

static std::atomic<bool> low_prio_stop;
sched::thread *low_prio = sched::thread::make([&]{
while (!low_prio_stop.load()) {
fac(10);
}
});

sched::cpu *c = sched::cpu::current();
high_prio->pin(c);
low_prio->pin(c);

// If these are commented out, the test will reliably fail.
high_prio->set_realtime_priority(2);
low_prio->set_realtime_priority(1);

// The higher priority thread has a time slice, but since there is no other thread
// with the same priority, it should monopolize the CPU and low_prio shouldn't run.
high_prio->set_realtime_time_slice(sched::thread_realtime::duration(slice));

high_prio->start();
low_prio->start();

// Ensure that the high priority thread yields once while the low priority thread
// is also in the runqueue. Without real-time scheduling the low priority thread
// should then get the CPU.
while (low_prio->get_status() != sched::thread::status::queued) {
// busy wait until low_prio is enqueued.
}
low_prio_enqueued.store(true);
sched::thread::wait_until([&] { return high_prio_yielded.load(); });

auto runtime = std::chrono::nanoseconds(slice * 3);
sched::thread::sleep(runtime);

// Since both threads are pinned to the CPU and the higher priority
// thread is always runnable, the lower priority thread should starve.
bool ok = high_prio->thread_clock().count() > 0 &&
low_prio->thread_clock().count() == 0;

low_prio_stop = true;
high_prio_stop = true;

delete high_prio;
delete low_prio;

return ok;
}

int main(int ac, char** av)
{
// Ensure that the main thread doesn't starve.
sched::thread *cur = sched::thread::current();
cur->set_realtime_priority(10);

report(priority_precedence(), "priority_precedence");
report(preempts_equalized(), "preempts_equalized");
std::cout << "SUMMARY: " << tests << " tests, " << fails << " failures\n";
}

// Because we're using wake_with() and wait_until(), this executable must not
// cause sleepable lazy symbol resolutions or on-demand paging:
OSV_ELF_MLOCK_OBJECT();
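
The expectation behind preempts_equalized() can be sanity-checked with an idealized model: if every thread is always runnable and each slice is fully consumed, a total runtime of slice * num_threads * preempts should yield exactly `preempts` preemptions per thread. The sketch below assumes zero scheduler overhead and no other threads on the CPU; `simulate_round_robin` and `within_slack` are hypothetical helpers, not part of the test. `within_slack` is a range check of the kind the test's "fuzzy comparison" comment describes.

```cpp
#include <cassert>
#include <deque>
#include <vector>

// Idealized round-robin: returns how often each thread was preempted.
// Assumes slice_ms > 0 and that all threads are always runnable.
inline std::vector<int> simulate_round_robin(int num_threads, int slice_ms,
                                             int total_ms)
{
    assert(num_threads > 0 && slice_ms > 0);
    std::vector<int> preemptions(num_threads, 0);
    std::deque<int> runqueue;
    for (int i = 0; i < num_threads; ++i) {
        runqueue.push_back(i);
    }
    for (int elapsed = 0; elapsed + slice_ms <= total_ms; elapsed += slice_ms) {
        int running = runqueue.front();
        runqueue.pop_front();
        // The slice expires: the thread is preempted and requeued last.
        ++preemptions[running];
        runqueue.push_back(running);
    }
    return preemptions;
}

// True if observed lies in [expected - slack, expected + slack].
inline bool within_slack(int observed, int expected, int slack)
{
    return observed >= expected - slack && observed <= expected + slack;
}
```

With the test's constants (5 threads, 250 ms slices, 25000 ms total) the model gives 20 preemptions per thread; a 10% slack then accepts any observed count from 18 to 22.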