From ed1bbe3e1b6ebd9e5afbcc66aa0ebcd1cf7f18f0 Mon Sep 17 00:00:00 2001 From: Eric Holk Date: Sat, 23 Jul 2011 14:01:43 -0700 Subject: [PATCH 01/11] Made root_task no longer special. --- src/rt/rust.cpp | 7 ++++--- src/rt/rust_kernel.cpp | 5 +++++ src/rt/rust_kernel.h | 2 ++ src/rt/rust_scheduler.cpp | 3 --- src/rt/rust_scheduler.h | 2 -- src/rt/rust_task.cpp | 10 ++++------ src/rt/rust_task.h | 3 --- src/rt/test/rust_test_runtime.cpp | 6 +++--- 8 files changed, 18 insertions(+), 20 deletions(-) diff --git a/src/rt/rust.cpp b/src/rt/rust.cpp index 029532363c8ef..06097e3419776 100644 --- a/src/rt/rust.cpp +++ b/src/rt/rust.cpp @@ -144,10 +144,11 @@ rust_start(uintptr_t main_fn, int argc, char **argv, void* crate_map) { rust_srv *srv = new rust_srv(); rust_kernel *kernel = new rust_kernel(srv); kernel->start(); - rust_scheduler *sched = kernel->get_scheduler(); + rust_task *root_task = kernel->create_task(NULL, "main"); + rust_scheduler *sched = root_task->sched; command_line_args *args = new (kernel, "main command line args") - command_line_args(sched->root_task, argc, argv); + command_line_args(root_task, argc, argv); DLOG(sched, dom, "startup: %d args in 0x%" PRIxPTR, args->argc, (uintptr_t)args->args); @@ -155,7 +156,7 @@ rust_start(uintptr_t main_fn, int argc, char **argv, void* crate_map) { DLOG(sched, dom, "startup: arg[%d] = '%s'", i, args->argv[i]); } - sched->root_task->start(main_fn, (uintptr_t)args->args); + root_task->start(main_fn, (uintptr_t)args->args); int num_threads = get_num_threads(); diff --git a/src/rt/rust_kernel.cpp b/src/rt/rust_kernel.cpp index d15d52ba431cd..53c2d945b09c8 100644 --- a/src/rt/rust_kernel.cpp +++ b/src/rt/rust_kernel.cpp @@ -261,6 +261,11 @@ int rust_kernel::start_task_threads(int num_threads) return sched->rval; } +rust_task * +rust_kernel::create_task(rust_task *spawner, const char *name) { + return sched->create_task(spawner, name); +} + #ifdef __WIN32__ void rust_kernel::win32_require(LPCTSTR fn, BOOL ok) { diff --git a/src/rt/rust_kernel.h b/src/rt/rust_kernel.h index 6edb0f38dd5f9..07f4ff2f78765 100644 --- a/src/rt/rust_kernel.h +++ b/src/rt/rust_kernel.h @@ -121,6 +121,8 @@ class rust_kernel : public rust_thread { #ifdef __WIN32__ void win32_require(LPCTSTR fn, BOOL ok); #endif + + rust_task *create_task(rust_task *spawner, const char *name); }; class rust_task_thread : public rust_thread { diff --git a/src/rt/rust_scheduler.cpp b/src/rt/rust_scheduler.cpp index 18ac66beb5ee3..09a78cebddbca 100644 --- a/src/rt/rust_scheduler.cpp +++ b/src/rt/rust_scheduler.cpp @@ -16,8 +16,6 @@ rust_scheduler::rust_scheduler(rust_kernel *kernel, blocked_tasks(this, "blocked"), dead_tasks(this, "dead"), cache(this), - root_task(NULL), - curr_task(NULL), rval(0), kernel(kernel), message_queue(message_queue) @@ -29,7 +27,6 @@ rust_scheduler::rust_scheduler(rust_kernel *kernel, pthread_attr_setstacksize(&attr, 1024 * 1024); pthread_attr_setdetachstate(&attr, true); #endif - root_task = create_task(NULL, name); } rust_scheduler::~rust_scheduler() { diff --git a/src/rt/rust_scheduler.h b/src/rt/rust_scheduler.h index b1b9d51db5682..cabcdf210a89f 100644 --- a/src/rt/rust_scheduler.h +++ b/src/rt/rust_scheduler.h @@ -46,8 +46,6 @@ struct rust_scheduler : public kernel_owned, rust_crate_cache cache; randctx rctx; - rust_task *root_task; - rust_task *curr_task; int rval; rust_kernel *kernel; diff --git a/src/rt/rust_task.cpp b/src/rt/rust_task.cpp index af55208582e69..de6b00acb3fd2 100644 --- a/src/rt/rust_task.cpp +++ b/src/rt/rust_task.cpp @@ -103,8 +103,8 @@ rust_task::~rust_task() /* FIXME: tighten this up, there are some more assertions that hold at task-lifecycle events. */ - I(sched, ref_count == 0 || - (ref_count == 1 && this == sched->root_task)); + // I(sched, ref_count == 0 || + // (ref_count == 1 && this == sched->root_task)); del_stk(this, stk); } @@ -207,8 +207,8 @@ rust_task::kill() { // Unblock the task so it can unwind. unblock(); - if (this == sched->root_task) - sched->fail(); + // if (this == sched->root_task) + // sched->fail(); LOG(this, task, "preparing to unwind task: 0x%" PRIxPTR, this); // run_on_resume(rust_unwind_glue); @@ -229,8 +229,6 @@ rust_task::fail() { supervisor->kill(); } // FIXME: implement unwinding again. - if (this == sched->root_task) - sched->fail(); failed = true; } diff --git a/src/rt/rust_task.h b/src/rt/rust_task.h index 13b5537d5d9bb..b1984b9d40b82 100644 --- a/src/rt/rust_task.h +++ b/src/rt/rust_task.h @@ -123,9 +123,6 @@ rust_task : public maybe_proxy, void die(); void unblock(); - void check_active() { I(sched, sched->curr_task == this); } - void check_suspended() { I(sched, sched->curr_task != this); } - // Print a backtrace, if the "bt" logging option is on. void backtrace(); diff --git a/src/rt/test/rust_test_runtime.cpp b/src/rt/test/rust_test_runtime.cpp index 8acfe45c9e651..f9a99d9acb1ea 100644 --- a/src/rt/test/rust_test_runtime.cpp +++ b/src/rt/test/rust_test_runtime.cpp @@ -45,9 +45,9 @@ void task_entry() { void rust_task_test::worker::run() { - rust_scheduler *scheduler = kernel->get_scheduler(); - scheduler->root_task->start((uintptr_t)&task_entry, (uintptr_t)NULL); - scheduler->start_main_loop(0); + rust_task *root_task = kernel->create_task(NULL, "main"); + root_task->start((uintptr_t)&task_entry, (uintptr_t)NULL); + root_task->sched->start_main_loop(0); } bool From 816d10a1c01e52b1378adcbd172532bbfb0d1a54 Mon Sep 17 00:00:00 2001 From: Eric Holk Date: Sat, 23 Jul 2011 19:03:02 -0700 Subject: [PATCH 02/11] Per-thread scheduling. Closes #682. Tasks are spawned on a random thread. Currently they stay there, but we should add task migration and load balancing in the future. This should drammatically improve our task performance benchmarks. --- src/rt/circular_buffer.cpp | 84 +++++++++++++++--------------- src/rt/rust.cpp | 7 ++- src/rt/rust_chan.cpp | 50 +++++++++--------- src/rt/rust_kernel.cpp | 85 +++++++++++++++---------------- src/rt/rust_kernel.h | 42 ++++++++------- src/rt/rust_scheduler.cpp | 56 +++++++++++--------- src/rt/rust_scheduler.h | 16 ++++-- src/rt/rust_task.cpp | 14 +++-- src/rt/rust_task.h | 1 + src/rt/rust_upcall.cpp | 10 ++-- src/rt/rust_util.h | 3 +- src/rt/sync/sync.h | 2 +- src/rt/test/rust_test_runtime.cpp | 13 +++-- src/test/run-pass/lib-task.rs | 41 +++++++++++++++ 14 files changed, 239 insertions(+), 185 deletions(-) create mode 100644 src/test/run-pass/lib-task.rs diff --git a/src/rt/circular_buffer.cpp b/src/rt/circular_buffer.cpp index b645a08e5638e..aa0127d8c255b 100644 --- a/src/rt/circular_buffer.cpp +++ b/src/rt/circular_buffer.cpp @@ -5,7 +5,6 @@ #include "rust_internal.h" circular_buffer::circular_buffer(rust_kernel *kernel, size_t unit_sz) : - sched(kernel->sched), kernel(kernel), unit_sz(unit_sz), _buffer_sz(initial_size()), @@ -13,26 +12,26 @@ circular_buffer::circular_buffer(rust_kernel *kernel, size_t unit_sz) : _unread(0), _buffer((uint8_t *)kernel->malloc(_buffer_sz, "circular_buffer")) { - A(sched, unit_sz, "Unit size must be larger than zero."); + // A(sched, unit_sz, "Unit size must be larger than zero."); - DLOG(sched, mem, "new circular_buffer(buffer_sz=%d, unread=%d)" - "-> circular_buffer=0x%" PRIxPTR, - _buffer_sz, _unread, this); + // DLOG(sched, mem, "new circular_buffer(buffer_sz=%d, unread=%d)" + // "-> circular_buffer=0x%" PRIxPTR, + // _buffer_sz, _unread, this); - A(sched, _buffer, "Failed to allocate buffer."); + // A(sched, _buffer, "Failed to allocate buffer."); } circular_buffer::~circular_buffer() { - DLOG(sched, mem, "~circular_buffer 0x%" PRIxPTR, this); - I(sched, _buffer); - W(sched, _unread == 0, - "freeing circular_buffer with %d unread bytes", _unread); + // DLOG(sched, mem, "~circular_buffer 0x%" PRIxPTR, this); + // I(sched, _buffer); + // W(sched, _unread == 0, + // "freeing circular_buffer with %d unread bytes", _unread); kernel->free(_buffer); } size_t circular_buffer::initial_size() { - I(sched, unit_sz > 0); + // I(sched, unit_sz > 0); return INITIAL_CIRCULAR_BUFFER_SIZE_IN_UNITS * unit_sz; } @@ -41,8 +40,8 @@ circular_buffer::initial_size() { */ void circular_buffer::transfer(void *dst) { - I(sched, dst); - I(sched, _unread <= _buffer_sz); + // I(sched, dst); + // I(sched, _unread <= _buffer_sz); uint8_t *ptr = (uint8_t *) dst; @@ -54,13 +53,13 @@ circular_buffer::transfer(void *dst) { } else { head_sz = _buffer_sz - _next; } - I(sched, _next + head_sz <= _buffer_sz); + // I(sched, _next + head_sz <= _buffer_sz); memcpy(ptr, _buffer + _next, head_sz); // Then copy any other items from the beginning of the buffer - I(sched, _unread >= head_sz); + // I(sched, _unread >= head_sz); size_t tail_sz = _unread - head_sz; - I(sched, head_sz + tail_sz <= _buffer_sz); + // I(sched, head_sz + tail_sz <= _buffer_sz); memcpy(ptr + head_sz, _buffer, tail_sz); } @@ -70,21 +69,21 @@ circular_buffer::transfer(void *dst) { */ void circular_buffer::enqueue(void *src) { - I(sched, src); - I(sched, _unread <= _buffer_sz); - I(sched, _buffer); + // I(sched, src); + // I(sched, _unread <= _buffer_sz); + // I(sched, _buffer); // Grow if necessary. if (_unread == _buffer_sz) { grow(); } - DLOG(sched, mem, "circular_buffer enqueue " - "unread: %d, next: %d, buffer_sz: %d, unit_sz: %d", - _unread, _next, _buffer_sz, unit_sz); + // DLOG(sched, mem, "circular_buffer enqueue " + // "unread: %d, next: %d, buffer_sz: %d, unit_sz: %d", + // _unread, _next, _buffer_sz, unit_sz); - I(sched, _unread < _buffer_sz); - I(sched, _unread + unit_sz <= _buffer_sz); + // I(sched, _unread < _buffer_sz); + // I(sched, _unread + unit_sz <= _buffer_sz); // Copy data size_t dst_idx = _next + _unread; @@ -92,15 +91,15 @@ circular_buffer::enqueue(void *src) { if (dst_idx >= _buffer_sz) { dst_idx -= _buffer_sz; - I(sched, _next >= unit_sz); - I(sched, dst_idx <= _next - unit_sz); + // I(sched, _next >= unit_sz); + // I(sched, dst_idx <= _next - unit_sz); } - I(sched, dst_idx + unit_sz <= _buffer_sz); + // I(sched, dst_idx + unit_sz <= _buffer_sz); memcpy(&_buffer[dst_idx], src, unit_sz); _unread += unit_sz; - DLOG(sched, mem, "circular_buffer pushed data at index: %d", dst_idx); + // DLOG(sched, mem, "circular_buffer pushed data at index: %d", dst_idx); } /** @@ -110,17 +109,17 @@ circular_buffer::enqueue(void *src) { */ void circular_buffer::dequeue(void *dst) { - I(sched, unit_sz > 0); - I(sched, _unread >= unit_sz); - I(sched, _unread <= _buffer_sz); - I(sched, _buffer); + // I(sched, unit_sz > 0); + // I(sched, _unread >= unit_sz); + // I(sched, _unread <= _buffer_sz); + // I(sched, _buffer); - DLOG(sched, mem, - "circular_buffer dequeue " - "unread: %d, next: %d, buffer_sz: %d, unit_sz: %d", - _unread, _next, _buffer_sz, unit_sz); + // DLOG(sched, mem, + // "circular_buffer dequeue " + // "unread: %d, next: %d, buffer_sz: %d, unit_sz: %d", + // _unread, _next, _buffer_sz, unit_sz); - I(sched, _next + unit_sz <= _buffer_sz); + // I(sched, _next + unit_sz <= _buffer_sz); if (dst != NULL) { memcpy(dst, &_buffer[_next], unit_sz); } @@ -140,8 +139,9 @@ circular_buffer::dequeue(void *dst) { void circular_buffer::grow() { size_t new_buffer_sz = _buffer_sz * 2; - I(sched, new_buffer_sz <= MAX_CIRCULAR_BUFFER_SIZE); - DLOG(sched, mem, "circular_buffer is growing to %d bytes", new_buffer_sz); + // I(sched, new_buffer_sz <= MAX_CIRCULAR_BUFFER_SIZE); + // DLOG(sched, mem, "circular_buffer is growing to %d bytes", + // new_buffer_sz); void *new_buffer = kernel->malloc(new_buffer_sz, "new circular_buffer (grow)"); transfer(new_buffer); @@ -154,9 +154,9 @@ circular_buffer::grow() { void circular_buffer::shrink() { size_t new_buffer_sz = _buffer_sz / 2; - I(sched, initial_size() <= new_buffer_sz); - DLOG(sched, mem, "circular_buffer is shrinking to %d bytes", - new_buffer_sz); + // I(sched, initial_size() <= new_buffer_sz); + // DLOG(sched, mem, "circular_buffer is shrinking to %d bytes", + // new_buffer_sz); void *new_buffer = kernel->malloc(new_buffer_sz, "new circular_buffer (shrink)"); transfer(new_buffer); diff --git a/src/rt/rust.cpp b/src/rt/rust.cpp index 06097e3419776..df1486952ebf3 100644 --- a/src/rt/rust.cpp +++ b/src/rt/rust.cpp @@ -140,9 +140,10 @@ rust_start(uintptr_t main_fn, int argc, char **argv, void* crate_map) { update_log_settings(crate_map, getenv("RUST_LOG")); enable_claims(getenv("CHECK_CLAIMS")); + int num_threads = get_num_threads(); rust_srv *srv = new rust_srv(); - rust_kernel *kernel = new rust_kernel(srv); + rust_kernel *kernel = new rust_kernel(srv, num_threads); kernel->start(); rust_task *root_task = kernel->create_task(NULL, "main"); rust_scheduler *sched = root_task->sched; @@ -158,11 +159,9 @@ rust_start(uintptr_t main_fn, int argc, char **argv, void* crate_map) { root_task->start(main_fn, (uintptr_t)args->args); - int num_threads = get_num_threads(); - DLOG(sched, dom, "Using %d worker threads.", num_threads); - int ret = kernel->start_task_threads(num_threads); + int ret = kernel->start_task_threads(); delete args; delete kernel; delete srv; diff --git a/src/rt/rust_chan.cpp b/src/rt/rust_chan.cpp index 470be6e6a37a8..9253d7d036101 100644 --- a/src/rt/rust_chan.cpp +++ b/src/rt/rust_chan.cpp @@ -13,17 +13,17 @@ rust_chan::rust_chan(rust_kernel *kernel, maybe_proxy *port, if (port) { associate(port); } - DLOG(kernel->sched, comm, "new rust_chan(task=0x%" PRIxPTR - ", port=0x%" PRIxPTR ") -> chan=0x%" PRIxPTR, - (uintptr_t) task, (uintptr_t) port, (uintptr_t) this); + // DLOG(task->sched, comm, "new rust_chan(task=0x%" PRIxPTR + // ", port=0x%" PRIxPTR ") -> chan=0x%" PRIxPTR, + // (uintptr_t) task, (uintptr_t) port, (uintptr_t) this); } rust_chan::~rust_chan() { - DLOG(kernel->sched, comm, "del rust_chan(task=0x%" PRIxPTR ")", - (uintptr_t) this); + // DLOG(kernel->sched, comm, "del rust_chan(task=0x%" PRIxPTR ")", + // (uintptr_t) this); - A(kernel->sched, is_associated() == false, - "Channel must be disassociated before being freed."); + // A(kernel->sched, is_associated() == false, + // "Channel must be disassociated before being freed."); } /** @@ -33,9 +33,9 @@ void rust_chan::associate(maybe_proxy *port) { this->port = port; if (port->is_proxy() == false) { scoped_lock with(port->referent()->lock); - DLOG(kernel->sched, task, - "associating chan: 0x%" PRIxPTR " with port: 0x%" PRIxPTR, - this, port); + // DLOG(kernel->sched, task, + // "associating chan: 0x%" PRIxPTR " with port: 0x%" PRIxPTR, + // this, port); ++this->ref_count; this->task = port->referent()->task; this->task->ref(); @@ -51,14 +51,14 @@ bool rust_chan::is_associated() { * Unlink this channel from its associated port. */ void rust_chan::disassociate() { - A(kernel->sched, is_associated(), - "Channel must be associated with a port."); + // A(kernel->sched, is_associated(), + // "Channel must be associated with a port."); if (port->is_proxy() == false) { scoped_lock with(port->referent()->lock); - DLOG(kernel->sched, task, - "disassociating chan: 0x%" PRIxPTR " from port: 0x%" PRIxPTR, - this, port->referent()); + // DLOG(kernel->sched, task, + // "disassociating chan: 0x%" PRIxPTR " from port: 0x%" PRIxPTR, + // this, port->referent()); --this->ref_count; --this->task->ref_count; this->task = NULL; @@ -73,8 +73,8 @@ void rust_chan::disassociate() { * Attempt to send data to the associated port. */ void rust_chan::send(void *sptr) { - rust_scheduler *sched = kernel->sched; - I(sched, !port->is_proxy()); + // rust_scheduler *sched = kernel->sched; + // I(sched, !port->is_proxy()); rust_port *target_port = port->referent(); // TODO: We can probably avoid this lock by using atomic operations in @@ -84,13 +84,13 @@ void rust_chan::send(void *sptr) { buffer.enqueue(sptr); if (!is_associated()) { - W(sched, is_associated(), - "rust_chan::transmit with no associated port."); + // W(sched, is_associated(), + // "rust_chan::transmit with no associated port."); return; } - A(sched, !buffer.is_empty(), - "rust_chan::transmit with nothing to send."); + // A(sched, !buffer.is_empty(), + // "rust_chan::transmit with nothing to send."); if (port->is_proxy()) { data_message::send(buffer.peek(), buffer.unit_sz, "send data", @@ -98,7 +98,7 @@ void rust_chan::send(void *sptr) { buffer.dequeue(NULL); } else { if (target_port->task->blocked_on(target_port)) { - DLOG(sched, comm, "dequeued in rendezvous_ptr"); + // DLOG(sched, comm, "dequeued in rendezvous_ptr"); buffer.dequeue(target_port->task->rendezvous_ptr); target_port->task->rendezvous_ptr = 0; target_port->task->wakeup(target_port); @@ -120,7 +120,7 @@ rust_chan *rust_chan::clone(maybe_proxy *target) { rust_handle *handle = task->sched->kernel->get_port_handle(port->as_referent()); maybe_proxy *proxy = new rust_proxy (handle); - DLOG(kernel->sched, mem, "new proxy: " PTR, proxy); + DLOG(task->sched, mem, "new proxy: " PTR, proxy); port = proxy; target_task = target->as_proxy()->handle()->referent(); } @@ -133,8 +133,8 @@ rust_chan *rust_chan::clone(maybe_proxy *target) { * appear to be live, causing modify-after-free errors. */ void rust_chan::destroy() { - A(kernel->sched, ref_count == 0, - "Channel's ref count should be zero."); + // A(kernel->sched, ref_count == 0, + // "Channel's ref count should be zero."); if (is_associated()) { if (port->is_proxy()) { diff --git a/src/rt/rust_kernel.cpp b/src/rt/rust_kernel.cpp index 53c2d945b09c8..1eb826027988c 100644 --- a/src/rt/rust_kernel.cpp +++ b/src/rt/rust_kernel.cpp @@ -7,36 +7,40 @@ } \ } while (0) -rust_kernel::rust_kernel(rust_srv *srv) : +rust_kernel::rust_kernel(rust_srv *srv, size_t num_threads) : _region(srv, true), _log(srv, NULL), - _srv(srv), - _interrupt_kernel_loop(FALSE) + srv(srv), + _interrupt_kernel_loop(FALSE), + num_threads(num_threads), + rval(0), + live_tasks(0) { - sched = create_scheduler("main"); + isaac_init(this, &rctx); + create_schedulers(); } rust_scheduler * -rust_kernel::create_scheduler(const char *name) { +rust_kernel::create_scheduler(int id) { _kernel_lock.lock(); rust_message_queue *message_queue = - new (this, "rust_message_queue") rust_message_queue(_srv, this); - rust_srv *srv = _srv->clone(); + new (this, "rust_message_queue") rust_message_queue(srv, this); + rust_srv *srv = this->srv->clone(); rust_scheduler *sched = new (this, "rust_scheduler") - rust_scheduler(this, message_queue, srv, name); + rust_scheduler(this, message_queue, srv, id); rust_handle *handle = internal_get_sched_handle(sched); message_queue->associate(handle); message_queues.append(message_queue); - KLOG("created scheduler: " PTR ", name: %s, index: %d", - sched, name, sched->list_index); + KLOG("created scheduler: " PTR ", id: %d, index: %d", + sched, id, sched->list_index); _kernel_lock.signal_all(); _kernel_lock.unlock(); return sched; } void -rust_kernel::destroy_scheduler() { +rust_kernel::destroy_scheduler(rust_scheduler *sched) { _kernel_lock.lock(); KLOG("deleting scheduler: " PTR ", name: %s, index: %d", sched, sched->name, sched->list_index); @@ -48,6 +52,18 @@ rust_kernel::destroy_scheduler() { _kernel_lock.unlock(); } +void rust_kernel::create_schedulers() { + for(int i = 0; i < num_threads; ++i) { + threads.push(create_scheduler(i)); + } +} + +void rust_kernel::destroy_schedulers() { + for(int i = 0; i < num_threads; ++i) { + destroy_scheduler(threads[i]); + } +} + rust_handle * rust_kernel::internal_get_sched_handle(rust_scheduler *sched) { rust_handle *handle = NULL; @@ -59,14 +75,6 @@ rust_kernel::internal_get_sched_handle(rust_scheduler *sched) { return handle; } -rust_handle * -rust_kernel::get_sched_handle(rust_scheduler *sched) { - _kernel_lock.lock(); - rust_handle *handle = internal_get_sched_handle(sched); - _kernel_lock.unlock(); - return handle; -} - rust_handle * rust_kernel::get_task_handle(rust_task *task) { _kernel_lock.lock(); @@ -98,7 +106,9 @@ rust_kernel::get_port_handle(rust_port *port) { void rust_kernel::log_all_scheduler_state() { - sched->log_state(); + for(int i = 0; i < num_threads; ++i) { + threads[i]->log_state(); + } } /** @@ -170,7 +180,7 @@ rust_kernel::terminate_kernel_loop() { } rust_kernel::~rust_kernel() { - destroy_scheduler(); + destroy_schedulers(); terminate_kernel_loop(); @@ -193,7 +203,7 @@ rust_kernel::~rust_kernel() { rust_message_queue *queue = NULL; while (message_queues.pop(&queue)) { - K(_srv, queue->is_empty(), "Kernel message queue should be empty " + K(srv, queue->is_empty(), "Kernel message queue should be empty " "before killing the kernel."); delete queue; } @@ -240,30 +250,25 @@ rust_kernel::signal_kernel_lock() { _kernel_lock.unlock(); } -int rust_kernel::start_task_threads(int num_threads) +int rust_kernel::start_task_threads() { - rust_task_thread *thread = NULL; - - // -1, because this thread will also be a thread. - for(int i = 0; i < num_threads - 1; ++i) { - thread = new rust_task_thread(i + 1, this); + for(int i = 0; i < num_threads; ++i) { + rust_scheduler *thread = threads[i]; thread->start(); - threads.push(thread); } - sched->start_main_loop(0); - - while(threads.pop(&thread)) { + for(int i = 0; i < num_threads; ++i) { + rust_scheduler *thread = threads[i]; thread->join(); - delete thread; } - return sched->rval; + return rval; } rust_task * rust_kernel::create_task(rust_task *spawner, const char *name) { - return sched->create_task(spawner, name); + // TODO: use a different rand. + return threads[rand(&rctx) % num_threads]->create_task(spawner, name); } #ifdef __WIN32__ @@ -285,16 +290,6 @@ rust_kernel::win32_require(LPCTSTR fn, BOOL ok) { } #endif -rust_task_thread::rust_task_thread(int id, rust_kernel *owner) - : id(id), owner(owner) -{ -} - -void rust_task_thread::run() -{ - owner->sched->start_main_loop(id); -} - // // Local Variables: // mode: C++ diff --git a/src/rt/rust_kernel.h b/src/rt/rust_kernel.h index 07f4ff2f78765..8be9bb96e90c2 100644 --- a/src/rt/rust_kernel.h +++ b/src/rt/rust_kernel.h @@ -45,7 +45,10 @@ class rust_task_thread; class rust_kernel : public rust_thread { memory_region _region; rust_log _log; - rust_srv *_srv; + +public: + rust_srv *srv; +private: /** * Task proxy objects are kernel owned handles to Rust objects. @@ -62,20 +65,29 @@ class rust_kernel : public rust_thread { lock_and_signal _kernel_lock; + const size_t num_threads; + void terminate_kernel_loop(); void pump_message_queues(); rust_handle * internal_get_sched_handle(rust_scheduler *sched); - array_list threads; + array_list threads; + + randctx rctx; - rust_scheduler *create_scheduler(const char *name); - void destroy_scheduler(); + rust_scheduler *create_scheduler(int id); + void destroy_scheduler(rust_scheduler *sched); + + void create_schedulers(); + void destroy_schedulers(); public: - rust_scheduler *sched; - lock_and_signal scheduler_lock; + + int rval; + + volatile int live_tasks; /** * Message queues are kernel objects and are associated with domains. @@ -86,11 +98,10 @@ class rust_kernel : public rust_thread { */ indexed_list message_queues; - rust_handle *get_sched_handle(rust_scheduler *sched); rust_handle *get_task_handle(rust_task *task); rust_handle *get_port_handle(rust_port *port); - rust_kernel(rust_srv *srv); + rust_kernel(rust_srv *srv, size_t num_threads); bool is_deadlocked(); @@ -113,10 +124,7 @@ class rust_kernel : public rust_thread { void *realloc(void *mem, size_t size); void free(void *mem); - // FIXME: this should go away - inline rust_scheduler *get_scheduler() const { return sched; } - - int start_task_threads(int num_threads); + int start_task_threads(); #ifdef __WIN32__ void win32_require(LPCTSTR fn, BOOL ok); @@ -125,14 +133,4 @@ class rust_kernel : public rust_thread { rust_task *create_task(rust_task *spawner, const char *name); }; -class rust_task_thread : public rust_thread { - int id; - rust_kernel *owner; - -public: - rust_task_thread(int id, rust_kernel *owner); - - virtual void run(); -}; - #endif /* RUST_KERNEL_H */ diff --git a/src/rt/rust_scheduler.cpp b/src/rt/rust_scheduler.cpp index 09a78cebddbca..437be04e272c3 100644 --- a/src/rt/rust_scheduler.cpp +++ b/src/rt/rust_scheduler.cpp @@ -4,21 +4,23 @@ #include "globals.h" rust_scheduler::rust_scheduler(rust_kernel *kernel, - rust_message_queue *message_queue, rust_srv *srv, - const char *name) : + rust_message_queue *message_queue, + rust_srv *srv, + int id) : interrupt_flag(0), _log(srv, this), log_lvl(log_note), srv(srv), - name(name), + // TODO: calculate a per scheduler name. + name("main"), newborn_tasks(this, "newborn"), running_tasks(this, "running"), blocked_tasks(this, "blocked"), dead_tasks(this, "dead"), cache(this), - rval(0), kernel(kernel), - message_queue(message_queue) + message_queue(message_queue), + id(id) { LOGPTR(this, "new dom", (uintptr_t)this); isaac_init(this, &rctx); @@ -47,9 +49,9 @@ rust_scheduler::activate(rust_task *task) { task->ctx.next = &ctx; DLOG(this, task, "descheduling..."); - kernel->scheduler_lock.unlock(); + lock.unlock(); task->ctx.swap(ctx); - kernel->scheduler_lock.lock(); + lock.lock(); DLOG(this, task, "task has returned"); } @@ -67,8 +69,8 @@ void rust_scheduler::fail() { log(NULL, log_err, "domain %s @0x%" PRIxPTR " root task failed", name, this); - I(this, rval == 0); - rval = 1; + I(this, kernel->rval == 0); + kernel->rval = 1; exit(1); } @@ -82,7 +84,7 @@ rust_scheduler::number_of_live_tasks() { */ void rust_scheduler::reap_dead_tasks(int id) { - I(this, kernel->scheduler_lock.lock_held_by_current_thread()); + I(this, lock.lock_held_by_current_thread()); for (size_t i = 0; i < dead_tasks.length(); ) { rust_task *task = dead_tasks[i]; // Make sure this task isn't still running somewhere else... @@ -93,6 +95,7 @@ rust_scheduler::reap_dead_tasks(int id) { "deleting unreferenced dead task %s @0x%" PRIxPTR, task->name, task); delete task; + sync::decrement(kernel->live_tasks); continue; } ++i; @@ -180,9 +183,9 @@ rust_scheduler::log_state() { * Returns once no more tasks can be scheduled and all task ref_counts * drop to zero. */ -int -rust_scheduler::start_main_loop(int id) { - kernel->scheduler_lock.lock(); +void +rust_scheduler::start_main_loop() { + lock.lock(); // Make sure someone is watching, to pull us out of infinite loops. // @@ -193,11 +196,11 @@ rust_scheduler::start_main_loop(int id) { DLOG(this, dom, "started domain loop %d", id); - while (number_of_live_tasks() > 0) { + while (kernel->live_tasks > 0) { A(this, kernel->is_deadlocked() == false, "deadlock"); - DLOG(this, dom, "worker %d, number_of_live_tasks = %d", - id, number_of_live_tasks()); + DLOG(this, dom, "worker %d, number_of_live_tasks = %d, total = %d", + id, number_of_live_tasks(), kernel->live_tasks); drain_incoming_message_queue(true); @@ -212,11 +215,12 @@ rust_scheduler::start_main_loop(int id) { DLOG(this, task, "all tasks are blocked, scheduler id %d yielding ...", id); - kernel->scheduler_lock.unlock(); + lock.unlock(); sync::sleep(100); - kernel->scheduler_lock.lock(); + lock.lock(); DLOG(this, task, "scheduler resuming ..."); + reap_dead_tasks(id); continue; } @@ -264,19 +268,18 @@ rust_scheduler::start_main_loop(int id) { "scheduler yielding ...", dead_tasks.length()); log_state(); - kernel->scheduler_lock.unlock(); + lock.unlock(); sync::yield(); - kernel->scheduler_lock.lock(); + lock.lock(); } else { drain_incoming_message_queue(true); } reap_dead_tasks(id); } - DLOG(this, dom, "finished main-loop %d (dom.rval = %d)", id, rval); + DLOG(this, dom, "finished main-loop %d", id); - kernel->scheduler_lock.unlock(); - return rval; + lock.unlock(); } rust_crate_cache * @@ -296,9 +299,16 @@ rust_scheduler::create_task(rust_task *spawner, const char *name) { task->on_wakeup(spawner->_on_wakeup); } newborn_tasks.append(task); + + sync::increment(kernel->live_tasks); + return task; } +void rust_scheduler::run() { + this->start_main_loop(); +} + // // Local Variables: // mode: C++ diff --git a/src/rt/rust_scheduler.h b/src/rt/rust_scheduler.h index cabcdf210a89f..c53e6157f06bc 100644 --- a/src/rt/rust_scheduler.h +++ b/src/rt/rust_scheduler.h @@ -27,7 +27,8 @@ rust_crate_cache }; struct rust_scheduler : public kernel_owned, - rc_base + rc_base, + rust_thread { // Fields known to the compiler: uintptr_t interrupt_flag; @@ -46,7 +47,6 @@ struct rust_scheduler : public kernel_owned, rust_crate_cache cache; randctx rctx; - int rval; rust_kernel *kernel; int32_t list_index; @@ -57,6 +57,10 @@ struct rust_scheduler : public kernel_owned, // Incoming messages from other domains. rust_message_queue *message_queue; + const int id; + + lock_and_signal lock; + #ifndef __WIN32__ pthread_attr_t attr; #endif @@ -64,8 +68,8 @@ struct rust_scheduler : public kernel_owned, // Only a pointer to 'name' is kept, so it must live as long as this // domain. rust_scheduler(rust_kernel *kernel, - rust_message_queue *message_queue, rust_srv *srv, - const char *name); + rust_message_queue *message_queue, rust_srv *srv, + int id); ~rust_scheduler(); void activate(rust_task *task); void log(rust_task *task, uint32_t level, char const *fmt, ...); @@ -80,11 +84,13 @@ struct rust_scheduler : public kernel_owned, void reap_dead_tasks(int id); rust_task *schedule_task(int id); - int start_main_loop(int id); + void start_main_loop(); void log_state(); rust_task *create_task(rust_task *spawner, const char *name); + + virtual void run(); }; inline rust_log & diff --git a/src/rt/rust_task.cpp b/src/rt/rust_task.cpp index de6b00acb3fd2..10ea48f57c24a 100644 --- a/src/rt/rust_task.cpp +++ b/src/rt/rust_task.cpp @@ -83,7 +83,8 @@ rust_task::rust_task(rust_scheduler *sched, rust_task_list *state, pinned_on(-1), local_region(&sched->srv->local_region), _on_wakeup(NULL), - failed(false) + failed(false), + propagate_failure(true) { LOGPTR(sched, "new task", (uintptr_t)this); DLOG(sched, task, "sizeof(task) = %d (0x%x)", sizeof *this, sizeof *this); @@ -207,8 +208,8 @@ rust_task::kill() { // Unblock the task so it can unwind. unblock(); - // if (this == sched->root_task) - // sched->fail(); + if (NULL == supervisor && propagate_failure) + sched->fail(); LOG(this, task, "preparing to unwind task: 0x%" PRIxPTR, this); // run_on_resume(rust_unwind_glue); @@ -229,6 +230,8 @@ rust_task::fail() { supervisor->kill(); } // FIXME: implement unwinding again. + if (NULL == supervisor && propagate_failure) + sched->fail(); failed = true; } @@ -248,6 +251,7 @@ rust_task::unsupervise() " disconnecting from supervisor %s @0x%" PRIxPTR, name, this, supervisor->name, supervisor); supervisor = NULL; + propagate_failure = false; } void @@ -397,8 +401,8 @@ rust_task::free(void *p, bool is_gc) void rust_task::transition(rust_task_list *src, rust_task_list *dst) { - I(sched, !kernel->scheduler_lock.lock_held_by_current_thread()); - scoped_lock with(kernel->scheduler_lock); + I(sched, !sched->lock.lock_held_by_current_thread()); + scoped_lock with(sched->lock); DLOG(sched, task, "task %s " PTR " state change '%s' -> '%s' while in '%s'", name, (uintptr_t)this, src->name, dst->name, state->name); diff --git a/src/rt/rust_task.h b/src/rt/rust_task.h index b1984b9d40b82..9b1a3a395821f 100644 --- a/src/rt/rust_task.h +++ b/src/rt/rust_task.h @@ -91,6 +91,7 @@ rust_task : public maybe_proxy, // Indicates that the task ended in failure bool failed; + bool propagate_failure; lock_and_signal lock; diff --git a/src/rt/rust_upcall.cpp b/src/rt/rust_upcall.cpp index 8313399130ca9..3415b6b62aec8 100644 --- a/src/rt/rust_upcall.cpp +++ b/src/rt/rust_upcall.cpp @@ -541,9 +541,9 @@ extern "C" CDECL rust_task * upcall_new_task(rust_task *spawner, rust_vec *name) { // name is a rust string structure. LOG_UPCALL_ENTRY(spawner); - scoped_lock with(spawner->kernel->scheduler_lock); - rust_scheduler *sched = spawner->sched; - rust_task *task = sched->create_task(spawner, (const char *)name->data); + scoped_lock with(spawner->sched->lock); + rust_task *task = + spawner->kernel->create_task(spawner, (const char *)name->data); return task; } @@ -584,7 +584,7 @@ upcall_ivec_resize_shared(rust_task *task, rust_ivec *v, size_t newsz) { LOG_UPCALL_ENTRY(task); - scoped_lock with(task->kernel->scheduler_lock); + scoped_lock with(task->sched->lock); I(task->sched, !v->fill); size_t new_alloc = next_power_of_two(newsz); @@ -604,7 +604,7 @@ upcall_ivec_spill_shared(rust_task *task, rust_ivec *v, size_t newsz) { LOG_UPCALL_ENTRY(task); - scoped_lock with(task->kernel->scheduler_lock); + scoped_lock with(task->sched->lock); size_t new_alloc = next_power_of_two(newsz); rust_ivec_heap *heap_part = (rust_ivec_heap *) diff --git a/src/rt/rust_util.h b/src/rt/rust_util.h index e1644e9f3cfdc..89e7f2e7bedad 100644 --- a/src/rt/rust_util.h +++ b/src/rt/rust_util.h @@ -126,8 +126,9 @@ next_power_of_two(size_t s) // Initialization helper for ISAAC RNG +template static inline void -isaac_init(rust_scheduler *sched, randctx *rctx) +isaac_init(sched_or_kernel *sched, randctx *rctx) { memset(rctx, 0, sizeof(randctx)); diff --git a/src/rt/sync/sync.h b/src/rt/sync/sync.h index a932ef1c2cab4..8298f4028818d 100644 --- a/src/rt/sync/sync.h +++ b/src/rt/sync/sync.h @@ -1,4 +1,4 @@ -// -*- c++-mode -*- +// -*- c++ -*- #ifndef SYNC_H #define SYNC_H diff --git a/src/rt/test/rust_test_runtime.cpp b/src/rt/test/rust_test_runtime.cpp index f9a99d9acb1ea..1e7c10944a763 100644 --- a/src/rt/test/rust_test_runtime.cpp +++ b/src/rt/test/rust_test_runtime.cpp @@ -11,17 +11,16 @@ rust_test_runtime::~rust_test_runtime() { void rust_domain_test::worker::run() { - rust_scheduler *handle = kernel->get_scheduler(); for (int i = 0; i < TASKS; i++) { - handle->create_task(NULL, "child"); + kernel->create_task(NULL, "child"); } - sync::sleep(rand(&handle->rctx) % 1000); + //sync::sleep(rand(&handle->rctx) % 1000); } bool rust_domain_test::run() { rust_srv srv; - rust_kernel kernel(&srv); + rust_kernel kernel(&srv, 1); array_list workers; for (int i = 0; i < DOMAINS; i++) { @@ -47,13 +46,13 @@ void rust_task_test::worker::run() { rust_task *root_task = kernel->create_task(NULL, "main"); root_task->start((uintptr_t)&task_entry, (uintptr_t)NULL); - root_task->sched->start_main_loop(0); + root_task->sched->start_main_loop(); } bool rust_task_test::run() { rust_srv srv; - rust_kernel kernel(&srv); + rust_kernel kernel(&srv, 1); array_list workers; for (int i = 0; i < DOMAINS; i++) { @@ -62,6 +61,6 @@ rust_task_test::run() { worker->start(); } - sync::sleep(rand(&kernel.sched->rctx) % 1000); + //sync::sleep(rand(&kernel.sched->rctx) % 1000); return true; } diff --git a/src/test/run-pass/lib-task.rs b/src/test/run-pass/lib-task.rs new file mode 100644 index 0000000000000..313ec8afcf1b3 --- /dev/null +++ b/src/test/run-pass/lib-task.rs @@ -0,0 +1,41 @@ + + +// xfail-stage0 + +use std; +import std::task; + +fn test_sleep() { task::sleep(1000000u); } + +fn test_unsupervise() { + fn f() { + task::unsupervise(); + fail; + } + spawn f(); +} + +fn test_join() { + fn winner() { + } + + auto wintask = spawn winner(); + + assert task::join(wintask) == task::tr_success; + + fn failer() { + task::unsupervise(); + fail; + } + + auto failtask = spawn failer(); + + assert task::join(failtask) == task::tr_failure; +} + +fn main() { + // FIXME: Why aren't we running this? + //test_sleep(); + test_unsupervise(); + test_join(); +} From 39b3e983c3a08c8627d321b9deb2b209c9d81ce2 Mon Sep 17 00:00:00 2001 From: Eric Holk Date: Mon, 25 Jul 2011 11:39:42 -0700 Subject: [PATCH 03/11] Re-removing a test case that was moved during the big test suite overhaul. --- src/test/run-pass/lib-task.rs | 41 ----------------------------------- 1 file changed, 41 deletions(-) delete mode 100644 src/test/run-pass/lib-task.rs diff --git a/src/test/run-pass/lib-task.rs b/src/test/run-pass/lib-task.rs deleted file mode 100644 index 313ec8afcf1b3..0000000000000 --- a/src/test/run-pass/lib-task.rs +++ /dev/null @@ -1,41 +0,0 @@ - - -// xfail-stage0 - -use std; -import std::task; - -fn test_sleep() { task::sleep(1000000u); } - -fn test_unsupervise() { - fn f() { - task::unsupervise(); - fail; - } - spawn f(); -} - -fn test_join() { - fn winner() { - } - - auto wintask = spawn winner(); - - assert task::join(wintask) == task::tr_success; - - fn failer() { - task::unsupervise(); - fail; - } - - auto failtask = spawn failer(); - - assert task::join(failtask) == task::tr_failure; -} - -fn main() { - // FIXME: Why aren't we running this? - //test_sleep(); - test_unsupervise(); - test_join(); -} From d6b7182309fbff38f726797a153e94bfe1c43705 Mon Sep 17 00:00:00 2001 From: Eric Holk Date: Mon, 25 Jul 2011 15:02:43 -0700 Subject: [PATCH 04/11] Adding a function to stdlib to set the min stack size, for programs that absolutely will not succeed with a large default stack. This should be removed once we have stack grown working. Also updated word-count to succeed under the new test framework. --- src/lib/task.rs | 6 ++++++ src/rt/rust_builtin.cpp | 7 +++++++ src/rt/rust_task.cpp | 3 ++- src/rt/rustrt.def.in | 1 + src/test/bench/task-perf-word-count.rs | 16 +++++++++++----- 5 files changed, 27 insertions(+), 6 deletions(-) diff --git a/src/lib/task.rs b/src/lib/task.rs index 5968c456909a1..62fff2cfc7584 100644 --- a/src/lib/task.rs +++ b/src/lib/task.rs @@ -8,6 +8,8 @@ native "rust" mod rustrt { fn clone_chan(c: *rust_chan) -> *rust_chan; type rust_chan; + + fn set_min_stack(stack_size: uint); } /** @@ -40,6 +42,10 @@ fn send[T](c: chan[T], v: &T) { c <| v; } fn recv[T](p: port[T]) -> T { let v; p |> v; v } +fn set_min_stack(uint stack_size) { + rustrt::set_min_stack(stack_size); +} + // Local Variables: // mode: rust; // fill-column: 78; diff --git a/src/rt/rust_builtin.cpp b/src/rt/rust_builtin.cpp index eefc6dac8cebc..4870c9fe54598 100644 --- a/src/rt/rust_builtin.cpp +++ b/src/rt/rust_builtin.cpp @@ -856,6 +856,13 @@ clone_chan(rust_task *task, rust_chan *chan) { return chan->clone(task); } +// defined in rust_task.cpp +extern size_t g_min_stack_size; +extern "C" CDECL void +set_min_stack(rust_task *task, uintptr_t stack_size) { + g_min_stack_size = stack_size; +} + // // Local Variables: // mode: C++ diff --git a/src/rt/rust_task.cpp b/src/rt/rust_task.cpp index 10ea48f57c24a..538a9b34e5697 100644 --- a/src/rt/rust_task.cpp +++ b/src/rt/rust_task.cpp @@ -14,6 +14,7 @@ // FIXME (issue #151): This should be 0x300; the change here is for // practicality's sake until stack growth is working. +size_t g_min_stack_size = 0x300000; static size_t get_min_stk_size() { char *stack_size = getenv("RUST_MIN_STACK"); @@ -21,7 +22,7 @@ static size_t get_min_stk_size() { return strtol(stack_size, NULL, 0); } else { - return 0x300000; + return g_min_stack_size; } } diff --git a/src/rt/rustrt.def.in b/src/rt/rustrt.def.in index c6d31a47f4c82..8326cabea88ff 100644 --- a/src/rt/rustrt.def.in +++ b/src/rt/rustrt.def.in @@ -37,6 +37,7 @@ rust_ptr_eq rust_run_program rust_start rust_getcwd +set_min_stack size_of squareroot str_alloc diff --git a/src/test/bench/task-perf-word-count.rs b/src/test/bench/task-perf-word-count.rs index 6827ea55804ac..6e7e2bdba8aa5 100644 --- a/src/test/bench/task-perf-word-count.rs +++ b/src/test/bench/task-perf-word-count.rs @@ -1,6 +1,3 @@ -// xfail-stage1 -// xfail-stage2 -// xfail-stage3 /** A parallel word-frequency counting program. @@ -233,10 +230,19 @@ fn main(argv: vec[str]) { let out = io::stdout(); out.write_line(#fmt("Usage: %s ...", argv.(0))); - fail; + + // TODO: run something just to make sure the code hasn't + // broken yet. This is the unit test mode of this program. + + ret; } + // We can get by with 8k stacks, and we'll probably exhaust our + // address space otherwise. + task::set_min_stack(8192u); + let start = time::precise_time_ns(); + map_reduce::map_reduce(vec::slice(argv, 1u, vec::len(argv))); let stop = time::precise_time_ns(); @@ -342,4 +348,4 @@ fn is_alpha_upper(c: char) -> bool { fn is_alpha(c: char) -> bool { is_alpha_upper(c) || is_alpha_lower(c) } -fn is_word_char(c: char) -> bool { is_alpha(c) || is_digit(c) || c == '_' } \ No newline at end of file +fn is_word_char(c: char) -> bool { is_alpha(c) || is_digit(c) || c == '_' } From 00a64d12e8b269c2aa8c614058810b9e5cbbcad2 Mon Sep 17 00:00:00 2001 From: Eric Holk Date: Mon, 25 Jul 2011 18:00:37 -0700 Subject: [PATCH 05/11] Made task threads wait instead of sleep, so they can be woken up. This appears to give us much better parallel performance. Also, commented out one more unsafe log and updated rust_kernel.cpp to compile under g++ --- src/rt/circular_buffer.cpp | 2 +- src/rt/rust_kernel.cpp | 16 ++++++++---- src/rt/rust_kernel.h | 1 + src/rt/rust_scheduler.cpp | 16 ++++++------ src/rt/rust_task.cpp | 8 +++++- src/rt/sync/lock_and_signal.cpp | 34 +++++++++++++++++++++++--- src/rt/sync/lock_and_signal.h | 5 +++- src/test/bench/task-perf-word-count.rs | 1 - 8 files changed, 61 insertions(+), 22 deletions(-) diff --git a/src/rt/circular_buffer.cpp b/src/rt/circular_buffer.cpp index aa0127d8c255b..ba098bbee3f20 100644 --- a/src/rt/circular_buffer.cpp +++ b/src/rt/circular_buffer.cpp @@ -123,7 +123,7 @@ circular_buffer::dequeue(void *dst) { if (dst != NULL) { memcpy(dst, &_buffer[_next], unit_sz); } - DLOG(sched, mem, "shifted data from index %d", _next); + //DLOG(sched, mem, "shifted data from index %d", _next); _unread -= unit_sz; _next += unit_sz; if (_next == _buffer_sz) { diff --git a/src/rt/rust_kernel.cpp b/src/rt/rust_kernel.cpp index 1eb826027988c..9cdf07cd759f9 100644 --- a/src/rt/rust_kernel.cpp +++ b/src/rt/rust_kernel.cpp @@ -53,13 +53,13 @@ rust_kernel::destroy_scheduler(rust_scheduler *sched) { } void rust_kernel::create_schedulers() { - for(int i = 0; i < num_threads; ++i) { + for(size_t i = 0; i < num_threads; ++i) { threads.push(create_scheduler(i)); } } void rust_kernel::destroy_schedulers() { - for(int i = 0; i < num_threads; ++i) { + for(size_t i = 0; i < num_threads; ++i) { destroy_scheduler(threads[i]); } } @@ -106,7 +106,7 @@ rust_kernel::get_port_handle(rust_port *port) { void rust_kernel::log_all_scheduler_state() { - for(int i = 0; i < num_threads; ++i) { + for(size_t i = 0; i < num_threads; ++i) { threads[i]->log_state(); } } @@ -252,12 +252,12 @@ rust_kernel::signal_kernel_lock() { int rust_kernel::start_task_threads() { - for(int i = 0; i < num_threads; ++i) { + for(size_t i = 0; i < num_threads; ++i) { rust_scheduler *thread = threads[i]; thread->start(); } - for(int i = 0; i < num_threads; ++i) { + for(size_t i = 0; i < num_threads; ++i) { rust_scheduler *thread = threads[i]; thread->join(); } @@ -271,6 +271,12 @@ rust_kernel::create_task(rust_task *spawner, const char *name) { return threads[rand(&rctx) % num_threads]->create_task(spawner, name); } +void rust_kernel::wakeup_schedulers() { + for(size_t i = 0; i < num_threads; ++i) { + threads[i]->lock.signal_all(); + } +} + #ifdef __WIN32__ void rust_kernel::win32_require(LPCTSTR fn, BOOL ok) { diff --git a/src/rt/rust_kernel.h b/src/rt/rust_kernel.h index 8be9bb96e90c2..cf9d88e00164a 100644 --- a/src/rt/rust_kernel.h +++ b/src/rt/rust_kernel.h @@ -106,6 +106,7 @@ class rust_kernel : public rust_thread { bool is_deadlocked(); void signal_kernel_lock(); + void wakeup_schedulers(); /** * Notifies the kernel whenever a message has been enqueued . This gives diff --git a/src/rt/rust_scheduler.cpp b/src/rt/rust_scheduler.cpp index 437be04e272c3..4f19b0c681b1a 100644 --- a/src/rt/rust_scheduler.cpp +++ b/src/rt/rust_scheduler.cpp @@ -87,6 +87,7 @@ rust_scheduler::reap_dead_tasks(int id) { I(this, lock.lock_held_by_current_thread()); for (size_t i = 0; i < dead_tasks.length(); ) { rust_task *task = dead_tasks[i]; + task->lock.lock(); // Make sure this task isn't still running somewhere else... if (task->ref_count == 0 && task->can_schedule(id)) { I(this, task->tasks_waiting_to_join.is_empty()); @@ -94,10 +95,13 @@ rust_scheduler::reap_dead_tasks(int id) { DLOG(this, task, "deleting unreferenced dead task %s @0x%" PRIxPTR, task->name, task); + task->lock.unlock(); delete task; sync::decrement(kernel->live_tasks); + kernel->wakeup_schedulers(); continue; } + task->lock.unlock(); ++i; } } @@ -206,21 +210,15 @@ rust_scheduler::start_main_loop() { rust_task *scheduled_task = schedule_task(id); - // The scheduler busy waits until a task is available for scheduling. - // Eventually we'll want a smarter way to do this, perhaps sleep - // for a minimum amount of time. - if (scheduled_task == NULL) { log_state(); DLOG(this, task, "all tasks are blocked, scheduler id %d yielding ...", id); - lock.unlock(); - sync::sleep(100); - lock.lock(); - DLOG(this, task, - "scheduler resuming ..."); + lock.timed_wait(100000); reap_dead_tasks(id); + DLOG(this, task, + "scheduler %d resuming ...", id); continue; } diff --git a/src/rt/rust_task.cpp b/src/rt/rust_task.cpp index 538a9b34e5697..05efc12e4d43b 100644 --- a/src/rt/rust_task.cpp +++ b/src/rt/rust_task.cpp @@ -105,7 +105,7 @@ rust_task::~rust_task() /* FIXME: tighten this up, there are some more assertions that hold at task-lifecycle events. */ - // I(sched, ref_count == 0 || + I(sched, ref_count == 0); // || // (ref_count == 1 && this == sched->root_task)); del_stk(this, stk); @@ -167,6 +167,7 @@ rust_task::start(uintptr_t spawnee_fn, yield_timer.reset_us(0); transition(&sched->newborn_tasks, &sched->running_tasks); + sched->lock.signal(); } void @@ -212,6 +213,8 @@ rust_task::kill() { if (NULL == supervisor && propagate_failure) sched->fail(); + sched->lock.signal(); + LOG(this, task, "preparing to unwind task: 0x%" PRIxPTR, this); // run_on_resume(rust_unwind_glue); } @@ -442,12 +445,15 @@ rust_task::wakeup(rust_cond *from) { if(_on_wakeup) { _on_wakeup->on_wakeup(); } + + sched->lock.signal(); } void rust_task::die() { scoped_lock with(lock); transition(&sched->running_tasks, &sched->dead_tasks); + sched->lock.signal(); } void diff --git a/src/rt/sync/lock_and_signal.cpp b/src/rt/sync/lock_and_signal.cpp index 3d0d101388908..f4c3778837d05 100644 --- a/src/rt/sync/lock_and_signal.cpp +++ b/src/rt/sync/lock_and_signal.cpp @@ -10,7 +10,9 @@ #include "lock_and_signal.h" #if defined(__WIN32__) -lock_and_signal::lock_and_signal() { +lock_and_signal::lock_and_signal() + : alive(true) +{ // FIXME: In order to match the behavior of pthread_cond_broadcast on // Windows, we create manual reset events. This however breaks the // behavior of pthread_cond_signal, fixing this is quite involved: @@ -22,7 +24,7 @@ lock_and_signal::lock_and_signal() { #else lock_and_signal::lock_and_signal() - : _locked(false) + : _locked(false), alive(true) { CHECKED(pthread_cond_init(&_cond, NULL)); CHECKED(pthread_mutex_init(&_mutex, NULL)); @@ -36,6 +38,7 @@ lock_and_signal::~lock_and_signal() { CHECKED(pthread_cond_destroy(&_cond)); CHECKED(pthread_mutex_destroy(&_mutex)); #endif + alive = false; } void lock_and_signal::lock() { @@ -65,11 +68,14 @@ void lock_and_signal::wait() { timed_wait(0); } -void lock_and_signal::timed_wait(size_t timeout_in_ns) { +bool lock_and_signal::timed_wait(size_t timeout_in_ns) { + _locked = false; + bool rv = true; #if defined(__WIN32__) LeaveCriticalSection(&_cs); WaitForSingleObject(_event, INFINITE); EnterCriticalSection(&_cs); + _holding_thread = GetCurrentThreadId(); #else if (timeout_in_ns == 0) { CHECKED(pthread_cond_wait(&_cond, &_mutex)); @@ -79,9 +85,29 @@ void lock_and_signal::timed_wait(size_t timeout_in_ns) { timespec time_spec; time_spec.tv_sec = time_val.tv_sec + 0; time_spec.tv_nsec = time_val.tv_usec * 1000 + timeout_in_ns; - CHECKED(pthread_cond_timedwait(&_cond, &_mutex, &time_spec)); + if(time_spec.tv_nsec >= 1000000000) { + time_spec.tv_sec++; + time_spec.tv_nsec -= 1000000000; + } + int cond_wait_status + = pthread_cond_timedwait(&_cond, &_mutex, &time_spec); + switch(cond_wait_status) { + case 0: + // successfully grabbed the lock. + break; + case ETIMEDOUT: + // Oops, we timed out. + rv = false; + break; + default: + // Error + CHECKED(cond_wait_status); + } } + _holding_thread = pthread_self(); #endif + _locked = true; + return rv; } /** diff --git a/src/rt/sync/lock_and_signal.h b/src/rt/sync/lock_and_signal.h index 60c22958342fb..6e656017115d2 100644 --- a/src/rt/sync/lock_and_signal.h +++ b/src/rt/sync/lock_and_signal.h @@ -14,6 +14,9 @@ class lock_and_signal { pthread_t _holding_thread; #endif bool _locked; + + bool alive; + public: lock_and_signal(); virtual ~lock_and_signal(); @@ -21,7 +24,7 @@ class lock_and_signal { void lock(); void unlock(); void wait(); - void timed_wait(size_t timeout_in_ns); + bool timed_wait(size_t timeout_in_ns); void signal(); void signal_all(); diff --git a/src/test/bench/task-perf-word-count.rs b/src/test/bench/task-perf-word-count.rs index 6e7e2bdba8aa5..0a6b94c7f2ed2 100644 --- a/src/test/bench/task-perf-word-count.rs +++ b/src/test/bench/task-perf-word-count.rs @@ -184,7 +184,6 @@ mod map_reduce { let m; ctrl |> m; - alt m { mapper_done. { // log_err "received mapper terminated."; From 0f014710de81db36d87706d9f6d438621f049732 Mon Sep 17 00:00:00 2001 From: Eric Holk Date: Wed, 27 Jul 2011 12:29:38 -0700 Subject: [PATCH 06/11] Adding upcalls to to ref() and deref() tasks. This is the first step towards atomic reference counting of tasks. --- src/comp/back/upcall.rs | 4 ++++ src/comp/middle/trans.rs | 15 +++++++++++++-- src/comp/middle/ty.rs | 5 +++++ src/rt/rust_upcall.cpp | 17 +++++++++++++++++ src/rt/rustrt.def.in | 2 ++ src/rt/sync/lock_and_signal.cpp | 2 +- 6 files changed, 42 insertions(+), 3 deletions(-) diff --git a/src/comp/back/upcall.rs b/src/comp/back/upcall.rs index b12ceba8f0dd6..aa50b75e8b7d9 100644 --- a/src/comp/back/upcall.rs +++ b/src/comp/back/upcall.rs @@ -57,6 +57,8 @@ type upcalls = vec_append: ValueRef, get_type_desc: ValueRef, new_task: ValueRef, + take_task: ValueRef, + drop_task: ValueRef, start_task: ValueRef, ivec_resize: ValueRef, ivec_spill: ValueRef, @@ -129,6 +131,8 @@ fn declare_upcalls(tn: type_names, tydesc_type: TypeRef, ~[T_ptr(T_nil()), T_size_t(), T_size_t(), T_size_t(), T_ptr(T_ptr(tydesc_type))], T_ptr(tydesc_type)), new_task: d("new_task", ~[T_ptr(T_str())], taskptr_type), + take_task: dv("take_task", ~[taskptr_type]), + drop_task: dv("drop_task", ~[taskptr_type]), start_task: d("start_task", ~[taskptr_type, T_int(), T_int(), T_size_t()], taskptr_type), diff --git a/src/comp/middle/trans.rs b/src/comp/middle/trans.rs index 1475887ae4d05..6b1a091adc368 100644 --- a/src/comp/middle/trans.rs +++ b/src/comp/middle/trans.rs @@ -1221,7 +1221,13 @@ fn make_copy_glue(cx: &@block_ctxt, v: ValueRef, t: &ty::t) { // NB: v is an *alias* of type t here, not a direct value. let bcx; - if ty::type_is_boxed(bcx_tcx(cx), t) { + + if ty::type_is_task(bcx_tcx(cx), t) { + let task_ptr = cx.build.Load(v); + cx.build.Call(bcx_ccx(cx).upcalls.take_task, + ~[cx.fcx.lltaskptr, task_ptr]); + bcx = cx; + } else if ty::type_is_boxed(bcx_tcx(cx), t) { bcx = incr_refcnt_of_boxed(cx, cx.build.Load(v)).bcx; } else if (ty::type_is_structural(bcx_tcx(cx), t)) { bcx = duplicate_heap_parts_if_necessary(cx, v, t).bcx; @@ -1381,7 +1387,12 @@ fn make_drop_glue(cx: &@block_ctxt, v0: ValueRef, t: &ty::t) { ty::ty_box(_) { decr_refcnt_maybe_free(cx, v0, v0, t) } ty::ty_port(_) { decr_refcnt_maybe_free(cx, v0, v0, t) } ty::ty_chan(_) { decr_refcnt_maybe_free(cx, v0, v0, t) } - ty::ty_task. { decr_refcnt_maybe_free(cx, v0, v0, t) } + ty::ty_task. { + let task_ptr = cx.build.Load(v0); + {bcx: cx, + val: cx.build.Call(bcx_ccx(cx).upcalls.drop_task, + ~[cx.fcx.lltaskptr, task_ptr])} + } ty::ty_obj(_) { let box_cell = cx.build.GEP(v0, ~[C_int(0), C_int(abi::obj_field_box)]); diff --git a/src/comp/middle/ty.rs b/src/comp/middle/ty.rs index 674e5c9b9cbad..cbf8062a8146b 100644 --- a/src/comp/middle/ty.rs +++ b/src/comp/middle/ty.rs @@ -161,6 +161,7 @@ export type_is_bot; export type_is_box; export type_is_boxed; export type_is_chan; +export type_is_task; export type_is_fp; export type_is_integral; export type_is_native; @@ -839,6 +840,10 @@ fn type_is_chan(cx: &ctxt, ty: &t) -> bool { alt struct(cx, ty) { ty_chan(_) { ret true; } _ { ret false; } } } +fn type_is_task(cx: &ctxt, ty: &t) -> bool { + alt struct(cx, ty) { ty_task. { ret true; } _ { ret false; } } +} + fn type_is_structural(cx: &ctxt, ty: &t) -> bool { alt struct(cx, ty) { ty_rec(_) { ret true; } diff --git a/src/rt/rust_upcall.cpp b/src/rt/rust_upcall.cpp index 3415b6b62aec8..103aa49a6e9f9 100644 --- a/src/rt/rust_upcall.cpp +++ b/src/rt/rust_upcall.cpp @@ -547,6 +547,23 @@ upcall_new_task(rust_task *spawner, rust_vec *name) { return task; } +extern "C" CDECL void +upcall_take_task(rust_task *task, rust_task *target) { + LOG_UPCALL_ENTRY(task); + if(target) { + target->ref(); + } +} + +extern "C" CDECL void +upcall_drop_task(rust_task *task, rust_task *target) { + LOG_UPCALL_ENTRY(task); + if(target) { + //target->deref(); + --target->ref_count; + } +} + extern "C" CDECL rust_task * upcall_start_task(rust_task *spawner, rust_task *task, diff --git a/src/rt/rustrt.def.in b/src/rt/rustrt.def.in index 8326cabea88ff..d6c218d936d8c 100644 --- a/src/rt/rustrt.def.in +++ b/src/rt/rustrt.def.in @@ -59,6 +59,7 @@ upcall_chan_target_task upcall_clone_chan upcall_del_chan upcall_del_port +upcall_drop_task upcall_dup_str upcall_exit upcall_fail @@ -87,6 +88,7 @@ upcall_shared_malloc upcall_shared_free upcall_sleep upcall_start_task +upcall_take_task upcall_trace_str upcall_trace_word upcall_vec_append diff --git a/src/rt/sync/lock_and_signal.cpp b/src/rt/sync/lock_and_signal.cpp index f4c3778837d05..67f6b107b3a4c 100644 --- a/src/rt/sync/lock_and_signal.cpp +++ b/src/rt/sync/lock_and_signal.cpp @@ -10,7 +10,7 @@ #include "lock_and_signal.h" #if defined(__WIN32__) -lock_and_signal::lock_and_signal() +lock_and_signal::lock_and_signal() : alive(true) { // FIXME: In order to match the behavior of pthread_cond_broadcast on From 8c96afa7bf9325da5e8541ba8cb024129c5dd5f8 Mon Sep 17 00:00:00 2001 From: Eric Holk Date: Wed, 27 Jul 2011 14:51:25 -0700 Subject: [PATCH 07/11] Atomic reference counting for tasks. --- src/rt/memory_region.cpp | 2 +- src/rt/rust_chan.cpp | 18 +++--------------- src/rt/rust_chan.h | 2 +- src/rt/rust_message.cpp | 4 ++-- src/rt/rust_scheduler.cpp | 25 +++++++++++-------------- src/rt/rust_task.cpp | 23 ++++------------------- src/rt/rust_task.h | 15 ++++++++++++--- src/rt/rust_upcall.cpp | 31 +++++++++---------------------- 8 files changed, 43 insertions(+), 77 deletions(-) diff --git a/src/rt/memory_region.cpp b/src/rt/memory_region.cpp index 6e6515901974b..44924218e570a 100644 --- a/src/rt/memory_region.cpp +++ b/src/rt/memory_region.cpp @@ -4,7 +4,7 @@ // NB: please do not commit code with this uncommented. It's // hugely expensive and should only be used as a last resort. // -// #define TRACK_ALLOCATIONS +#define TRACK_ALLOCATIONS #define MAGIC 0xbadc0ffe diff --git a/src/rt/rust_chan.cpp b/src/rt/rust_chan.cpp index 9253d7d036101..dc6ea0fefdf37 100644 --- a/src/rt/rust_chan.cpp +++ b/src/rt/rust_chan.cpp @@ -60,7 +60,7 @@ void rust_chan::disassociate() { // "disassociating chan: 0x%" PRIxPTR " from port: 0x%" PRIxPTR, // this, port->referent()); --this->ref_count; - --this->task->ref_count; + task->deref(); this->task = NULL; port->referent()->chans.swap_delete(this); } @@ -109,22 +109,10 @@ void rust_chan::send(void *sptr) { return; } -rust_chan *rust_chan::clone(maybe_proxy *target) { +rust_chan *rust_chan::clone(rust_task *target) { size_t unit_sz = buffer.unit_sz; maybe_proxy *port = this->port; - rust_task *target_task = NULL; - if (target->is_proxy() == false) { - port = this->port; - target_task = target->referent(); - } else { - rust_handle *handle = - task->sched->kernel->get_port_handle(port->as_referent()); - maybe_proxy *proxy = new rust_proxy (handle); - DLOG(task->sched, mem, "new proxy: " PTR, proxy); - port = proxy; - target_task = target->as_proxy()->handle()->referent(); - } - return new (target_task->kernel, "cloned chan") + return new (target->kernel, "cloned chan") rust_chan(kernel, port, unit_sz); } diff --git a/src/rt/rust_chan.h b/src/rt/rust_chan.h index 056d70cebe4d4..68cdd31b3cc92 100644 --- a/src/rt/rust_chan.h +++ b/src/rt/rust_chan.h @@ -22,7 +22,7 @@ class rust_chan : public kernel_owned, void send(void *sptr); - rust_chan *clone(maybe_proxy *target); + rust_chan *clone(rust_task *target); // Called whenever the channel's ref count drops to zero. void destroy(); diff --git a/src/rt/rust_message.cpp b/src/rt/rust_message.cpp index 59645d6d5bd9f..f8001a17193b1 100644 --- a/src/rt/rust_message.cpp +++ b/src/rt/rust_message.cpp @@ -61,8 +61,8 @@ void notify_message::process() { break; case JOIN: { if (task->dead() == false) { - rust_proxy *proxy = new rust_proxy(_source); - task->tasks_waiting_to_join.append(proxy); + // FIXME: this should be dead code. + assert(false); } else { send(WAKEUP, "wakeup", _target, _source); } diff --git a/src/rt/rust_scheduler.cpp b/src/rt/rust_scheduler.cpp index 4f19b0c681b1a..245ee3d5fce70 100644 --- a/src/rt/rust_scheduler.cpp +++ b/src/rt/rust_scheduler.cpp @@ -89,14 +89,14 @@ rust_scheduler::reap_dead_tasks(int id) { rust_task *task = dead_tasks[i]; task->lock.lock(); // Make sure this task isn't still running somewhere else... - if (task->ref_count == 0 && task->can_schedule(id)) { + if (task->can_schedule(id)) { I(this, task->tasks_waiting_to_join.is_empty()); dead_tasks.remove(task); DLOG(this, task, "deleting unreferenced dead task %s @0x%" PRIxPTR, task->name, task); task->lock.unlock(); - delete task; + task->deref(); sync::decrement(kernel->live_tasks); kernel->wakeup_schedulers(); continue; @@ -174,9 +174,8 @@ rust_scheduler::log_state() { if (!dead_tasks.is_empty()) { log(NULL, log_note, "dead tasks:"); for (size_t i = 0; i < dead_tasks.length(); i++) { - log(NULL, log_note, "\t task: %s 0x%" PRIxPTR ", ref_count: %d", - dead_tasks[i]->name, dead_tasks[i], - dead_tasks[i]->ref_count); + log(NULL, log_note, "\t task: %s 0x%" PRIxPTR, + dead_tasks[i]->name, dead_tasks[i]); } } } @@ -225,15 +224,13 @@ rust_scheduler::start_main_loop() { I(this, scheduled_task->running()); DLOG(this, task, - "activating task %s 0x%" PRIxPTR - ", sp=0x%" PRIxPTR - ", ref_count=%d" - ", state: %s", - scheduled_task->name, - (uintptr_t)scheduled_task, - scheduled_task->rust_sp, - scheduled_task->ref_count, - scheduled_task->state->name); + "activating task %s 0x%" PRIxPTR + ", sp=0x%" PRIxPTR + ", state: %s", + scheduled_task->name, + (uintptr_t)scheduled_task, + scheduled_task->rust_sp, + scheduled_task->state->name); interrupt_flag = 0; diff --git a/src/rt/rust_task.cpp b/src/rt/rust_task.cpp index 05efc12e4d43b..a144879cc040b 100644 --- a/src/rt/rust_task.cpp +++ b/src/rt/rust_task.cpp @@ -64,7 +64,7 @@ size_t const callee_save_fp = 0; rust_task::rust_task(rust_scheduler *sched, rust_task_list *state, rust_task *spawner, const char *name) : - maybe_proxy(this), + ref_count(1), stk(NULL), runtime_sp(0), rust_sp(0), @@ -92,10 +92,6 @@ rust_task::rust_task(rust_scheduler *sched, rust_task_list *state, stk = new_stk(this, 0); rust_sp = stk->limit; - - if (spawner == NULL) { - ref_count = 0; - } } rust_task::~rust_task() @@ -131,10 +127,6 @@ void task_start_wrapper(spawn_args *a) LOG(task, task, "task exited with value %d", rval); - - LOG(task, task, "task ref_count: %d", task->ref_count); - A(task->sched, task->ref_count >= 0, - "Task ref_count should not be negative on exit!"); task->die(); task->lock.lock(); task->notify_tasks_waiting_to_join(); @@ -263,17 +255,10 @@ rust_task::notify_tasks_waiting_to_join() { while (tasks_waiting_to_join.is_empty() == false) { LOG(this, task, "notify_tasks_waiting_to_join: %d", tasks_waiting_to_join.size()); - maybe_proxy *waiting_task = 0; + rust_task *waiting_task = 0; tasks_waiting_to_join.pop(&waiting_task); - if (waiting_task->is_proxy()) { - notify_message::send(notify_message::WAKEUP, "wakeup", - get_handle(), waiting_task->as_proxy()->handle()); - delete waiting_task; - } else { - rust_task *task = waiting_task->referent(); - if (task->blocked() == true) { - task->wakeup(this); - } + if (waiting_task->blocked() == true) { + waiting_task->wakeup(this); } } } diff --git a/src/rt/rust_task.h b/src/rt/rust_task.h index 9b1a3a395821f..8b55c0028a972 100644 --- a/src/rt/rust_task.h +++ b/src/rt/rust_task.h @@ -34,10 +34,19 @@ struct gc_alloc { } }; + struct -rust_task : public maybe_proxy, - public kernel_owned +rust_task : public kernel_owned, rust_cond { + // This block could be pulled out into something like a + // RUST_ATOMIC_REFCOUNTED macro. +private: + intptr_t ref_count; +public: + void ref() { sync::increment(ref_count); } + void deref() { if(0 == sync::decrement(ref_count)) { delete this; } } + + // Fields known to the compiler. stk_seg *stk; uintptr_t runtime_sp; // Runtime sp while task running. @@ -69,7 +78,7 @@ rust_task : public maybe_proxy, uintptr_t* rendezvous_ptr; // List of tasks waiting for this task to finish. - array_list *> tasks_waiting_to_join; + array_list tasks_waiting_to_join; rust_handle *handle; diff --git a/src/rt/rust_upcall.cpp b/src/rt/rust_upcall.cpp index 103aa49a6e9f9..794bbc9c2443a 100644 --- a/src/rt/rust_upcall.cpp +++ b/src/rt/rust_upcall.cpp @@ -6,12 +6,10 @@ #define LOG_UPCALL_ENTRY(task) \ LOG(task, upcall, \ "> UPCALL %s - task: %s 0x%" PRIxPTR \ - " retpc: x%" PRIxPTR \ - " ref_count: %d", \ + " retpc: x%" PRIxPTR, \ __FUNCTION__, \ (task)->name, (task), \ - __builtin_return_address(0), \ - (task->ref_count)); + __builtin_return_address(0)); #else #define LOG_UPCALL_ENTRY(task) \ LOG(task, upcall, "> UPCALL task: %s @x%" PRIxPTR, \ @@ -114,8 +112,8 @@ upcall_del_port(rust_task *task, rust_port *port) { I(task->sched, !port->ref_count); delete port; - // FIXME: We shouldn't ever directly manipulate the ref count. - --task->ref_count; + // FIXME: this should happen in the port. + task->deref(); } /** @@ -162,7 +160,7 @@ void upcall_del_chan(rust_task *task, rust_chan *chan) { * has its own copy of the channel. */ extern "C" CDECL rust_chan * -upcall_clone_chan(rust_task *task, maybe_proxy *target, +upcall_clone_chan(rust_task *task, rust_task *target, rust_chan *chan) { LOG_UPCALL_ENTRY(task); return chan->clone(target); @@ -247,18 +245,10 @@ upcall_fail(rust_task *task, * Called whenever a task's ref count drops to zero. */ extern "C" CDECL void -upcall_kill(rust_task *task, maybe_proxy *target) { +upcall_kill(rust_task *task, rust_task *target) { LOG_UPCALL_ENTRY(task); - if (target->is_proxy()) { - notify_message:: - send(notify_message::KILL, "kill", task->get_handle(), - target->as_proxy()->handle()); - // The proxy ref_count dropped to zero, delete it here. - delete target->as_proxy(); - } else { - target->referent()->kill(); - } + target->kill(); } /** @@ -267,9 +257,6 @@ upcall_kill(rust_task *task, maybe_proxy *target) { extern "C" CDECL void upcall_exit(rust_task *task) { LOG_UPCALL_ENTRY(task); - LOG(task, task, "task ref_count: %d", task->ref_count); - A(task->sched, task->ref_count >= 0, - "Task ref_count should not be negative on exit!"); task->die(); task->notify_tasks_waiting_to_join(); task->yield(1); @@ -544,6 +531,7 @@ upcall_new_task(rust_task *spawner, rust_vec *name) { scoped_lock with(spawner->sched->lock); rust_task *task = spawner->kernel->create_task(spawner, (const char *)name->data); + task->ref(); return task; } @@ -559,8 +547,7 @@ extern "C" CDECL void upcall_drop_task(rust_task *task, rust_task *target) { LOG_UPCALL_ENTRY(task); if(target) { - //target->deref(); - --target->ref_count; + target->deref(); } } From 22c3b0ce22c94eadf1682054f43ba093e79bb27d Mon Sep 17 00:00:00 2001 From: Eric Holk Date: Wed, 27 Jul 2011 15:32:54 -0700 Subject: [PATCH 08/11] Removed outdated comment. --- src/rt/rust_kernel.cpp | 1 - 1 file changed, 1 deletion(-) diff --git a/src/rt/rust_kernel.cpp b/src/rt/rust_kernel.cpp index 9cdf07cd759f9..896115c81ea80 100644 --- a/src/rt/rust_kernel.cpp +++ b/src/rt/rust_kernel.cpp @@ -267,7 +267,6 @@ int rust_kernel::start_task_threads() rust_task * rust_kernel::create_task(rust_task *spawner, const char *name) { - // TODO: use a different rand. return threads[rand(&rctx) % num_threads]->create_task(spawner, name); } From f0aa6ca55a66e8456834cfb65135a6f35172fc3b Mon Sep 17 00:00:00 2001 From: Eric Holk Date: Wed, 27 Jul 2011 16:33:31 -0700 Subject: [PATCH 09/11] Resurrecting some of the logging in rust_chan.cpp --- src/rt/rust_chan.cpp | 24 ++++++++++++------------ src/rt/rust_kernel.cpp | 32 ++++++++++++++------------------ src/rt/rust_log.h | 9 +++++++++ 3 files changed, 35 insertions(+), 30 deletions(-) diff --git a/src/rt/rust_chan.cpp b/src/rt/rust_chan.cpp index dc6ea0fefdf37..c32e6b6cc5b6b 100644 --- a/src/rt/rust_chan.cpp +++ b/src/rt/rust_chan.cpp @@ -13,14 +13,14 @@ rust_chan::rust_chan(rust_kernel *kernel, maybe_proxy *port, if (port) { associate(port); } - // DLOG(task->sched, comm, "new rust_chan(task=0x%" PRIxPTR - // ", port=0x%" PRIxPTR ") -> chan=0x%" PRIxPTR, - // (uintptr_t) task, (uintptr_t) port, (uintptr_t) this); + KLOG(kernel, comm, "new rust_chan(task=0x%" PRIxPTR + ", port=0x%" PRIxPTR ") -> chan=0x%" PRIxPTR, + (uintptr_t) task, (uintptr_t) port, (uintptr_t) this); } rust_chan::~rust_chan() { - // DLOG(kernel->sched, comm, "del rust_chan(task=0x%" PRIxPTR ")", - // (uintptr_t) this); + KLOG(kernel, comm, "del rust_chan(task=0x%" PRIxPTR ")", + (uintptr_t) this); // A(kernel->sched, is_associated() == false, // "Channel must be disassociated before being freed."); @@ -33,9 +33,9 @@ void rust_chan::associate(maybe_proxy *port) { this->port = port; if (port->is_proxy() == false) { scoped_lock with(port->referent()->lock); - // DLOG(kernel->sched, task, - // "associating chan: 0x%" PRIxPTR " with port: 0x%" PRIxPTR, - // this, port); + KLOG(kernel, task, + "associating chan: 0x%" PRIxPTR " with port: 0x%" PRIxPTR, + this, port); ++this->ref_count; this->task = port->referent()->task; this->task->ref(); @@ -56,9 +56,9 @@ void rust_chan::disassociate() { if (port->is_proxy() == false) { scoped_lock with(port->referent()->lock); - // DLOG(kernel->sched, task, - // "disassociating chan: 0x%" PRIxPTR " from port: 0x%" PRIxPTR, - // this, port->referent()); + KLOG(kernel, task, + "disassociating chan: 0x%" PRIxPTR " from port: 0x%" PRIxPTR, + this, port->referent()); --this->ref_count; task->deref(); this->task = NULL; @@ -98,7 +98,7 @@ void rust_chan::send(void *sptr) { buffer.dequeue(NULL); } else { if (target_port->task->blocked_on(target_port)) { - // DLOG(sched, comm, "dequeued in rendezvous_ptr"); + KLOG(kernel, comm, "dequeued in rendezvous_ptr"); buffer.dequeue(target_port->task->rendezvous_ptr); target_port->task->rendezvous_ptr = 0; target_port->task->wakeup(target_port); diff --git a/src/rt/rust_kernel.cpp b/src/rt/rust_kernel.cpp index 896115c81ea80..a1b101ae9d407 100644 --- a/src/rt/rust_kernel.cpp +++ b/src/rt/rust_kernel.cpp @@ -1,11 +1,7 @@ #include "rust_internal.h" -#define KLOG(...) \ - do { \ - if (log_rt_kern >= log_note) { \ - log(log_note, __VA_ARGS__); \ - } \ - } while (0) +#define KLOG_(...) \ + KLOG(this, kern, __VA_ARGS__) rust_kernel::rust_kernel(rust_srv *srv, size_t num_threads) : _region(srv, true), @@ -32,8 +28,8 @@ rust_kernel::create_scheduler(int id) { rust_handle *handle = internal_get_sched_handle(sched); message_queue->associate(handle); message_queues.append(message_queue); - KLOG("created scheduler: " PTR ", id: %d, index: %d", - sched, id, sched->list_index); + KLOG_("created scheduler: " PTR ", id: %d, index: %d", + sched, id, sched->list_index); _kernel_lock.signal_all(); _kernel_lock.unlock(); return sched; @@ -42,7 +38,7 @@ rust_kernel::create_scheduler(int id) { void rust_kernel::destroy_scheduler(rust_scheduler *sched) { _kernel_lock.lock(); - KLOG("deleting scheduler: " PTR ", name: %s, index: %d", + KLOG_("deleting scheduler: " PTR ", name: %s, index: %d", sched, sched->name, sched->list_index); sched->message_queue->disassociate(); rust_srv *srv = sched->srv; @@ -166,14 +162,14 @@ rust_kernel::start_kernel_loop() { void rust_kernel::run() { - KLOG("started kernel loop"); + KLOG_("started kernel loop"); start_kernel_loop(); - KLOG("finished kernel loop"); + KLOG_("finished kernel loop"); } void rust_kernel::terminate_kernel_loop() { - KLOG("terminating kernel loop"); + KLOG_("terminating kernel loop"); _interrupt_kernel_loop = true; signal_kernel_lock(); join(); @@ -190,16 +186,16 @@ rust_kernel::~rust_kernel() { // messages. pump_message_queues(); - KLOG("freeing handles"); + KLOG_("freeing handles"); free_handles(_task_handles); - KLOG("..task handles freed"); + KLOG_("..task handles freed"); free_handles(_port_handles); - KLOG("..port handles freed"); + KLOG_("..port handles freed"); free_handles(_sched_handles); - KLOG("..sched handles freed"); + KLOG_("..sched handles freed"); - KLOG("freeing queues"); + KLOG_("freeing queues"); rust_message_queue *queue = NULL; while (message_queues.pop(&queue)) { @@ -228,7 +224,7 @@ rust_kernel::free_handles(hash_map* > &map) { T* key; rust_handle *value; while (map.pop(&key, &value)) { - KLOG("...freeing " PTR, value); + KLOG_("...freeing " PTR, value); delete value; } } diff --git a/src/rt/rust_log.h b/src/rt/rust_log.h index ce0d8f593efce..d14cb022bfdaa 100644 --- a/src/rt/rust_log.h +++ b/src/rt/rust_log.h @@ -23,6 +23,15 @@ const uint32_t log_note = 1; } \ } while (0) +#define KLOG(k, field, ...) \ + KLOG_LVL(k, field, log_note, __VA_ARGS__) +#define KLOG_LVL(k, field, lvl, ...) \ + do { \ + if (log_rt_##field >= lvl) { \ + (k)->log(lvl, __VA_ARGS__); \ + } \ + } while (0) + struct rust_scheduler; struct rust_task; From 7b76ace4d15a6bbb909f15ba0e22ad24604f2b74 Mon Sep 17 00:00:00 2001 From: Eric Holk Date: Wed, 27 Jul 2011 16:45:11 -0700 Subject: [PATCH 10/11] Re-enabled the rest of the asserts and things in rust_chan.cpp --- src/rt/rust_chan.cpp | 23 +++++++++++------------ 1 file changed, 11 insertions(+), 12 deletions(-) diff --git a/src/rt/rust_chan.cpp b/src/rt/rust_chan.cpp index c32e6b6cc5b6b..d77b196fd35c0 100644 --- a/src/rt/rust_chan.cpp +++ b/src/rt/rust_chan.cpp @@ -22,8 +22,8 @@ rust_chan::~rust_chan() { KLOG(kernel, comm, "del rust_chan(task=0x%" PRIxPTR ")", (uintptr_t) this); - // A(kernel->sched, is_associated() == false, - // "Channel must be disassociated before being freed."); + A(kernel, is_associated() == false, + "Channel must be disassociated before being freed."); } /** @@ -51,8 +51,8 @@ bool rust_chan::is_associated() { * Unlink this channel from its associated port. */ void rust_chan::disassociate() { - // A(kernel->sched, is_associated(), - // "Channel must be associated with a port."); + A(kernel, is_associated(), + "Channel must be associated with a port."); if (port->is_proxy() == false) { scoped_lock with(port->referent()->lock); @@ -73,8 +73,7 @@ void rust_chan::disassociate() { * Attempt to send data to the associated port. */ void rust_chan::send(void *sptr) { - // rust_scheduler *sched = kernel->sched; - // I(sched, !port->is_proxy()); + I(kernel, !port->is_proxy()); rust_port *target_port = port->referent(); // TODO: We can probably avoid this lock by using atomic operations in @@ -84,13 +83,13 @@ void rust_chan::send(void *sptr) { buffer.enqueue(sptr); if (!is_associated()) { - // W(sched, is_associated(), - // "rust_chan::transmit with no associated port."); + W(kernel, is_associated(), + "rust_chan::transmit with no associated port."); return; } - // A(sched, !buffer.is_empty(), - // "rust_chan::transmit with nothing to send."); + A(kernel, !buffer.is_empty(), + "rust_chan::transmit with nothing to send."); if (port->is_proxy()) { data_message::send(buffer.peek(), buffer.unit_sz, "send data", @@ -121,8 +120,8 @@ rust_chan *rust_chan::clone(rust_task *target) { * appear to be live, causing modify-after-free errors. */ void rust_chan::destroy() { - // A(kernel->sched, ref_count == 0, - // "Channel's ref count should be zero."); + A(kernel, ref_count == 0, + "Channel's ref count should be zero."); if (is_associated()) { if (port->is_proxy()) { From 6b9b8d414b0abf09e5a2a0e89305a2d4596a8329 Mon Sep 17 00:00:00 2001 From: Eric Holk Date: Thu, 28 Jul 2011 10:41:48 -0700 Subject: [PATCH 11/11] Updating to work on Windows. --- src/lib/task.rs | 2 +- src/rt/rust_kernel.cpp | 6 ++++-- src/rt/rust_scheduler.h | 6 ++++++ src/rt/rust_util.h | 6 +++--- src/rt/sync/lock_and_signal.cpp | 3 ++- 5 files changed, 16 insertions(+), 7 deletions(-) diff --git a/src/lib/task.rs b/src/lib/task.rs index 62fff2cfc7584..1936cf2097193 100644 --- a/src/lib/task.rs +++ b/src/lib/task.rs @@ -42,7 +42,7 @@ fn send[T](c: chan[T], v: &T) { c <| v; } fn recv[T](p: port[T]) -> T { let v; p |> v; v } -fn set_min_stack(uint stack_size) { +fn set_min_stack(stack_size : uint) { rustrt::set_min_stack(stack_size); } diff --git a/src/rt/rust_kernel.cpp b/src/rt/rust_kernel.cpp index a1b101ae9d407..8839f7fe6af8b 100644 --- a/src/rt/rust_kernel.cpp +++ b/src/rt/rust_kernel.cpp @@ -2,6 +2,8 @@ #define KLOG_(...) \ KLOG(this, kern, __VA_ARGS__) +#define KLOG_ERR_(field, ...) \ + KLOG_LVL(this, field, log_err, __VA_ARGS__) rust_kernel::rust_kernel(rust_srv *srv, size_t num_threads) : _region(srv, true), @@ -284,9 +286,9 @@ rust_kernel::win32_require(LPCTSTR fn, BOOL ok) { NULL, err, MAKELANGID(LANG_NEUTRAL, SUBLANG_DEFAULT), (LPTSTR) &buf, 0, NULL ); - DLOG_ERR(sched, dom, "%s failed with error %ld: %s", fn, err, buf); + KLOG_ERR_(dom, "%s failed with error %ld: %s", fn, err, buf); LocalFree((HLOCAL)buf); - I(sched, ok); + I(this, ok); } } #endif diff --git a/src/rt/rust_scheduler.h b/src/rt/rust_scheduler.h index c53e6157f06bc..9289883ab1e6b 100644 --- a/src/rt/rust_scheduler.h +++ b/src/rt/rust_scheduler.h @@ -91,6 +91,12 @@ struct rust_scheduler : public kernel_owned, rust_task *create_task(rust_task *spawner, const char *name); virtual void run(); + +#ifdef __WIN32__ + inline void win32_require(LPCTSTR fn, BOOL ok) { + kernel->win32_require(fn, ok); + } +#endif }; inline rust_log & diff --git a/src/rt/rust_util.h b/src/rt/rust_util.h index 89e7f2e7bedad..c46f191657377 100644 --- a/src/rt/rust_util.h +++ b/src/rt/rust_util.h @@ -135,15 +135,15 @@ isaac_init(sched_or_kernel *sched, randctx *rctx) #ifdef __WIN32__ { HCRYPTPROV hProv; - sched->kernel->win32_require + sched->win32_require (_T("CryptAcquireContext"), CryptAcquireContext(&hProv, NULL, NULL, PROV_RSA_FULL, CRYPT_VERIFYCONTEXT|CRYPT_SILENT)); - sched->kernel->win32_require + sched->win32_require (_T("CryptGenRandom"), CryptGenRandom(hProv, sizeof(rctx->randrsl), (BYTE*)(&rctx->randrsl))); - sched->kernel->win32_require + sched->win32_require (_T("CryptReleaseContext"), CryptReleaseContext(hProv, 0)); } diff --git a/src/rt/sync/lock_and_signal.cpp b/src/rt/sync/lock_and_signal.cpp index 67f6b107b3a4c..df596c65cb2f7 100644 --- a/src/rt/sync/lock_and_signal.cpp +++ b/src/rt/sync/lock_and_signal.cpp @@ -73,7 +73,8 @@ bool lock_and_signal::timed_wait(size_t timeout_in_ns) { bool rv = true; #if defined(__WIN32__) LeaveCriticalSection(&_cs); - WaitForSingleObject(_event, INFINITE); + DWORD timeout = timeout_in_ns == 0 ? INFINITE : timeout_in_ns / 1000000; + rv = WaitForSingleObject(_event, timeout) != WAIT_TIMEOUT; EnterCriticalSection(&_cs); _holding_thread = GetCurrentThreadId(); #else