From 292c68be4f432bb9e0811972e79a46cb67b5ced0 Mon Sep 17 00:00:00 2001 From: Dylan Finlay Date: Mon, 19 Feb 2024 22:03:21 -0500 Subject: [PATCH 1/7] Data merge worker added in - logging not in main loop yet --- main_2024.py | 62 ++++++++++++++++++++++++++++++++++++++++++++-------- 1 file changed, 53 insertions(+), 9 deletions(-) diff --git a/main_2024.py b/main_2024.py index 8d8a70ff..bac05000 100644 --- a/main_2024.py +++ b/main_2024.py @@ -13,6 +13,7 @@ from modules.detect_target import detect_target_worker from modules.flight_interface import flight_interface_worker from modules.video_input import video_input_worker +from modules.data_merge import data_merge_worker from utilities.workers import queue_proxy_wrapper from utilities.workers import worker_controller from utilities.workers import worker_manager @@ -68,6 +69,8 @@ def main() -> int: FLIGHT_INTERFACE_ADDRESS = config["flight_interface"]["address"] FLIGHT_INTERFACE_TIMEOUT = config["flight_interface"]["timeout"] FLIGHT_INTERFACE_WORKER_PERIOD = config["flight_interface"]["worker_period"] + + DATA_MERGE_TIMEOUT = config["data_merge"]["timeout"] except KeyError: print("Config key(s) not found") return -1 @@ -80,15 +83,20 @@ def main() -> int: mp_manager = mp.Manager() video_input_to_detect_target_queue = queue_proxy_wrapper.QueueProxyWrapper( mp_manager, + # tmp - are the commas supposed to be on these second elements too? + QUEUE_MAX_SIZE, + ) + detect_target_to_data_merge_queue = queue_proxy_wrapper.QueueProxyWrapper( + mp_manager, QUEUE_MAX_SIZE, ) - detect_target_to_main_queue = queue_proxy_wrapper.QueueProxyWrapper( + flight_interface_to_data_merge_queue = queue_proxy_wrapper.QueueProxyWrapper( mp_manager, QUEUE_MAX_SIZE, ) - flight_interface_to_main_queue = queue_proxy_wrapper.QueueProxyWrapper( + data_merge_to_main_queue = queue_proxy_wrapper.QueueProxyWrapper( mp_manager, - QUEUE_MAX_SIZE + QUEUE_MAX_SIZE, ) video_input_manager = worker_manager.WorkerManager() @@ -114,7 +122,7 @@ def main() -> int: DETECT_TARGET_OVERRIDE_FULL_PRECISION, DETECT_TARGET_SAVE_PREFIX, video_input_to_detect_target_queue, - detect_target_to_main_queue, + detect_target_to_data_merge_queue, controller, ), ) @@ -127,7 +135,20 @@ def main() -> int: FLIGHT_INTERFACE_ADDRESS, FLIGHT_INTERFACE_TIMEOUT, FLIGHT_INTERFACE_WORKER_PERIOD, - flight_interface_to_main_queue, + flight_interface_to_data_merge_queue, + controller, + ), + ) + + data_merge_manager = worker_manager.WorkerManager() + data_merge_manager.create_workers( + 1, + data_merge_worker.data_merge_worker, + ( + DATA_MERGE_TIMEOUT, + detect_target_to_data_merge_queue, + flight_interface_to_data_merge_queue, + data_merge_to_main_queue, controller, ), ) @@ -136,15 +157,36 @@ def main() -> int: video_input_manager.start_workers() detect_target_manager.start_workers() flight_interface_manager.start_workers() + data_merge_manager.start_workers() while True: + # try: + # merged_data = data_merge_to_main_queue.queue.get_nowait() + # except queue.Empty: + # merged_data = None + + # if merged_data is not None: + # timestamp = merged_data.timestamp + # odometry = merged_data.odometry_local + # detections = merged_data.detections + + # print("timestamp: " + str(timestamp)) + # print("north: " + str(odometry.position.north)) + # print("east: " + str(position.east)) + # print("down: " + str(position.down)) + # print("yaw: " + str(orientation.yaw)) + # print("roll: " + str(orientation.roll)) + # print("pitch: " + str(orientation.pitch)) + # print("detections: " + str(detections)) + # print("") + try: - image = detect_target_to_main_queue.queue.get_nowait() + image = detect_target_to_data_merge_queue.queue.get_nowait() except queue.Empty: image = None odometry_and_time_info: "odometry_and_time.OdometryAndTime | None" = \ - flight_interface_to_main_queue.queue.get() + flight_interface_to_data_merge_queue.queue.get() if odometry_and_time_info is not None: timestamp = odometry_and_time_info.timestamp @@ -172,12 +214,14 @@ def main() -> int: controller.request_exit() video_input_to_detect_target_queue.fill_and_drain_queue() - detect_target_to_main_queue.fill_and_drain_queue() - flight_interface_to_main_queue.fill_and_drain_queue() + detect_target_to_data_merge_queue.fill_and_drain_queue() + flight_interface_to_data_merge_queue.fill_and_drain_queue() + data_merge_to_main_queue.fill_and_drain_queue() video_input_manager.join_workers() detect_target_manager.join_workers() flight_interface_manager.join_workers() + data_merge_manager.join_workers() return 0 From bb4ecfd4c4d05b42c0b44e14287c357bd54d711c Mon Sep 17 00:00:00 2001 From: Dylan Finlay Date: Mon, 19 Feb 2024 22:13:00 -0500 Subject: [PATCH 2/7] Going back to original temporarily --- main_2024.py | 62 ++++++++------------------------------------------ modules/common | 2 +- 2 files changed, 10 insertions(+), 54 deletions(-) diff --git a/main_2024.py b/main_2024.py index bac05000..8d8a70ff 100644 --- a/main_2024.py +++ b/main_2024.py @@ -13,7 +13,6 @@ from modules.detect_target import detect_target_worker from modules.flight_interface import flight_interface_worker from modules.video_input import video_input_worker -from modules.data_merge import data_merge_worker from utilities.workers import queue_proxy_wrapper from utilities.workers import worker_controller from utilities.workers import worker_manager @@ -69,8 +68,6 @@ def main() -> int: FLIGHT_INTERFACE_ADDRESS = config["flight_interface"]["address"] FLIGHT_INTERFACE_TIMEOUT = config["flight_interface"]["timeout"] FLIGHT_INTERFACE_WORKER_PERIOD = config["flight_interface"]["worker_period"] - - DATA_MERGE_TIMEOUT = config["data_merge"]["timeout"] except KeyError: print("Config key(s) not found") return -1 @@ -83,20 +80,15 @@ def main() -> int: mp_manager = mp.Manager() video_input_to_detect_target_queue = queue_proxy_wrapper.QueueProxyWrapper( mp_manager, - # tmp - are the commas supposed to be on these second elements too? - QUEUE_MAX_SIZE, - ) - detect_target_to_data_merge_queue = queue_proxy_wrapper.QueueProxyWrapper( - mp_manager, QUEUE_MAX_SIZE, ) - flight_interface_to_data_merge_queue = queue_proxy_wrapper.QueueProxyWrapper( + detect_target_to_main_queue = queue_proxy_wrapper.QueueProxyWrapper( mp_manager, QUEUE_MAX_SIZE, ) - data_merge_to_main_queue = queue_proxy_wrapper.QueueProxyWrapper( + flight_interface_to_main_queue = queue_proxy_wrapper.QueueProxyWrapper( mp_manager, - QUEUE_MAX_SIZE, + QUEUE_MAX_SIZE ) video_input_manager = worker_manager.WorkerManager() @@ -122,7 +114,7 @@ def main() -> int: DETECT_TARGET_OVERRIDE_FULL_PRECISION, DETECT_TARGET_SAVE_PREFIX, video_input_to_detect_target_queue, - detect_target_to_data_merge_queue, + detect_target_to_main_queue, controller, ), ) @@ -135,20 +127,7 @@ def main() -> int: FLIGHT_INTERFACE_ADDRESS, FLIGHT_INTERFACE_TIMEOUT, FLIGHT_INTERFACE_WORKER_PERIOD, - flight_interface_to_data_merge_queue, - controller, - ), - ) - - data_merge_manager = worker_manager.WorkerManager() - data_merge_manager.create_workers( - 1, - data_merge_worker.data_merge_worker, - ( - DATA_MERGE_TIMEOUT, - detect_target_to_data_merge_queue, - flight_interface_to_data_merge_queue, - data_merge_to_main_queue, + flight_interface_to_main_queue, controller, ), ) @@ -157,36 +136,15 @@ def main() -> int: video_input_manager.start_workers() detect_target_manager.start_workers() flight_interface_manager.start_workers() - data_merge_manager.start_workers() while True: - # try: - # merged_data = data_merge_to_main_queue.queue.get_nowait() - # except queue.Empty: - # merged_data = None - - # if merged_data is not None: - # timestamp = merged_data.timestamp - # odometry = merged_data.odometry_local - # detections = merged_data.detections - - # print("timestamp: " + str(timestamp)) - # print("north: " + str(odometry.position.north)) - # print("east: " + str(position.east)) - # print("down: " + str(position.down)) - # print("yaw: " + str(orientation.yaw)) - # print("roll: " + str(orientation.roll)) - # print("pitch: " + str(orientation.pitch)) - # print("detections: " + str(detections)) - # print("") - try: - image = detect_target_to_data_merge_queue.queue.get_nowait() + image = detect_target_to_main_queue.queue.get_nowait() except queue.Empty: image = None odometry_and_time_info: "odometry_and_time.OdometryAndTime | None" = \ - flight_interface_to_data_merge_queue.queue.get() + flight_interface_to_main_queue.queue.get() if odometry_and_time_info is not None: timestamp = odometry_and_time_info.timestamp @@ -214,14 +172,12 @@ def main() -> int: controller.request_exit() video_input_to_detect_target_queue.fill_and_drain_queue() - detect_target_to_data_merge_queue.fill_and_drain_queue() - flight_interface_to_data_merge_queue.fill_and_drain_queue() - data_merge_to_main_queue.fill_and_drain_queue() + detect_target_to_main_queue.fill_and_drain_queue() + flight_interface_to_main_queue.fill_and_drain_queue() video_input_manager.join_workers() detect_target_manager.join_workers() flight_interface_manager.join_workers() - data_merge_manager.join_workers() return 0 diff --git a/modules/common b/modules/common index c780f36c..a886a389 160000 --- a/modules/common +++ b/modules/common @@ -1 +1 @@ -Subproject commit c780f36cd1bd6877b70d04318480deb29d4ddb6e +Subproject commit a886a389be700cd3ada97c1f58187a5bf127bb1c From 941a02224747395a89dec8c4d36f4e290330e058 Mon Sep 17 00:00:00 2001 From: Dylan Finlay Date: Mon, 19 Feb 2024 22:27:34 -0500 Subject: [PATCH 3/7] Changes add to updated main branch --- main_2024.py | 53 +++++++++++++++++++++++++++++++++++++++++++++++----- 1 file changed, 48 insertions(+), 5 deletions(-) diff --git a/main_2024.py b/main_2024.py index 9dba7564..93ee90f3 100644 --- a/main_2024.py +++ b/main_2024.py @@ -15,6 +15,7 @@ from modules.detect_target import detect_target_worker from modules.flight_interface import flight_interface_worker from modules.video_input import video_input_worker +from modules.data_merge import data_merge_worker from utilities.workers import queue_proxy_wrapper from utilities.workers import worker_controller from utilities.workers import worker_manager @@ -76,6 +77,8 @@ def main() -> int: FLIGHT_INTERFACE_ADDRESS = config["flight_interface"]["address"] FLIGHT_INTERFACE_TIMEOUT = config["flight_interface"]["timeout"] FLIGHT_INTERFACE_WORKER_PERIOD = config["flight_interface"]["worker_period"] + + DATA_MERGE_TIMEOUT = config["data_merge"]["timeout"] except KeyError: print("Config key(s) not found") return -1 @@ -90,13 +93,17 @@ def main() -> int: mp_manager, QUEUE_MAX_SIZE, ) - detect_target_to_main_queue = queue_proxy_wrapper.QueueProxyWrapper( + detect_target_to_data_merge_queue = queue_proxy_wrapper.QueueProxyWrapper( + mp_manager, + QUEUE_MAX_SIZE, + ) + flight_interface_to_data_merge_queue = queue_proxy_wrapper.QueueProxyWrapper( mp_manager, QUEUE_MAX_SIZE, ) - flight_interface_to_main_queue = queue_proxy_wrapper.QueueProxyWrapper( + data_merge_to_main_queue = queue_proxy_wrapper.QueueProxyWrapper( mp_manager, - QUEUE_MAX_SIZE + QUEUE_MAX_SIZE, ) video_input_manager = worker_manager.WorkerManager() @@ -123,7 +130,7 @@ def main() -> int: DETECT_TARGET_SHOW_ANNOTATED, DETECT_TARGET_SAVE_PREFIX, video_input_to_detect_target_queue, - detect_target_to_main_queue, + detect_target_to_data_merge_queue, controller, ), ) @@ -136,7 +143,20 @@ def main() -> int: FLIGHT_INTERFACE_ADDRESS, FLIGHT_INTERFACE_TIMEOUT, FLIGHT_INTERFACE_WORKER_PERIOD, - flight_interface_to_main_queue, + flight_interface_to_data_merge_queue, + controller, + ), + ) + + data_merge_manager = worker_manager.WorkerManager() + data_merge_manager.create_workers( + 1, + data_merge_worker.data_merge_worker, + ( + DATA_MERGE_TIMEOUT, + detect_target_to_data_merge_queue, + flight_interface_to_data_merge_queue, + data_merge_to_main_queue, controller, ), ) @@ -145,8 +165,29 @@ def main() -> int: video_input_manager.start_workers() detect_target_manager.start_workers() flight_interface_manager.start_workers() + data_merge_manager.start_workers() while True: + # try: + # merged_data = data_merge_to_main_queue.queue.get_nowait() + # except queue.Empty: + # merged_data = None + + # if merged_data is not None: + # timestamp = merged_data.timestamp + # odometry = merged_data.odometry_local + # detections = merged_data.detections + + # print("timestamp: " + str(timestamp)) + # print("north: " + str(odometry.position.north)) + # print("east: " + str(position.east)) + # print("down: " + str(position.down)) + # print("yaw: " + str(orientation.yaw)) + # print("roll: " + str(orientation.roll)) + # print("pitch: " + str(orientation.pitch)) + # print("detections: " + str(detections)) + # print("") + try: detections = detect_target_to_main_queue.queue.get_nowait() except queue.Empty: @@ -187,10 +228,12 @@ def main() -> int: video_input_to_detect_target_queue.fill_and_drain_queue() detect_target_to_main_queue.fill_and_drain_queue() flight_interface_to_main_queue.fill_and_drain_queue() + data_merge_to_main_queue.fill_and_drain_queue() video_input_manager.join_workers() detect_target_manager.join_workers() flight_interface_manager.join_workers() + data_merge_manager.join_workers() cv2.destroyAllWindows() From 136d241342287fcfe173a2e9874e3d135247ee2a Mon Sep 17 00:00:00 2001 From: Dylan Finlay Date: Fri, 23 Feb 2024 16:57:53 -0500 Subject: [PATCH 4/7] Updated queue names --- main_2024.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/main_2024.py b/main_2024.py index 93ee90f3..8178eee2 100644 --- a/main_2024.py +++ b/main_2024.py @@ -189,7 +189,7 @@ def main() -> int: # print("") try: - detections = detect_target_to_main_queue.queue.get_nowait() + detections = detect_target_to_data_merge_queue.queue.get_nowait() except queue.Empty: detections = None @@ -202,7 +202,7 @@ def main() -> int: print("") odometry_and_time_info: "odometry_and_time.OdometryAndTime | None" = \ - flight_interface_to_main_queue.queue.get() + flight_interface_to_data_merge_queue.queue.get() if odometry_and_time_info is not None: timestamp = odometry_and_time_info.timestamp @@ -226,8 +226,8 @@ def main() -> int: controller.request_exit() video_input_to_detect_target_queue.fill_and_drain_queue() - detect_target_to_main_queue.fill_and_drain_queue() - flight_interface_to_main_queue.fill_and_drain_queue() + detect_target_to_data_merge_queue.fill_and_drain_queue() + flight_interface_to_data_merge_queue.fill_and_drain_queue() data_merge_to_main_queue.fill_and_drain_queue() video_input_manager.join_workers() From 60453d9f5b1defdbdee692b424cd14f46395240a Mon Sep 17 00:00:00 2001 From: Dylan Finlay Date: Sat, 24 Feb 2024 19:44:08 -0500 Subject: [PATCH 5/7] Structure should be complete. Just need to adjust main loop --- config.yaml | 3 +++ main_2024.py | 50 ++++++++++++++++++++++++++------------------------ 2 files changed, 29 insertions(+), 24 deletions(-) diff --git a/config.yaml b/config.yaml index c5a441b9..11f54fd1 100644 --- a/config.yaml +++ b/config.yaml @@ -19,3 +19,6 @@ flight_interface: address: "tcp:127.0.0.1:14550" timeout: 10.0 # seconds worker_period: 0.1 # seconds + +data_merge: + timeout: 10.0 # seconds diff --git a/main_2024.py b/main_2024.py index 8178eee2..3de38a9c 100644 --- a/main_2024.py +++ b/main_2024.py @@ -168,25 +168,27 @@ def main() -> int: data_merge_manager.start_workers() while True: - # try: - # merged_data = data_merge_to_main_queue.queue.get_nowait() - # except queue.Empty: - # merged_data = None + try: + merged_data = data_merge_to_main_queue.queue.get_nowait() + except queue.Empty: + merged_data = None - # if merged_data is not None: - # timestamp = merged_data.timestamp - # odometry = merged_data.odometry_local - # detections = merged_data.detections - - # print("timestamp: " + str(timestamp)) - # print("north: " + str(odometry.position.north)) - # print("east: " + str(position.east)) - # print("down: " + str(position.down)) - # print("yaw: " + str(orientation.yaw)) - # print("roll: " + str(orientation.roll)) - # print("pitch: " + str(orientation.pitch)) - # print("detections: " + str(detections)) - # print("") + if merged_data is not None: + position = merged_data.odometry_local.position + orientation = merged_data.odometry_local.orientation.orientation + detections = merged_data.detections + + print("MERGED north: " + str(position.north)) + print("MERGED east: " + str(position.east)) + print("MERGED down: " + str(position.down)) + print("MERGED yaw: " + str(orientation.yaw)) + print("MERGED roll: " + str(orientation.roll)) + print("MERGED pitch: " + str(orientation.pitch)) + print("detections: " + str(len(detections))) + for detection in detections: + print(" label: " + str(detection.label)) + print(" confidence: " + str(detection.confidence)) + print("") try: detections = detect_target_to_data_merge_queue.queue.get_nowait() @@ -209,12 +211,12 @@ def main() -> int: position = odometry_and_time_info.odometry_data.position orientation = odometry_and_time_info.odometry_data.orientation.orientation - print("timestamp: " + str(timestamp)) - print("north: " + str(position.north)) - print("east: " + str(position.east)) - print("down: " + str(position.down)) - print("yaw: " + str(orientation.yaw)) - print("roll: " + str(orientation.roll)) + # print("timestamp: " + str(timestamp)) + # print("north: " + str(position.north)) + # print("east: " + str(position.east)) + # print("down: " + str(position.down)) + # print("yaw: " + str(orientation.yaw)) + # print("roll: " + str(orientation.roll)) print("pitch: " + str(orientation.pitch)) print("") From c346f4bc3b8a2daf6cf408bfd101cddb6cccf5a6 Mon Sep 17 00:00:00 2001 From: Dylan Finlay Date: Sun, 25 Feb 2024 15:27:26 -0500 Subject: [PATCH 6/7] Main loop adjusted and print statments correctly grab object data --- main_2024.py | 56 ++++++++++++++++++++++++++-------------------------- 1 file changed, 28 insertions(+), 28 deletions(-) diff --git a/main_2024.py b/main_2024.py index 3de38a9c..2fa40448 100644 --- a/main_2024.py +++ b/main_2024.py @@ -168,28 +168,6 @@ def main() -> int: data_merge_manager.start_workers() while True: - try: - merged_data = data_merge_to_main_queue.queue.get_nowait() - except queue.Empty: - merged_data = None - - if merged_data is not None: - position = merged_data.odometry_local.position - orientation = merged_data.odometry_local.orientation.orientation - detections = merged_data.detections - - print("MERGED north: " + str(position.north)) - print("MERGED east: " + str(position.east)) - print("MERGED down: " + str(position.down)) - print("MERGED yaw: " + str(orientation.yaw)) - print("MERGED roll: " + str(orientation.roll)) - print("MERGED pitch: " + str(orientation.pitch)) - print("detections: " + str(len(detections))) - for detection in detections: - print(" label: " + str(detection.label)) - print(" confidence: " + str(detection.confidence)) - print("") - try: detections = detect_target_to_data_merge_queue.queue.get_nowait() except queue.Empty: @@ -211,15 +189,37 @@ def main() -> int: position = odometry_and_time_info.odometry_data.position orientation = odometry_and_time_info.odometry_data.orientation.orientation - # print("timestamp: " + str(timestamp)) - # print("north: " + str(position.north)) - # print("east: " + str(position.east)) - # print("down: " + str(position.down)) - # print("yaw: " + str(orientation.yaw)) - # print("roll: " + str(orientation.roll)) + print("timestamp: " + str(timestamp)) + print("north: " + str(position.north)) + print("east: " + str(position.east)) + print("down: " + str(position.down)) + print("yaw: " + str(orientation.yaw)) + print("roll: " + str(orientation.roll)) print("pitch: " + str(orientation.pitch)) print("") + try: + merged_data = data_merge_to_main_queue.queue.get_nowait() + except queue.Empty: + merged_data = None + + if merged_data is not None: + position = merged_data.odometry_local.position + orientation = merged_data.odometry_local.orientation.orientation + detections = merged_data.detections + + print("merged north: " + str(position.north)) + print("merged east: " + str(position.east)) + print("merged down: " + str(position.down)) + print("merged yaw: " + str(orientation.yaw)) + print("merged roll: " + str(orientation.roll)) + print("merged pitch: " + str(orientation.pitch)) + print("merged detections: " + str(len(detections))) + for detection in detections: + print("merged label: " + str(detection.label)) + print("merged confidence: " + str(detection.confidence)) + print("") + if cv2.waitKey(1) == ord('q'): print("Exiting main loop") break From 2f5a15a04e5f11e1cda1345be5edc67f7032c133 Mon Sep 17 00:00:00 2001 From: Dylan Finlay Date: Sun, 25 Feb 2024 18:53:34 -0500 Subject: [PATCH 7/7] All PR comments accounted for --- main_2024.py | 32 +------------------------------- 1 file changed, 1 insertion(+), 31 deletions(-) diff --git a/main_2024.py b/main_2024.py index 2fa40448..17d37306 100644 --- a/main_2024.py +++ b/main_2024.py @@ -168,36 +168,6 @@ def main() -> int: data_merge_manager.start_workers() while True: - try: - detections = detect_target_to_data_merge_queue.queue.get_nowait() - except queue.Empty: - detections = None - - if detections is not None: - print("timestamp: " + str(detections.timestamp)) - print("detections: " + str(len(detections.detections))) - for detection in detections.detections: - print(" label: " + str(detection.label)) - print(" confidence: " + str(detection.confidence)) - print("") - - odometry_and_time_info: "odometry_and_time.OdometryAndTime | None" = \ - flight_interface_to_data_merge_queue.queue.get() - - if odometry_and_time_info is not None: - timestamp = odometry_and_time_info.timestamp - position = odometry_and_time_info.odometry_data.position - orientation = odometry_and_time_info.odometry_data.orientation.orientation - - print("timestamp: " + str(timestamp)) - print("north: " + str(position.north)) - print("east: " + str(position.east)) - print("down: " + str(position.down)) - print("yaw: " + str(orientation.yaw)) - print("roll: " + str(orientation.roll)) - print("pitch: " + str(orientation.pitch)) - print("") - try: merged_data = data_merge_to_main_queue.queue.get_nowait() except queue.Empty: @@ -214,7 +184,7 @@ def main() -> int: print("merged yaw: " + str(orientation.yaw)) print("merged roll: " + str(orientation.roll)) print("merged pitch: " + str(orientation.pitch)) - print("merged detections: " + str(len(detections))) + print("merged detections count: " + str(len(detections))) for detection in detections: print("merged label: " + str(detection.label)) print("merged confidence: " + str(detection.confidence))