From 38aedaf94387d7c40c395bd93acdf822a1227d61 Mon Sep 17 00:00:00 2001 From: Ubuntu Date: Sun, 3 Sep 2023 06:53:58 +0000 Subject: [PATCH 1/2] add stall mechnism to wait all watches to be delivered before a read-after-write --- faaskeeper/queue.py | 65 +++++++++++++++++++++++---------------------- 1 file changed, 33 insertions(+), 32 deletions(-) diff --git a/faaskeeper/queue.py b/faaskeeper/queue.py index 57e4816..72dbb01 100644 --- a/faaskeeper/queue.py +++ b/faaskeeper/queue.py @@ -105,24 +105,31 @@ def add_watch_notification(self, result: dict): timestamp = result["timestamp"] hashed_path = hashlib.md5(path.encode()).hexdigest() - # FIXME: check timestamp of event with our watch - # FIXME: Full implementation of different types + with self._watches_lock: existing_watches = self._watches.get(hashed_path) if existing_watches: - for idx, w in enumerate(existing_watches): - if watch_event == WatchEventType.NODE_DATA_CHANGED: - if w.watch_type == WatchType.GET_DATA: - self._queue.put( - ( - EventQueue.EventType.WATCH_NOTIFICATION, - w, - WatchedEvent(watch_event, path, timestamp), - ) + watches_retain = [] + watches_removed_num = 0 + for w in existing_watches: + if w.timestamp < timestamp: + # deliver all prev watches this is partial removal + self._queue.put( + ( + EventQueue.EventType.WATCH_NOTIFICATION, + w, + WatchedEvent(watch_event, path, timestamp), ) - del existing_watches[idx] - return - self._log.warn(f"Ignoring unknown watch notification for even {watch_event} on path {path}") + ) + watches_removed_num += 1 + + else: + watches_retain.append(w) + if watches_removed_num == len(existing_watches): + self._watches.pop(hashed_path, None) + else: + self._watches[hashed_path] = watches_retain + return else: self._log.warn(f"Ignoring unknown watch notification for even {watch_event} on path {path}") @@ -151,23 +158,19 @@ def add_watch(self, path: str, watch: Watch): # get only watches older than timestamp - avoid getting watch that we # just set a moment ago def get_watches(self, paths: List[str], timestamp: int) -> List[Watch]: + # DO not remove from queue, just query from it. if self._closing: raise SessionClosingException() # verify that we don't replace watches - watches = [] + watches_to_deliver = [] with self._watches_lock: for p in paths: existing_watches = self._watches.get(p, []) - watches_removed = 0 for w in existing_watches: if w.timestamp < timestamp: - watches.append(w) - watches_removed += 1 - # FIXME: partial removal - if watches_removed == len(existing_watches): - self._watches.pop(p, None) - return watches + watches_to_deliver.append(w) + return watches_to_deliver def get(self) -> Optional[Tuple]: try: @@ -593,20 +596,18 @@ def run(self): # FIXME - get_children should return the parent (fix implementation!) if result is not None and isinstance(result, Node): timestamp = result.modified.system.sum + + # assume get_watch is a query that does NOT remove watch from watches. watches = self._queue.get_watches([hashlib.md5(result.path.encode()).hexdigest()], timestamp) - # we have watch on ourself - for w in watches: - # FIXME: Move to some library - w.generate_message(WatchedEvent(WatchEventType.NODE_DATA_CHANGED, result.path, timestamp)) - # read watches from epoch paths = [] - # FIXME: hide under abstraction of epoch for p in result.modified.epoch.version: paths.append(p.split("_")[0]) - watches = self._queue.get_watches(paths, timestamp) - # FIXME: stall read + other_watches = watches = self._queue.get_watches(paths, timestamp) + + if (len(watches) > 0 or len(other_watches) > 0): + self._queue.add_direct_result(req_id, result, future) + continue - # FIXME: enforce ordering - watches if isinstance(result, Exception): future.set_exception(result) else: @@ -630,7 +631,7 @@ def run(self): elif submission[0] == EventQueue.EventType.WATCH_NOTIFICATION: # FIXME: ordering - watch = submission[1] + watch:Watch = submission[1] event = submission[2] watch.generate_message(event) From 5a31b82d4acd650b41ca61fa4f814fac299fa88d Mon Sep 17 00:00:00 2001 From: HanayoZz Date: Wed, 13 Sep 2023 13:41:18 -0400 Subject: [PATCH 2/2] fix watches typo in handling direct result. --- faaskeeper/queue.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/faaskeeper/queue.py b/faaskeeper/queue.py index 72dbb01..2c08b5b 100644 --- a/faaskeeper/queue.py +++ b/faaskeeper/queue.py @@ -602,7 +602,7 @@ def run(self): paths = [] for p in result.modified.epoch.version: paths.append(p.split("_")[0]) - other_watches = watches = self._queue.get_watches(paths, timestamp) + other_watches = self._queue.get_watches(paths, timestamp) if (len(watches) > 0 or len(other_watches) > 0): self._queue.add_direct_result(req_id, result, future)