From 2dcda299e5e2123101b5e4e86fa73f9667057489 Mon Sep 17 00:00:00 2001 From: Amber Brown Date: Thu, 1 Nov 2018 08:12:39 +1100 Subject: [PATCH 1/5] fix it and tests --- synapse/app/synchrotron.py | 7 ++ tests/rest/client/v2_alpha/test_sync.py | 100 ++++++++++++++++++++++++ 2 files changed, 107 insertions(+) diff --git a/synapse/app/synchrotron.py b/synapse/app/synchrotron.py index 3926c7f2636f..e32b60c6c977 100644 --- a/synapse/app/synchrotron.py +++ b/synapse/app/synchrotron.py @@ -236,6 +236,13 @@ def stream_positions(self): return {"typing": self._latest_room_serial} def process_replication_rows(self, token, rows): + if self._latest_room_serial > token: + # The master has gone backwards. To prevent inconsistent data, just + # clear everything. + self._room_serials = {} + self._room_typing = {} + + # Set the latest serial token to whatever the server gave us. self._latest_room_serial = token for row in rows: diff --git a/tests/rest/client/v2_alpha/test_sync.py b/tests/rest/client/v2_alpha/test_sync.py index 4c30c5f258f0..07b2c9913035 100644 --- a/tests/rest/client/v2_alpha/test_sync.py +++ b/tests/rest/client/v2_alpha/test_sync.py @@ -15,6 +15,7 @@ from mock import Mock +from synapse.rest.client.v1 import admin, login, room from synapse.rest.client.v2_alpha import sync from tests import unittest @@ -65,3 +66,102 @@ def test_sync_presence_disabled(self): ["next_batch", "rooms", "account_data", "to_device", "device_lists"] ).issubset(set(channel.json_body.keys())) ) + + +class SyncTypingTests(unittest.HomeserverTestCase): + + servlets = [ + admin.register_servlets, + room.register_servlets, + login.register_servlets, + sync.register_servlets + ] + user_id = True + hijack_auth = False + + def test_sync_backwards_typing(self): + + # Register the user who gets notified + user_id = self.register_user("user", "pass") + access_token = self.login("user", "pass") + + # Register the user who sends the message + other_user_id = self.register_user("otheruser", "pass") + other_access_token = self.login("otheruser", "pass") + + # Create a room + room = self.helper.create_room_as(user_id, tok=access_token) + + # Invite the other person + self.helper.invite(room=room, src=user_id, tok=access_token, targ=other_user_id) + + # The other user joins + self.helper.join(room=room, user=other_user_id, tok=other_access_token) + + # The other user sends some messages + self.helper.send(room, body="Hi!", tok=other_access_token) + self.helper.send(room, body="There!", tok=other_access_token) + + request, channel = self.make_request( + "PUT", + "/rooms/%s/typing/%s?access_token=%s" % (room, other_user_id, other_access_token), + b'{"typing": true, "timeout": 30000}', + ) + self.render(request) + self.assertEquals(200, channel.code) + + request, channel = self.make_request("GET", "/sync?access_token=%s" % (access_token,)) + self.render(request) + self.assertEquals(200, channel.code) + next_batch = channel.json_body["next_batch"] + + request, channel = self.make_request( + "PUT", + "/rooms/%s/typing/%s?access_token=%s" % (room, other_user_id, other_access_token), + b'{"typing": false}', + ) + self.render(request) + self.assertEquals(200, channel.code) + + + request, channel = self.make_request( + "PUT", + "/rooms/%s/typing/%s?access_token=%s" % (room, other_user_id, other_access_token), + b'{"typing": true, "timeout": 30000}', + ) + self.render(request) + self.assertEquals(200, channel.code) + + # Should return immediately + request, channel = self.make_request("GET", "/sync?timeout=300000&access_token=%s&since=%s" % (access_token, next_batch)) + self.render(request) + self.assertEquals(200, channel.code) + next_batch = channel.json_body["next_batch"] + + # Reset typing serial back to 0, as if the master had. + typing = self.hs.get_typing_handler() + typing._latest_room_serial = 0 + + # Since it checks the state token, we need some state to update to + # invalidate the stream token. + self.helper.send(room, body="There!", tok=other_access_token) + + request, channel = self.make_request("GET", "/sync?timeout=3000000&access_token=%s&since=%s" % (access_token, next_batch)) + self.render(request) + self.assertEquals(200, channel.code) + next_batch = channel.json_body["next_batch"] + + # This should time out! But it does not, because our stream token is ahead. + request, channel = self.make_request("GET", "/sync?timeout=3000000&access_token=%s&since=%s" % (access_token, next_batch)) + self.render(request) + self.assertEquals(200, channel.code) + next_batch = channel.json_body["next_batch"] + + # Clear the typing information, so that it doesn't think everything is + # in the future. + typing._room_serials = {} + typing._room_typing = {} + + # Now it SHOULD fail as it never completes! + request, channel = self.make_request("GET", "/sync?timeout=3000000&access_token=%s&since=%s" % (access_token, next_batch)) + self.assertRaises(Exception, self.render, request) \ No newline at end of file From 53f9dcc4b6c2bc5705b7c1ccb399f63dc0f1085f Mon Sep 17 00:00:00 2001 From: Amber Brown Date: Fri, 2 Nov 2018 23:29:30 +1100 Subject: [PATCH 2/5] add a reset --- synapse/app/synchrotron.py | 8 ++++++++ synapse/handlers/typing.py | 14 ++++++++++---- tests/rest/client/v2_alpha/test_sync.py | 3 +-- 3 files changed, 19 insertions(+), 6 deletions(-) diff --git a/synapse/app/synchrotron.py b/synapse/app/synchrotron.py index e32b60c6c977..d71977ba3880 100644 --- a/synapse/app/synchrotron.py +++ b/synapse/app/synchrotron.py @@ -226,7 +226,15 @@ def get_currently_syncing_users(self): class SynchrotronTyping(object): def __init__(self, hs): self._latest_room_serial = 0 + self._reset() + + def _reset(self): + """ + Reset the typing handler's data caches. + """ + # map room IDs to serial numbers self._room_serials = {} + # map room IDs to sets of users currently typing self._room_typing = {} def stream_positions(self): diff --git a/synapse/handlers/typing.py b/synapse/handlers/typing.py index c610933dd474..a61bbf939274 100644 --- a/synapse/handlers/typing.py +++ b/synapse/handlers/typing.py @@ -63,11 +63,8 @@ def __init__(self, hs): self._member_typing_until = {} # clock time we expect to stop self._member_last_federation_poke = {} - # map room IDs to serial numbers - self._room_serials = {} self._latest_room_serial = 0 - # map room IDs to sets of users currently typing - self._room_typing = {} + self._reset() # caches which room_ids changed at which serials self._typing_stream_change_cache = StreamChangeCache( @@ -79,6 +76,15 @@ def __init__(self, hs): 5000, ) + def _reset(self): + """ + Reset the typing handler's data caches. + """ + # map room IDs to serial numbers + self._room_serials = {} + # map room IDs to sets of users currently typing + self._room_typing = {} + def _handle_timeouts(self): logger.info("Checking for typing timeouts") diff --git a/tests/rest/client/v2_alpha/test_sync.py b/tests/rest/client/v2_alpha/test_sync.py index 07b2c9913035..9c02cc06f2e4 100644 --- a/tests/rest/client/v2_alpha/test_sync.py +++ b/tests/rest/client/v2_alpha/test_sync.py @@ -159,8 +159,7 @@ def test_sync_backwards_typing(self): # Clear the typing information, so that it doesn't think everything is # in the future. - typing._room_serials = {} - typing._room_typing = {} + typing._reset() # Now it SHOULD fail as it never completes! request, channel = self.make_request("GET", "/sync?timeout=3000000&access_token=%s&since=%s" % (access_token, next_batch)) From 2c69208cae1df54e8b475af3ea6fdeb1c3fe8ba2 Mon Sep 17 00:00:00 2001 From: Amber Brown Date: Fri, 2 Nov 2018 23:37:06 +1100 Subject: [PATCH 3/5] cleanups --- tests/rest/client/v2_alpha/test_sync.py | 48 ++++++++++++++++++------- tests/server.py | 8 ++++- 2 files changed, 43 insertions(+), 13 deletions(-) diff --git a/tests/rest/client/v2_alpha/test_sync.py b/tests/rest/client/v2_alpha/test_sync.py index 9c02cc06f2e4..99b716f00aab 100644 --- a/tests/rest/client/v2_alpha/test_sync.py +++ b/tests/rest/client/v2_alpha/test_sync.py @@ -19,6 +19,7 @@ from synapse.rest.client.v2_alpha import sync from tests import unittest +from tests.server import TimedOutException class FilterTestCase(unittest.HomeserverTestCase): @@ -74,12 +75,20 @@ class SyncTypingTests(unittest.HomeserverTestCase): admin.register_servlets, room.register_servlets, login.register_servlets, - sync.register_servlets + sync.register_servlets, ] user_id = True hijack_auth = False def test_sync_backwards_typing(self): + """ + If the typing serial goes backwards and the typing handler is then reset + (such as when the master restarts and sets the typing serial to 0), we + do not incorrectly return typing information that had a serial greater + than the now-reset serial. + """ + typing_url = "/rooms/%s/typing/%s?access_token=%s" + sync_url = "/sync?timeout=3000000&access_token=%s&since=%s" # Register the user who gets notified user_id = self.register_user("user", "pass") @@ -102,38 +111,44 @@ def test_sync_backwards_typing(self): self.helper.send(room, body="Hi!", tok=other_access_token) self.helper.send(room, body="There!", tok=other_access_token) + # Start typing. request, channel = self.make_request( "PUT", - "/rooms/%s/typing/%s?access_token=%s" % (room, other_user_id, other_access_token), + typing_url % (room, other_user_id, other_access_token), b'{"typing": true, "timeout": 30000}', ) self.render(request) self.assertEquals(200, channel.code) - request, channel = self.make_request("GET", "/sync?access_token=%s" % (access_token,)) + request, channel = self.make_request( + "GET", "/sync?access_token=%s" % (access_token,) + ) self.render(request) self.assertEquals(200, channel.code) next_batch = channel.json_body["next_batch"] + # Stop typing. request, channel = self.make_request( "PUT", - "/rooms/%s/typing/%s?access_token=%s" % (room, other_user_id, other_access_token), + typing_url % (room, other_user_id, other_access_token), b'{"typing": false}', ) self.render(request) self.assertEquals(200, channel.code) - + # Start typing. request, channel = self.make_request( "PUT", - "/rooms/%s/typing/%s?access_token=%s" % (room, other_user_id, other_access_token), + typing_url % (room, other_user_id, other_access_token), b'{"typing": true, "timeout": 30000}', ) self.render(request) self.assertEquals(200, channel.code) # Should return immediately - request, channel = self.make_request("GET", "/sync?timeout=300000&access_token=%s&since=%s" % (access_token, next_batch)) + request, channel = self.make_request( + "GET", sync_url % (access_token, next_batch) + ) self.render(request) self.assertEquals(200, channel.code) next_batch = channel.json_body["next_batch"] @@ -146,13 +161,20 @@ def test_sync_backwards_typing(self): # invalidate the stream token. self.helper.send(room, body="There!", tok=other_access_token) - request, channel = self.make_request("GET", "/sync?timeout=3000000&access_token=%s&since=%s" % (access_token, next_batch)) + request, channel = self.make_request( + "GET", sync_url % (access_token, next_batch) + ) self.render(request) self.assertEquals(200, channel.code) next_batch = channel.json_body["next_batch"] - # This should time out! But it does not, because our stream token is ahead. - request, channel = self.make_request("GET", "/sync?timeout=3000000&access_token=%s&since=%s" % (access_token, next_batch)) + # This should time out! But it does not, because our stream token is + # ahead, and therefore it's saying the typing (that we've actually + # already seen) is new, since it's got a token above our new, now-reset + # stream token. + request, channel = self.make_request( + "GET", sync_url % (access_token, next_batch) + ) self.render(request) self.assertEquals(200, channel.code) next_batch = channel.json_body["next_batch"] @@ -162,5 +184,7 @@ def test_sync_backwards_typing(self): typing._reset() # Now it SHOULD fail as it never completes! - request, channel = self.make_request("GET", "/sync?timeout=3000000&access_token=%s&since=%s" % (access_token, next_batch)) - self.assertRaises(Exception, self.render, request) \ No newline at end of file + request, channel = self.make_request( + "GET", sync_url % (access_token, next_batch) + ) + self.assertRaises(TimedOutException, self.render, request) diff --git a/tests/server.py b/tests/server.py index 819c854448e2..cc6dbe04ac58 100644 --- a/tests/server.py +++ b/tests/server.py @@ -21,6 +21,12 @@ from tests.utils import setup_test_homeserver as _sth +class TimedOutException(Exception): + """ + A web query timed out. + """ + + @attr.s class FakeChannel(object): """ @@ -153,7 +159,7 @@ def wait_until_result(clock, request, timeout=100): x += 1 if x > timeout: - raise Exception("Timed out waiting for request to finish.") + raise TimedOutException("Timed out waiting for request to finish.") clock.advance(0.1) From 4dc6fbaffbd7165d280b76ba1759b56a28169086 Mon Sep 17 00:00:00 2001 From: Amber Brown Date: Fri, 2 Nov 2018 23:38:32 +1100 Subject: [PATCH 4/5] changelog --- changelog.d/4127.bugfix | 1 + 1 file changed, 1 insertion(+) create mode 100644 changelog.d/4127.bugfix diff --git a/changelog.d/4127.bugfix b/changelog.d/4127.bugfix new file mode 100644 index 000000000000..0701d2ceaac2 --- /dev/null +++ b/changelog.d/4127.bugfix @@ -0,0 +1 @@ +If the typing stream ID goes backwards (as on a worker when the master restarts), the worker's typing handler will no longer erroneously report rooms containing new typing events. From 16dc4b1fb5704e67f574450007efc536c7da7502 Mon Sep 17 00:00:00 2001 From: Amber Brown Date: Fri, 2 Nov 2018 23:39:07 +1100 Subject: [PATCH 5/5] actually use the reset --- synapse/app/synchrotron.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/synapse/app/synchrotron.py b/synapse/app/synchrotron.py index d71977ba3880..0354e82bf850 100644 --- a/synapse/app/synchrotron.py +++ b/synapse/app/synchrotron.py @@ -247,8 +247,7 @@ def process_replication_rows(self, token, rows): if self._latest_room_serial > token: # The master has gone backwards. To prevent inconsistent data, just # clear everything. - self._room_serials = {} - self._room_typing = {} + self._reset() # Set the latest serial token to whatever the server gave us. self._latest_room_serial = token