From 7d3f797edade4e7ecb56840226fdef45db8e73fc Mon Sep 17 00:00:00 2001 From: Nir Soffer Date: Mon, 30 Sep 2024 20:05:44 +0300 Subject: [PATCH] Improve dispatch queue usage The way we used dispatch queue was not optimal, leading to creation of too many threads, which can cause unneeded context switching, and lower performance. We use now 2 queues: - vms_queue - concurrent queue use to run accepted connections threads. For this queue we must use concurrent queue since we want to process requests from VMs in parallel. - host_queue - serial queue for processing events from vmnet. This queue is used for the start and stop callback and for the events callback. With this change we use consistent number of threads: main thread, 1 vmnet thread, 1 thread per connected vm. Testing show no change in performance. | hardware | os | test | before (Gb/s) | after (Gb/s) | |----------|--------|---------------|---------------|--------------| | M1 Pro | 14.7.1 | host-to-vm | 3.252 | 3.219 | | M1 Pro | 14.7.1 | host-to-vm-2 | 1.902 | 1.880 | | M1 Pro | 14.7.1 | vm-to-vm | 1.493 | 1.507 | | M2 Max | 15.1 | host-to-vm | 2.335 | 2.359 | | M2 Max | 15.1 | host-to-vm-2 | 1.798 | 1.802 | | M2 Max | 15.1 | vm-to-vm | 1.275 | 1.266 | Results are average send throughput of 10 runs. Using performance tests from #66. We have a performance regression on macOS 15.1. With 14.7 on same M2 Max machine it was up ot 1.2 times faster. With 15.1 it is up to 1.4 times slower. Signed-off-by: Nir Soffer --- main.c | 37 +++++++++++++++++++------------------ 1 file changed, 19 insertions(+), 18 deletions(-) diff --git a/main.c b/main.c index 4d3d4c5..3dbf6c1 100644 --- a/main.c +++ b/main.c @@ -85,6 +85,8 @@ struct conn { struct state { dispatch_semaphore_t sem; + dispatch_queue_t vms_queue; + dispatch_queue_t host_queue; struct conn *conns; // TODO: avoid O(N) lookup } _state; @@ -252,8 +254,6 @@ static interface_ref start(struct state *state, struct cli_options *cliopt) { cliopt->vmnet_nat66_prefix); } - dispatch_queue_t q = dispatch_queue_create( - "io.github.lima-vm.socket_vmnet.start", DISPATCH_QUEUE_SERIAL); dispatch_semaphore_t sem = dispatch_semaphore_create(0); __block interface_ref iface; @@ -261,7 +261,7 @@ static interface_ref start(struct state *state, struct cli_options *cliopt) { __block uint64_t max_bytes = 0; iface = vmnet_start_interface( - dict, q, ^(vmnet_return_t x_status, xpc_object_t x_param) { + dict, state->host_queue, ^(vmnet_return_t x_status, xpc_object_t x_param) { status = x_status; if (x_status == VMNET_SUCCESS) { print_vmnet_start_param(x_param); @@ -271,17 +271,14 @@ static interface_ref start(struct state *state, struct cli_options *cliopt) { dispatch_semaphore_signal(sem); }); dispatch_semaphore_wait(sem, DISPATCH_TIME_FOREVER); - dispatch_release(q); xpc_release(dict); if (status != VMNET_SUCCESS) { ERRORF("vmnet_start_interface: [%d] %s", status, vmnet_strerror(status)); return NULL; } - dispatch_queue_t event_q = dispatch_queue_create( - "io.github.lima-vm.socket_vmnet.events", DISPATCH_QUEUE_CONCURRENT); vmnet_interface_set_event_callback( - iface, VMNET_INTERFACE_PACKETS_AVAILABLE, event_q, + iface, VMNET_INTERFACE_PACKETS_AVAILABLE, state->host_queue, ^(interface_event_t __attribute__((unused)) x_event_id, xpc_object_t x_event) { uint64_t estim_count = xpc_dictionary_get_uint64( @@ -298,21 +295,17 @@ static void signalhandler(int signal) { siglongjmp(jmpbuf, 1); } -static void stop(interface_ref iface) { +static void stop(struct state *state, interface_ref iface) { if (iface == NULL) { return; } - dispatch_queue_t q = dispatch_queue_create( - "io.github.lima-vm.socket_vmnet.stop", DISPATCH_QUEUE_SERIAL); dispatch_semaphore_t sem = dispatch_semaphore_create(0); __block vmnet_return_t status; - vmnet_stop_interface(iface, q, ^(vmnet_return_t x_status) { + vmnet_stop_interface(iface, state->host_queue, ^(vmnet_return_t x_status) { status = x_status; dispatch_semaphore_signal(sem); }); dispatch_semaphore_wait(sem, DISPATCH_TIME_FOREVER); - dispatch_release(q); - // TODO: release event_q ? if (status != VMNET_SUCCESS) { ERRORF("vmnet_stop_interface: [%d] %s", status, vmnet_strerror(status)); } @@ -376,8 +369,6 @@ int main(int argc, char *argv[]) { debug = getenv("DEBUG") != NULL; int rc = 1, listen_fd = -1; __block interface_ref iface = NULL; - dispatch_queue_t q = dispatch_queue_create( - "io.github.lima-vm.socket_vmnet.accept", DISPATCH_QUEUE_CONCURRENT); struct cli_options *cliopt = cli_options_parse(argc, argv); assert(cliopt != NULL); @@ -420,6 +411,15 @@ int main(int argc, char *argv[]) { struct state state; memset(&state, 0, sizeof(state)); state.sem = dispatch_semaphore_create(1); + + // Queue for vm connections, allowing processing vms requests in parallel. + state.vms_queue = dispatch_queue_create( + "io.github.lima-vm.socket_vmnet.vms", DISPATCH_QUEUE_CONCURRENT); + + // Queue for processing vmnet events. + state.host_queue = dispatch_queue_create( + "io.github.lima-vm.socket_vmnet.host", DISPATCH_QUEUE_SERIAL); + iface = start(&state, cliopt); if (iface == NULL) { // Error already logged. @@ -442,16 +442,15 @@ int main(int argc, char *argv[]) { goto done; } struct state *state_p = &state; - dispatch_async(q, ^{ + dispatch_async(state.vms_queue, ^{ on_accept(state_p, accept_fd, iface); }); } rc = 0; done: DEBUGF("shutting down with rc=%d", rc); - dispatch_release(q); if (iface != NULL) { - stop(iface); + stop(&state, iface); } if (listen >= 0) { close(listen_fd); @@ -460,6 +459,8 @@ int main(int argc, char *argv[]) { unlink(cliopt->pidfile); close(pid_fd); } + dispatch_release(state.vms_queue); + dispatch_release(state.host_queue); cli_options_destroy(cliopt); return rc; }