From be6c328e40288a2434db07c7fad324da1af8e8ea Mon Sep 17 00:00:00 2001 From: George Hotz Date: Tue, 19 Nov 2019 07:58:13 -0800 Subject: [PATCH 01/12] uids are pids --- messaging/msgq.cc | 9 ++------- 1 file changed, 2 insertions(+), 7 deletions(-) diff --git a/messaging/msgq.cc b/messaging/msgq.cc index 777f11a28..c9bf5d21e 100644 --- a/messaging/msgq.cc +++ b/messaging/msgq.cc @@ -5,7 +5,6 @@ #include #include #include -#include #include #include @@ -140,9 +139,7 @@ void msgq_close_queue(msgq_queue_t *q){ void msgq_init_publisher(msgq_queue_t * q) { std::cout << "Starting publisher" << std::endl; - std::random_device rd("/dev/urandom"); - std::uniform_int_distribution distribution(0,std::numeric_limits::max()); - uint64_t uid = distribution(rd); + uint64_t uid = getpid(); *q->write_uid = uid; *q->num_readers = 0; @@ -161,9 +158,7 @@ void msgq_init_subscriber(msgq_queue_t * q) { assert(q != NULL); assert(q->num_readers != NULL); - std::random_device rd("/dev/urandom"); - std::uniform_int_distribution distribution(0,std::numeric_limits::max()); - uint64_t uid = distribution(rd); + uint64_t uid = getpid(); // Get reader id while (true){ From c779ff666510588a99cb51d30f6230a75f43d138 Mon Sep 17 00:00:00 2001 From: George Hotz Date: Tue, 19 Nov 2019 08:19:15 -0800 Subject: [PATCH 02/12] remove fifo, go for signal --- messaging/msgq.cc | 110 ++++++--------------------------------------- messaging/msgq.hpp | 7 --- 2 files changed, 13 insertions(+), 104 deletions(-) diff --git a/messaging/msgq.cc b/messaging/msgq.cc index c9bf5d21e..3d5807b14 100644 --- a/messaging/msgq.cc +++ b/messaging/msgq.cc @@ -9,6 +9,7 @@ #include +#include #include #include #include @@ -17,7 +18,6 @@ #include #include - #include #include "msgq.hpp" @@ -113,23 +113,11 @@ int msgq_new_queue(msgq_queue_t * q, const char * path, size_t size){ q->endpoint = path; q->read_conflate = false; - q->read_fifo = -1; return 0; } void msgq_close_queue(msgq_queue_t *q){ - if (q->read_fifo >= 0){ - close(q->read_fifo); - remove(q->read_fifo_path.c_str()); - } - - for (uint64_t i = 0; i < NUM_READERS; i++){ - if (q->read_fifos[i] >= 0){ - close(q->read_fifos[i]); - } - } - if (q->mmap_p != NULL){ munmap(q->mmap_p, q->size + sizeof(msgq_header_t)); } @@ -146,8 +134,6 @@ void msgq_init_publisher(msgq_queue_t * q) { for (size_t i = 0; i < NUM_READERS; i++){ *q->read_valids[i] = false; - q->read_fifos[i] = -1; - q->read_fifos_uid[i] = 0; *q->read_uids[i] = 0; } @@ -195,26 +181,6 @@ void msgq_init_subscriber(msgq_queue_t * q) { } } - for (size_t i = 0; i < NUM_READERS; i++){ - q->read_fifos[i] = -1; - } - - q->read_fifo_path = "/dev/shm/fifo-"; - q->read_fifo_path += std::to_string(q->read_uid_local); - - std::cout << q->read_fifo_path << std::endl; - int r = mkfifo(q->read_fifo_path.c_str(), 0777); - if (r != 0) - perror("Fifo: "); - assert(r == 0); - - q->read_fifo = open(q->read_fifo_path.c_str(), O_RDWR | O_NONBLOCK); - - // Fysnc so the fifo shows up in the directory - auto shm_fd = open("/dev/shm", O_RDONLY); - fsync(shm_fd); - close(shm_fd); - std::cout << "New subscriber id: " << q->reader_id << " uid: " << q->read_uid_local << " " << q->endpoint << std::endl; msgq_reset_reader(q); } @@ -298,44 +264,14 @@ int msgq_msg_send(msgq_msg_t * msg, msgq_queue_t *q){ for (uint64_t i = 0; i < num_readers; i++){ uint64_t reader_uid = *q->read_uids[i]; - // Open fifo when not set, or when reader changes - if (q->read_fifos[i] == -1 || q->read_fifos_uid[i] != reader_uid){ - // Close old reader fifo - if (q->read_fifos[i] >= 0){ - close(q->read_fifos[i]); - } - - q->read_fifos_uid[i] = reader_uid; - - std::string path = "/dev/shm/fifo-"; - path += std::to_string(reader_uid); - - q->read_fifos[i] = open(path.c_str(), O_RDWR | O_NONBLOCK); - if(q->read_fifos[i] < 0){ - std::cout << "Fifo: " << path << std::endl; - perror("Error opening fifo"); - } - } - - uint8_t m = 1; - write(q->read_fifos[i], &m, 1); + // TODO: does SIGUSR1 cause EINTR from usleep? + // might need to configure it + kill(reader_uid, SIGUSR1); } return msg->size; } -int msgq_get_fd(msgq_queue_t * q){ - int id = q->reader_id; - assert(id >= 0); // Make sure subscriber is initialized - - if (q->read_uid_local != *q->read_uids[id]){ - std::cout << q->endpoint << ": Reader was evicted, reconnecting" << std::endl; - msgq_init_subscriber(q); - } - - return q->read_fifo; -} - int msgq_msg_ready(msgq_queue_t * q){ start: @@ -375,10 +311,6 @@ int msgq_msg_recv(msgq_msg_t * msg, msgq_queue_t * q){ goto start; } - // Read one byte from fifo - char buf[1]; - read(q->read_fifo, buf, 1); - // Check valid if (!*q->read_valids[id]){ msgq_reset_reader(q); @@ -460,34 +392,18 @@ int msgq_poll(msgq_pollitem_t * items, size_t nitems, int timeout){ assert(timeout >= 0); int num = 0; - struct pollfd * fds = (struct pollfd *)calloc(nitems, sizeof(struct pollfd)); - - // Build poll structure - for (size_t i = 0; i < nitems; i++){ - fds[i].fd = msgq_get_fd(items[i].q); - fds[i].events = POLLIN; - - // Check if message is ready in case we get out of sync with the pipe - if (msgq_msg_ready(items[i].q)){ - items[i].revents = 1; - timeout = 0; // No timeout if a message is ready - num++; - } else { - items[i].revents = 0; - } - } - poll(fds, nitems, timeout); + // block on signal or timeout + if (timeout == -1) { + while(1) { if (usleep(1000*1000) == EINTR) break; } + } else { + usleep(timeout*1000); + } - // Read poll results - for (size_t i = 0; i < nitems; i++){ - if (fds[i].revents && !items[i].revents){ - // Don't add it if it was already added - num++; - items[i].revents = 1; - } + // get number to return + for (size_t i = 0; i < nitems; i++) { + num += msgq_msg_ready(items[i].q); } - free(fds); return num; } diff --git a/messaging/msgq.hpp b/messaging/msgq.hpp index 116523e38..3bead13d9 100644 --- a/messaging/msgq.hpp +++ b/messaging/msgq.hpp @@ -35,14 +35,7 @@ struct msgq_queue_t { uint64_t write_uid_local; bool read_conflate; - int read_fifo; - - // Fifo fds and corresponding reader uid - int read_fifos[NUM_READERS]; - uint64_t read_fifos_uid[NUM_READERS]; - std::string endpoint; - std::string read_fifo_path; }; struct msgq_msg_t { From 14b344f6c99f5080b2e8106570ecf6859c18bb9c Mon Sep 17 00:00:00 2001 From: George Hotz Date: Tue, 19 Nov 2019 08:25:59 -0800 Subject: [PATCH 03/12] that's more what i'm saying --- messaging/msgq.cc | 23 +++++++++++++---------- 1 file changed, 13 insertions(+), 10 deletions(-) diff --git a/messaging/msgq.cc b/messaging/msgq.cc index 3d5807b14..a41501656 100644 --- a/messaging/msgq.cc +++ b/messaging/msgq.cc @@ -390,19 +390,22 @@ int msgq_msg_recv(msgq_msg_t * msg, msgq_queue_t * q){ int msgq_poll(msgq_pollitem_t * items, size_t nitems, int timeout){ assert(timeout >= 0); - int num = 0; - // block on signal or timeout - if (timeout == -1) { - while(1) { if (usleep(1000*1000) == EINTR) break; } - } else { - usleep(timeout*1000); - } + while (num == 0) { + int ret; + if (timeout == -1) { + ret = usleep(1000*1000); + } else { + ret = usleep(timeout*1000); + } + + // get number to return + for (size_t i = 0; i < nitems; i++) { + num += msgq_msg_ready(items[i].q); + } - // get number to return - for (size_t i = 0; i < nitems; i++) { - num += msgq_msg_ready(items[i].q); + if (timeout == -1 && ret != EINTR) break; } return num; From c0b471bbe4400cad64a345d69c765e9effd5fa60 Mon Sep 17 00:00:00 2001 From: George Hotz Date: Tue, 19 Nov 2019 08:30:26 -0800 Subject: [PATCH 04/12] oops, that's wrong --- messaging/msgq.cc | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/messaging/msgq.cc b/messaging/msgq.cc index a41501656..a1e619848 100644 --- a/messaging/msgq.cc +++ b/messaging/msgq.cc @@ -405,7 +405,8 @@ int msgq_poll(msgq_pollitem_t * items, size_t nitems, int timeout){ num += msgq_msg_ready(items[i].q); } - if (timeout == -1 && ret != EINTR) break; + // exit if we had a timeout and the sleep finished + if (timeout != -1 && ret != EINTR) break; } return num; From 9b0668171f0b7624e40c3aa424ffe24124e5fcdd Mon Sep 17 00:00:00 2001 From: Willem Melching Date: Tue, 19 Nov 2019 14:05:39 -0800 Subject: [PATCH 05/12] add some todos --- messaging/msgq.cc | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/messaging/msgq.cc b/messaging/msgq.cc index a1e619848..95eea8847 100644 --- a/messaging/msgq.cc +++ b/messaging/msgq.cc @@ -144,6 +144,8 @@ void msgq_init_subscriber(msgq_queue_t * q) { assert(q != NULL); assert(q->num_readers != NULL); + // TODO: Setup empty SIGUSR1 handler + uint64_t uid = getpid(); // Get reader id @@ -393,6 +395,10 @@ int msgq_poll(msgq_pollitem_t * items, size_t nitems, int timeout){ int num = 0; while (num == 0) { + //TODO: if a message is ready on any of the sockets, don't sleep + + + // TODO: switch to nanosleep and store remaining time int ret; if (timeout == -1) { ret = usleep(1000*1000); From 9828b7c5d109d2d83a76d3d9123c7836fa5115c1 Mon Sep 17 00:00:00 2001 From: Willem Melching Date: Thu, 21 Nov 2019 14:35:56 -0800 Subject: [PATCH 06/12] make signal poll work --- messaging/msgq.cc | 42 +++++++++++++++++++++++++++--------------- 1 file changed, 27 insertions(+), 15 deletions(-) diff --git a/messaging/msgq.cc b/messaging/msgq.cc index 95eea8847..a8cbd47aa 100644 --- a/messaging/msgq.cc +++ b/messaging/msgq.cc @@ -7,14 +7,14 @@ #include #include #include +#include - -#include #include #include #include #include #include +#include #include #include @@ -22,6 +22,9 @@ #include "msgq.hpp" +void sigusr1_handler(int signal) { + assert(signal == SIGUSR1); +} int msgq_msg_init_size(msgq_msg_t * msg, size_t size){ msg->size = size; @@ -68,6 +71,8 @@ void msgq_wait_for_subscriber(msgq_queue_t *q){ int msgq_new_queue(msgq_queue_t * q, const char * path, size_t size){ assert(size < 0xFFFFFFFF); // Buffer must be smaller than 2^32 bytes + std::signal(SIGUSR1, sigusr1_handler); + const char * prefix = "/dev/shm/"; char * full_path = new char[strlen(path) + strlen(prefix) + 1]; strcpy(full_path, prefix); @@ -127,7 +132,7 @@ void msgq_close_queue(msgq_queue_t *q){ void msgq_init_publisher(msgq_queue_t * q) { std::cout << "Starting publisher" << std::endl; - uint64_t uid = getpid(); + uint64_t uid = syscall (SYS_gettid); *q->write_uid = uid; *q->num_readers = 0; @@ -144,9 +149,7 @@ void msgq_init_subscriber(msgq_queue_t * q) { assert(q != NULL); assert(q->num_readers != NULL); - // TODO: Setup empty SIGUSR1 handler - - uint64_t uid = getpid(); + uint64_t uid = syscall (SYS_gettid); // Get reader id while (true){ @@ -194,7 +197,6 @@ int msgq_msg_send(msgq_msg_t * msg, msgq_queue_t *q){ assert(q->write_uid_local == *q->write_uid); } - uint64_t total_msg_size = ALIGN(msg->size + sizeof(int64_t)); // We need to fit at least three messages in the queue, @@ -268,7 +270,8 @@ int msgq_msg_send(msgq_msg_t * msg, msgq_queue_t *q){ // TODO: does SIGUSR1 cause EINTR from usleep? // might need to configure it - kill(reader_uid, SIGUSR1); + std::cout << "kill " << reader_uid << std::endl; + syscall(SYS_tkill, reader_uid, SIGUSR1); } return msg->size; @@ -392,27 +395,36 @@ int msgq_msg_recv(msgq_msg_t * msg, msgq_queue_t * q){ int msgq_poll(msgq_pollitem_t * items, size_t nitems, int timeout){ assert(timeout >= 0); + + for (size_t i = 0; i < nitems; i++){ + items[i].revents = 0; + } + int num = 0; while (num == 0) { - //TODO: if a message is ready on any of the sockets, don't sleep - + // TODO: if a message is ready on any of the sockets, don't sleep + // TODO: switch to nanosleep and store remaining time in case there is a false positive - // TODO: switch to nanosleep and store remaining time int ret; if (timeout == -1) { - ret = usleep(1000*1000); + ret = usleep(100*1000); } else { ret = usleep(timeout*1000); } - // get number to return + // Check if messages ready for (size_t i = 0; i < nitems; i++) { - num += msgq_msg_ready(items[i].q); + if (items[i].revents == 0 && msgq_msg_ready(items[i].q)){ + num += 1; + items[i].revents = 1; + } } // exit if we had a timeout and the sleep finished - if (timeout != -1 && ret != EINTR) break; + if (timeout != -1 && ret == 0){ + break; + } } return num; From 7f047a6b62cd86de9fa9d61c71d73f1d0684648d Mon Sep 17 00:00:00 2001 From: Willem Melching Date: Thu, 21 Nov 2019 14:39:58 -0800 Subject: [PATCH 07/12] bring randomness back --- messaging/msgq.cc | 21 ++++++++++++++------- 1 file changed, 14 insertions(+), 7 deletions(-) diff --git a/messaging/msgq.cc b/messaging/msgq.cc index a8cbd47aa..addf9a56f 100644 --- a/messaging/msgq.cc +++ b/messaging/msgq.cc @@ -8,6 +8,7 @@ #include #include #include +#include #include #include @@ -26,6 +27,16 @@ void sigusr1_handler(int signal) { assert(signal == SIGUSR1); } +uint64_t msgq_get_uid(void){ + std::random_device rd("/dev/urandom"); + std::uniform_int_distribution distribution(0,std::numeric_limits::max()); + + uint64_t uid = distribution(rd); + uid = (uid & ~0xFFFF) | syscall (SYS_gettid); + + return uid; +} + int msgq_msg_init_size(msgq_msg_t * msg, size_t size){ msg->size = size; msg->data = new(std::nothrow) char[size]; @@ -131,8 +142,7 @@ void msgq_close_queue(msgq_queue_t *q){ void msgq_init_publisher(msgq_queue_t * q) { std::cout << "Starting publisher" << std::endl; - - uint64_t uid = syscall (SYS_gettid); + uint64_t uid = msgq_get_uid(); *q->write_uid = uid; *q->num_readers = 0; @@ -149,7 +159,7 @@ void msgq_init_subscriber(msgq_queue_t * q) { assert(q != NULL); assert(q->num_readers != NULL); - uint64_t uid = syscall (SYS_gettid); + uint64_t uid = msgq_get_uid(); // Get reader id while (true){ @@ -268,10 +278,7 @@ int msgq_msg_send(msgq_msg_t * msg, msgq_queue_t *q){ for (uint64_t i = 0; i < num_readers; i++){ uint64_t reader_uid = *q->read_uids[i]; - // TODO: does SIGUSR1 cause EINTR from usleep? - // might need to configure it - std::cout << "kill " << reader_uid << std::endl; - syscall(SYS_tkill, reader_uid, SIGUSR1); + syscall(SYS_tkill, reader_uid & 0xFFFF, SIGUSR1); } return msg->size; From 11749752db642cbc5782e9e1b34aa1390912b68a Mon Sep 17 00:00:00 2001 From: Willem Melching Date: Thu, 21 Nov 2019 14:58:03 -0800 Subject: [PATCH 08/12] fix case in msgq when subs get evicted while polling --- messaging/msgq.cc | 14 +++++++++----- messaging/tests/test_poller.py | 7 +++++-- 2 files changed, 14 insertions(+), 7 deletions(-) diff --git a/messaging/msgq.cc b/messaging/msgq.cc index addf9a56f..69f22280a 100644 --- a/messaging/msgq.cc +++ b/messaging/msgq.cc @@ -29,11 +29,9 @@ void sigusr1_handler(int signal) { uint64_t msgq_get_uid(void){ std::random_device rd("/dev/urandom"); - std::uniform_int_distribution distribution(0,std::numeric_limits::max()); - - uint64_t uid = distribution(rd); - uid = (uid & ~0xFFFF) | syscall (SYS_gettid); + std::uniform_int_distribution distribution(0,std::numeric_limits::max()); + uint64_t uid = distribution(rd) << 32 | syscall(SYS_gettid); return uid; } @@ -173,7 +171,12 @@ void msgq_init_subscriber(msgq_queue_t * q) { for (size_t i = 0; i < NUM_READERS; i++){ *q->read_valids[i] = false; + + uint64_t old_uid = *q->read_uids[i]; *q->read_uids[i] = 0; + + // Wake up reader in case they are in a poll + syscall(SYS_tkill, old_uid & 0xFFFFFFFF, SIGUSR1); } continue; @@ -278,7 +281,7 @@ int msgq_msg_send(msgq_msg_t * msg, msgq_queue_t *q){ for (uint64_t i = 0; i < num_readers; i++){ uint64_t reader_uid = *q->read_uids[i]; - syscall(SYS_tkill, reader_uid & 0xFFFF, SIGUSR1); + syscall(SYS_tkill, reader_uid & 0xFFFFFFFF, SIGUSR1); } return msg->size; @@ -420,6 +423,7 @@ int msgq_poll(msgq_pollitem_t * items, size_t nitems, int timeout){ ret = usleep(timeout*1000); } + // Check if messages ready for (size_t i = 0; i < nitems; i++) { if (items[i].revents == 0 && msgq_msg_ready(items[i].q)){ diff --git a/messaging/tests/test_poller.py b/messaging/tests/test_poller.py index 0f343fb89..9a516a69f 100644 --- a/messaging/tests/test_poller.py +++ b/messaging/tests/test_poller.py @@ -15,7 +15,7 @@ def poller(): sub.connect(context, 'controlsState') p.registerSocket(sub) - socks = p.poll(1000) + socks = p.poll(10000) r = [s.receive(non_blocking=True) for s in socks] return r @@ -44,7 +44,6 @@ def test_poll_once(self): self.assertEqual(result, [b"a"]) - @unittest.skipIf(os.environ.get('MSGQ'), "fails under msgq") def test_poll_and_create_many_subscribers(self): context = messaging.Context() @@ -69,3 +68,7 @@ def test_poll_and_create_many_subscribers(self): context.term() self.assertEqual(result, [b"a"]) + + +if __name__ == "__main__": + unittest.main() From a4ae77708107bf8d9f2fe5accb408f8517144050 Mon Sep 17 00:00:00 2001 From: Willem Melching Date: Thu, 21 Nov 2019 15:01:16 -0800 Subject: [PATCH 09/12] check for ready messages before poll starts --- messaging/msgq.cc | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/messaging/msgq.cc b/messaging/msgq.cc index 69f22280a..6563acdf9 100644 --- a/messaging/msgq.cc +++ b/messaging/msgq.cc @@ -406,11 +406,14 @@ int msgq_msg_recv(msgq_msg_t * msg, msgq_queue_t * q){ int msgq_poll(msgq_pollitem_t * items, size_t nitems, int timeout){ assert(timeout >= 0); - for (size_t i = 0; i < nitems; i++){ - items[i].revents = 0; + int num = 0; + + // Check if messages ready + for (size_t i = 0; i < nitems; i++) { + items[i].revents = msgq_msg_ready(items[i].q); + if (items[i].revents) num++; } - int num = 0; while (num == 0) { // TODO: if a message is ready on any of the sockets, don't sleep From fd9c22dcd00a8271d17f4580f4fac92a55a4f022 Mon Sep 17 00:00:00 2001 From: Willem Melching Date: Thu, 21 Nov 2019 15:02:54 -0800 Subject: [PATCH 10/12] No pr builds --- azure-pipelines.yml | 2 ++ 1 file changed, 2 insertions(+) diff --git a/azure-pipelines.yml b/azure-pipelines.yml index 613754aec..c14526a5c 100644 --- a/azure-pipelines.yml +++ b/azure-pipelines.yml @@ -1,3 +1,5 @@ +pr: none + pool: vmImage: 'ubuntu-16.04' From b420371b372caaaa5faa82c62f2d3fad33c96c5b Mon Sep 17 00:00:00 2001 From: Willem Melching Date: Thu, 21 Nov 2019 15:56:10 -0800 Subject: [PATCH 11/12] use nanosleep with remainder --- messaging/msgq.cc | 14 ++++++-------- 1 file changed, 6 insertions(+), 8 deletions(-) diff --git a/messaging/msgq.cc b/messaging/msgq.cc index 6563acdf9..54058d212 100644 --- a/messaging/msgq.cc +++ b/messaging/msgq.cc @@ -414,18 +414,16 @@ int msgq_poll(msgq_pollitem_t * items, size_t nitems, int timeout){ if (items[i].revents) num++; } + int ms = (timeout == -1) ? 100 : timeout; + struct timespec ts; + ts.tv_sec = ms / 1000; + ts.tv_nsec = (ms % 1000) * 1000 * 1000; - while (num == 0) { - // TODO: if a message is ready on any of the sockets, don't sleep - // TODO: switch to nanosleep and store remaining time in case there is a false positive + while (num == 0) { int ret; - if (timeout == -1) { - ret = usleep(100*1000); - } else { - ret = usleep(timeout*1000); - } + ret = nanosleep(&ts, &ts); // Check if messages ready for (size_t i = 0; i < nitems; i++) { From 7181c315f69db1ffc5e7d900859f5b92d7459d69 Mon Sep 17 00:00:00 2001 From: Willem Melching Date: Fri, 22 Nov 2019 11:50:09 -0800 Subject: [PATCH 12/12] this should pass the test --- messaging/tests/test_poller.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/messaging/tests/test_poller.py b/messaging/tests/test_poller.py index 9a516a69f..facc74ef8 100644 --- a/messaging/tests/test_poller.py +++ b/messaging/tests/test_poller.py @@ -58,6 +58,8 @@ def test_poll_and_create_many_subscribers(self): for _ in range(10): messaging.SubSocket().connect(c, 'controlsState') + time.sleep(0.1) + # Send message pub.send("a")