Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
65 changes: 33 additions & 32 deletions faaskeeper/queue.py
Original file line number Diff line number Diff line change
Expand Up @@ -105,24 +105,31 @@ def add_watch_notification(self, result: dict):
timestamp = result["timestamp"]

hashed_path = hashlib.md5(path.encode()).hexdigest()
# FIXME: check timestamp of event with our watch
# FIXME: Full implementation of different types

with self._watches_lock:
existing_watches = self._watches.get(hashed_path)
if existing_watches:
for idx, w in enumerate(existing_watches):
if watch_event == WatchEventType.NODE_DATA_CHANGED:
if w.watch_type == WatchType.GET_DATA:
self._queue.put(
(
EventQueue.EventType.WATCH_NOTIFICATION,
w,
WatchedEvent(watch_event, path, timestamp),
)
watches_retain = []
watches_removed_num = 0
for w in existing_watches:
if w.timestamp < timestamp:
# deliver all prev watches this is partial removal
self._queue.put(
(
EventQueue.EventType.WATCH_NOTIFICATION,
w,
WatchedEvent(watch_event, path, timestamp),
)
del existing_watches[idx]
return
self._log.warn(f"Ignoring unknown watch notification for even {watch_event} on path {path}")
)
watches_removed_num += 1

else:
watches_retain.append(w)
if watches_removed_num == len(existing_watches):
self._watches.pop(hashed_path, None)
else:
self._watches[hashed_path] = watches_retain
return
else:
self._log.warn(f"Ignoring unknown watch notification for even {watch_event} on path {path}")

Expand Down Expand Up @@ -151,23 +158,19 @@ def add_watch(self, path: str, watch: Watch):
# get only watches older than timestamp - avoid getting watch that we
# just set a moment ago
def get_watches(self, paths: List[str], timestamp: int) -> List[Watch]:
# DO not remove from queue, just query from it.
if self._closing:
raise SessionClosingException()

# verify that we don't replace watches
watches = []
watches_to_deliver = []
with self._watches_lock:
for p in paths:
existing_watches = self._watches.get(p, [])
watches_removed = 0
for w in existing_watches:
if w.timestamp < timestamp:
watches.append(w)
watches_removed += 1
# FIXME: partial removal
if watches_removed == len(existing_watches):
self._watches.pop(p, None)
return watches
watches_to_deliver.append(w)
return watches_to_deliver

def get(self) -> Optional[Tuple]:
try:
Expand Down Expand Up @@ -593,20 +596,18 @@ def run(self):
# FIXME - get_children should return the parent (fix implementation!)
if result is not None and isinstance(result, Node):
timestamp = result.modified.system.sum

# assume get_watch is a query that does NOT remove watch from watches.
watches = self._queue.get_watches([hashlib.md5(result.path.encode()).hexdigest()], timestamp)
# we have watch on ourself
for w in watches:
# FIXME: Move to some library
w.generate_message(WatchedEvent(WatchEventType.NODE_DATA_CHANGED, result.path, timestamp))
# read watches from epoch
paths = []
# FIXME: hide under abstraction of epoch
for p in result.modified.epoch.version:
paths.append(p.split("_")[0])
watches = self._queue.get_watches(paths, timestamp)
# FIXME: stall read
other_watches = self._queue.get_watches(paths, timestamp)

if (len(watches) > 0 or len(other_watches) > 0):
self._queue.add_direct_result(req_id, result, future)
continue

# FIXME: enforce ordering - watches
if isinstance(result, Exception):
future.set_exception(result)
else:
Expand All @@ -630,7 +631,7 @@ def run(self):
elif submission[0] == EventQueue.EventType.WATCH_NOTIFICATION:

# FIXME: ordering
watch = submission[1]
watch:Watch = submission[1]
event = submission[2]

watch.generate_message(event)
Expand Down