From 58289b7bfa32347de2f0d0bd77ab992e8fdc3a3b Mon Sep 17 00:00:00 2001 From: Amber Brown Date: Tue, 21 Aug 2018 05:47:51 +1000 Subject: [PATCH 01/14] port storage stuff to python 3 --- synapse/storage/_base.py | 35 +++++++++++++++++++++++++++++- synapse/storage/deviceinbox.py | 2 +- synapse/storage/devices.py | 10 ++++----- synapse/storage/end_to_end_keys.py | 6 ++--- synapse/storage/events.py | 10 ++++----- synapse/storage/events_worker.py | 11 ++++++---- synapse/storage/filtering.py | 6 ++--- synapse/storage/pusher.py | 14 ++++++++---- synapse/storage/transactions.py | 7 +++--- 9 files changed, 72 insertions(+), 29 deletions(-) diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py index 08dffd774f81..de04767c3fa5 100644 --- a/synapse/storage/_base.py +++ b/synapse/storage/_base.py @@ -17,9 +17,10 @@ import threading import time -from six import iteritems, iterkeys, itervalues +from six import PY2, iteritems, iterkeys, itervalues from six.moves import intern, range +from canonicaljson import json from prometheus_client import Histogram from twisted.internet import defer @@ -1216,3 +1217,35 @@ class _RollbackButIsFineException(Exception): something went wrong. """ pass + + +def db_to_json(db_content): + """ + Take some data from a database row and return JSON. + """ + # psycopg2 on Python 3 returns memoryview objects, which we need to + # cast to bytes to decode + if isinstance(db_content, memoryview): + db_content = db_content.tobytes() + + if PY2 and isinstance(db_content, buffer): + db_content = bytes(db_content) + + if isinstance(db_content, (bytes, bytearray)): + db_content = db_content.decode('utf8') + + try: + return json.loads(db_content) + except Exception: + + try: + if db_content.startswith("\\x7b"): + logging.warning("Detecting mangled JSON, trying to unmangle...") + import binascii + db_content_n = "{" + binascii.unhexlify(db_content[4:]).decode('utf8') + return json.loads(db_content_n) + except Exception: + logging.warning("Failed to unmangle") + + logging.warning("Tried to decode '%r' as JSON and failed", db_content) + raise diff --git a/synapse/storage/deviceinbox.py b/synapse/storage/deviceinbox.py index 73646da025c4..e06b0bc56dee 100644 --- a/synapse/storage/deviceinbox.py +++ b/synapse/storage/deviceinbox.py @@ -169,7 +169,7 @@ def _add_messages_to_local_device_inbox_txn(self, txn, stream_id, local_by_user_then_device = {} for user_id, messages_by_device in messages_by_user_then_device.items(): messages_json_for_user = {} - devices = messages_by_device.keys() + devices = list(messages_by_device.keys()) if len(devices) == 1 and devices[0] == "*": # Handle wildcard device_ids. sql = ( diff --git a/synapse/storage/devices.py b/synapse/storage/devices.py index c0943ecf912a..d10ff9e4b982 100644 --- a/synapse/storage/devices.py +++ b/synapse/storage/devices.py @@ -24,7 +24,7 @@ from synapse.metrics.background_process_metrics import run_as_background_process from synapse.util.caches.descriptors import cached, cachedInlineCallbacks, cachedList -from ._base import Cache, SQLBaseStore +from ._base import Cache, SQLBaseStore, db_to_json logger = logging.getLogger(__name__) @@ -411,7 +411,7 @@ def _get_devices_by_remote_txn(self, txn, destination, from_stream_id, if device is not None: key_json = device.get("key_json", None) if key_json: - result["keys"] = json.loads(key_json) + result["keys"] = db_to_json(key_json) device_display_name = device.get("device_display_name", None) if device_display_name: result["device_display_name"] = device_display_name @@ -466,7 +466,7 @@ def _get_cached_user_device(self, user_id, device_id): retcol="content", desc="_get_cached_user_device", ) - defer.returnValue(json.loads(content)) + defer.returnValue(db_to_json(content)) @cachedInlineCallbacks() def _get_cached_devices_for_user(self, user_id): @@ -479,7 +479,7 @@ def _get_cached_devices_for_user(self, user_id): desc="_get_cached_devices_for_user", ) defer.returnValue({ - device["device_id"]: json.loads(device["content"]) + device["device_id"]: db_to_json(device["content"]) for device in devices }) @@ -511,7 +511,7 @@ def _get_devices_with_keys_by_user_txn(self, txn, user_id): key_json = device.get("key_json", None) if key_json: - result["keys"] = json.loads(key_json) + result["keys"] = db_to_json(key_json) device_display_name = device.get("device_display_name", None) if device_display_name: result["device_display_name"] = device_display_name diff --git a/synapse/storage/end_to_end_keys.py b/synapse/storage/end_to_end_keys.py index 523b4360c3cb..1f1721e82083 100644 --- a/synapse/storage/end_to_end_keys.py +++ b/synapse/storage/end_to_end_keys.py @@ -14,13 +14,13 @@ # limitations under the License. from six import iteritems -from canonicaljson import encode_canonical_json, json +from canonicaljson import encode_canonical_json from twisted.internet import defer from synapse.util.caches.descriptors import cached -from ._base import SQLBaseStore +from ._base import SQLBaseStore, db_to_json class EndToEndKeyStore(SQLBaseStore): @@ -90,7 +90,7 @@ def get_e2e_device_keys( for user_id, device_keys in iteritems(results): for device_id, device_info in iteritems(device_keys): - device_info["keys"] = json.loads(device_info.pop("key_json")) + device_info["keys"] = db_to_json(device_info.pop("key_json")) defer.returnValue(results) diff --git a/synapse/storage/events.py b/synapse/storage/events.py index 025a7fb6d979..a83b935c548d 100644 --- a/synapse/storage/events.py +++ b/synapse/storage/events.py @@ -19,7 +19,7 @@ from collections import OrderedDict, deque, namedtuple from functools import wraps -from six import iteritems +from six import iteritems, text_type from six.moves import range from canonicaljson import json @@ -1218,7 +1218,7 @@ def event_dict(event): "sender": event.sender, "contains_url": ( "url" in event.content - and isinstance(event.content["url"], basestring) + and isinstance(event.content["url"], text_type) ), } for event, _ in events_and_contexts @@ -1527,7 +1527,7 @@ def reindex_txn(txn): contains_url = "url" in content if contains_url: - contains_url &= isinstance(content["url"], basestring) + contains_url &= isinstance(content["url"], text_type) except (KeyError, AttributeError): # If the event is missing a necessary field then # skip over it. @@ -1908,9 +1908,9 @@ def _purge_history_txn( (room_id,) ) rows = txn.fetchall() - max_depth = max(row[0] for row in rows) + max_depth = max(row[1] for row in rows) - if max_depth <= token.topological: + if max_depth < token.topological: # We need to ensure we don't delete all the events from the database # otherwise we wouldn't be able to send any events (due to not # having any backwards extremeties) diff --git a/synapse/storage/events_worker.py b/synapse/storage/events_worker.py index 59822178ff02..bc1b4b98e494 100644 --- a/synapse/storage/events_worker.py +++ b/synapse/storage/events_worker.py @@ -12,6 +12,9 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. + +from __future__ import division + import itertools import logging from collections import namedtuple @@ -265,7 +268,7 @@ def _fetch_event_list(self, conn, event_list): """ with Measure(self._clock, "_fetch_event_list"): try: - event_id_lists = zip(*event_list)[0] + event_id_lists = list(zip(*event_list))[0] event_ids = [ item for sublist in event_id_lists for item in sublist ] @@ -299,14 +302,14 @@ def fire(lst, res): logger.exception("do_fetch") # We only want to resolve deferreds from the main thread - def fire(evs): + def fire(evs, exc): for _, d in evs: if not d.called: with PreserveLoggingContext(): - d.errback(e) + d.errback(exc) with PreserveLoggingContext(): - self.hs.get_reactor().callFromThread(fire, event_list) + self.hs.get_reactor().callFromThread(fire, event_list, e) @defer.inlineCallbacks def _enqueue_events(self, events, check_redacted=True, allow_rejected=False): diff --git a/synapse/storage/filtering.py b/synapse/storage/filtering.py index 2d5896c5b4e1..6ddcc909bfd0 100644 --- a/synapse/storage/filtering.py +++ b/synapse/storage/filtering.py @@ -13,14 +13,14 @@ # See the License for the specific language governing permissions and # limitations under the License. -from canonicaljson import encode_canonical_json, json +from canonicaljson import encode_canonical_json from twisted.internet import defer from synapse.api.errors import Codes, SynapseError from synapse.util.caches.descriptors import cachedInlineCallbacks -from ._base import SQLBaseStore +from ._base import SQLBaseStore, db_to_json class FilteringStore(SQLBaseStore): @@ -44,7 +44,7 @@ def get_user_filter(self, user_localpart, filter_id): desc="get_user_filter", ) - defer.returnValue(json.loads(bytes(def_json).decode("utf-8"))) + defer.returnValue(db_to_json(def_json)) def add_user_filter(self, user_localpart, user_filter): def_json = encode_canonical_json(user_filter) diff --git a/synapse/storage/pusher.py b/synapse/storage/pusher.py index 8443bd4c1b70..c7987bfcdd6e 100644 --- a/synapse/storage/pusher.py +++ b/synapse/storage/pusher.py @@ -15,7 +15,8 @@ # limitations under the License. import logging -import types + +import six from canonicaljson import encode_canonical_json, json @@ -27,6 +28,11 @@ logger = logging.getLogger(__name__) +if six.PY2: + db_binary_type = buffer +else: + db_binary_type = memoryview + class PusherWorkerStore(SQLBaseStore): def _decode_pushers_rows(self, rows): @@ -34,18 +40,18 @@ def _decode_pushers_rows(self, rows): dataJson = r['data'] r['data'] = None try: - if isinstance(dataJson, types.BufferType): + if isinstance(dataJson, db_binary_type): dataJson = str(dataJson).decode("UTF8") r['data'] = json.loads(dataJson) except Exception as e: logger.warn( "Invalid JSON in data for pusher %d: %s, %s", - r['id'], dataJson, e.message, + r['id'], dataJson, e.args[0], ) pass - if isinstance(r['pushkey'], types.BufferType): + if isinstance(r['pushkey'], db_binary_type): r['pushkey'] = str(r['pushkey']).decode("UTF8") return rows diff --git a/synapse/storage/transactions.py b/synapse/storage/transactions.py index 428e7fa36e37..0c42bd332227 100644 --- a/synapse/storage/transactions.py +++ b/synapse/storage/transactions.py @@ -18,14 +18,14 @@ import six -from canonicaljson import encode_canonical_json, json +from canonicaljson import encode_canonical_json from twisted.internet import defer from synapse.metrics.background_process_metrics import run_as_background_process from synapse.util.caches.descriptors import cached -from ._base import SQLBaseStore +from ._base import SQLBaseStore, db_to_json # py2 sqlite has buffer hardcoded as only binary type, so we must use it, # despite being deprecated and removed in favor of memoryview @@ -95,7 +95,8 @@ def _get_received_txn_response(self, txn, transaction_id, origin): ) if result and result["response_code"]: - return result["response_code"], json.loads(str(result["response_json"])) + return result["response_code"], db_to_json(result["response_json"]) + else: return None From 7debdd2e0c19ae27a528aa779186e027e4e56371 Mon Sep 17 00:00:00 2001 From: Amber Brown Date: Tue, 21 Aug 2018 05:58:06 +1000 Subject: [PATCH 02/14] changelog --- changelog.d/3725.misc | 1 + 1 file changed, 1 insertion(+) create mode 100644 changelog.d/3725.misc diff --git a/changelog.d/3725.misc b/changelog.d/3725.misc new file mode 100644 index 000000000000..91ab9d7137b3 --- /dev/null +++ b/changelog.d/3725.misc @@ -0,0 +1 @@ +The synapse.storage module has been ported to Python 3. From 949d6bfb49b093b30609688deb21a31de7e6414f Mon Sep 17 00:00:00 2001 From: Amber Brown Date: Tue, 21 Aug 2018 06:23:26 +1000 Subject: [PATCH 03/14] some comments --- synapse/storage/_base.py | 15 +++++++++++++-- 1 file changed, 13 insertions(+), 2 deletions(-) diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py index de04767c3fa5..dbb25ed1383a 100644 --- a/synapse/storage/_base.py +++ b/synapse/storage/_base.py @@ -1221,23 +1221,34 @@ class _RollbackButIsFineException(Exception): def db_to_json(db_content): """ - Take some data from a database row and return JSON. + Take some data from a database row and return a JSON-decoded object. + + Args: + db_content (memoryview|buffer|bytes|bytearray|unicode) """ # psycopg2 on Python 3 returns memoryview objects, which we need to # cast to bytes to decode if isinstance(db_content, memoryview): db_content = db_content.tobytes() + # psycopg2 on Python 2 returns buffer objects, which we need to cast to + # bytes to decode if PY2 and isinstance(db_content, buffer): db_content = bytes(db_content) + # Decode it to a Unicode string before feeding it to json.loads, so we + # consistenty get a Unicode-containing object out. if isinstance(db_content, (bytes, bytearray)): db_content = db_content.decode('utf8') try: return json.loads(db_content) except Exception: - + # I don't know how, but somehow some binary objects get mangled upon + # coming out of PostgreSQL on Python 3. The particular mangling is that + # the first byte ("{") is byte-escaped, but the rest is hex-encoded. + # This attempts to unmangle it, and warns about it. If it can't + # unmangle it, it'll complain and then log the problem. try: if db_content.startswith("\\x7b"): logging.warning("Detecting mangled JSON, trying to unmangle...") From c5cc0ca3412e82d2703a3b70985ff583db27ff85 Mon Sep 17 00:00:00 2001 From: Amber Brown Date: Wed, 29 Aug 2018 20:46:55 +1000 Subject: [PATCH 04/14] review cleanup --- synapse/python_dependencies.py | 3 +++ synapse/storage/_base.py | 14 -------------- synapse/storage/engines/postgres.py | 11 +++++++++++ 3 files changed, 14 insertions(+), 14 deletions(-) diff --git a/synapse/python_dependencies.py b/synapse/python_dependencies.py index 9c55e79ef5fe..dabb563b0b45 100644 --- a/synapse/python_dependencies.py +++ b/synapse/python_dependencies.py @@ -78,6 +78,9 @@ "affinity": { "affinity": ["affinity"], }, + "postgres": { + "psycopg2>=2.7": ["psycopg2"] + } } diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py index dbb25ed1383a..be61147b9b8d 100644 --- a/synapse/storage/_base.py +++ b/synapse/storage/_base.py @@ -1244,19 +1244,5 @@ def db_to_json(db_content): try: return json.loads(db_content) except Exception: - # I don't know how, but somehow some binary objects get mangled upon - # coming out of PostgreSQL on Python 3. The particular mangling is that - # the first byte ("{") is byte-escaped, but the rest is hex-encoded. - # This attempts to unmangle it, and warns about it. If it can't - # unmangle it, it'll complain and then log the problem. - try: - if db_content.startswith("\\x7b"): - logging.warning("Detecting mangled JSON, trying to unmangle...") - import binascii - db_content_n = "{" + binascii.unhexlify(db_content[4:]).decode('utf8') - return json.loads(db_content_n) - except Exception: - logging.warning("Failed to unmangle") - logging.warning("Tried to decode '%r' as JSON and failed", db_content) raise diff --git a/synapse/storage/engines/postgres.py b/synapse/storage/engines/postgres.py index 8a0386c1a41e..a23f35f07770 100644 --- a/synapse/storage/engines/postgres.py +++ b/synapse/storage/engines/postgres.py @@ -13,6 +13,8 @@ # See the License for the specific language governing permissions and # limitations under the License. +import warnings + from ._base import IncorrectDatabaseSetup @@ -21,6 +23,14 @@ class PostgresEngine(object): def __init__(self, database_module, database_config): self.module = database_module + + # Check the version. This will raise if it's less than the supported + # 9.1. + version = database_module.extensions.libpq_version() + + if version < 100000: + warnings.warn("Consider upgrading your client to PostgreSQL 10.0+.") + self.module.extensions.register_type(self.module.extensions.UNICODE) self.synchronous_commit = database_config.get("synchronous_commit", True) @@ -47,6 +57,7 @@ def on_new_connection(self, db_conn): if not self.synchronous_commit: cursor = db_conn.cursor() cursor.execute("SET synchronous_commit TO OFF") + cursor.execute("SET bytea_output TO escape") cursor.close() def is_deadlock(self, error): From dea928d587a89531a116d3df03fce4364c664343 Mon Sep 17 00:00:00 2001 From: Amber Brown Date: Wed, 29 Aug 2018 20:51:45 +1000 Subject: [PATCH 05/14] remove this second one --- jenkins/prepare_synapse.sh | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/jenkins/prepare_synapse.sh b/jenkins/prepare_synapse.sh index a30179f2aad7..d95ca846c414 100755 --- a/jenkins/prepare_synapse.sh +++ b/jenkins/prepare_synapse.sh @@ -31,5 +31,5 @@ $TOX_BIN/pip install 'setuptools>=18.5' $TOX_BIN/pip install 'pip>=10' { python synapse/python_dependencies.py - echo lxml psycopg2 + echo lxml } | xargs $TOX_BIN/pip install From d17ea77ea00b323f416ecadefb354f3f13a470b5 Mon Sep 17 00:00:00 2001 From: Amber Brown Date: Wed, 29 Aug 2018 20:55:22 +1000 Subject: [PATCH 06/14] fix this --- synapse/storage/engines/postgres.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/synapse/storage/engines/postgres.py b/synapse/storage/engines/postgres.py index a23f35f07770..3d0f84617916 100644 --- a/synapse/storage/engines/postgres.py +++ b/synapse/storage/engines/postgres.py @@ -29,7 +29,10 @@ def __init__(self, database_module, database_config): version = database_module.extensions.libpq_version() if version < 100000: - warnings.warn("Consider upgrading your client to PostgreSQL 10.0+.") + warnings.warn( + "Consider upgrading your client to PostgreSQL 10.0+.", + DeprecationWarning + ) self.module.extensions.register_type(self.module.extensions.UNICODE) self.synchronous_commit = database_config.get("synchronous_commit", True) From b3b28c429bb03d16974e2be6c472130247a9e882 Mon Sep 17 00:00:00 2001 From: Amber Brown Date: Wed, 29 Aug 2018 21:06:23 +1000 Subject: [PATCH 07/14] fixes --- synapse/storage/events.py | 2 +- synapse/storage/events_worker.py | 2 -- 2 files changed, 1 insertion(+), 3 deletions(-) diff --git a/synapse/storage/events.py b/synapse/storage/events.py index 8bf87f38f7d6..ee21c3fc36ad 100644 --- a/synapse/storage/events.py +++ b/synapse/storage/events.py @@ -1912,7 +1912,7 @@ def _purge_history_txn( rows = txn.fetchall() max_depth = max(row[1] for row in rows) - if max_depth < token.topological: + if max_depth <= token.topological: # We need to ensure we don't delete all the events from the database # otherwise we wouldn't be able to send any events (due to not # having any backwards extremeties) diff --git a/synapse/storage/events_worker.py b/synapse/storage/events_worker.py index bc1b4b98e494..a8326f5296a6 100644 --- a/synapse/storage/events_worker.py +++ b/synapse/storage/events_worker.py @@ -13,8 +13,6 @@ # See the License for the specific language governing permissions and # limitations under the License. -from __future__ import division - import itertools import logging from collections import namedtuple From 6a00b5625be4ddf43bf966d196df06e17deb3493 Mon Sep 17 00:00:00 2001 From: Amber Brown Date: Wed, 29 Aug 2018 21:42:54 +1000 Subject: [PATCH 08/14] fix running bytea escape, thanks krombel --- synapse/storage/engines/postgres.py | 10 +++++++--- 1 file changed, 7 insertions(+), 3 deletions(-) diff --git a/synapse/storage/engines/postgres.py b/synapse/storage/engines/postgres.py index 3d0f84617916..a6d2101bc415 100644 --- a/synapse/storage/engines/postgres.py +++ b/synapse/storage/engines/postgres.py @@ -54,14 +54,18 @@ def on_new_connection(self, db_conn): db_conn.set_isolation_level( self.module.extensions.ISOLATION_LEVEL_REPEATABLE_READ ) + + # Set the bytea output to escape, vs the default of hex + cursor = db_conn.cursor() + cursor.execute("SET bytea_output TO escape") + # Asynchronous commit, don't wait for the server to call fsync before # ending the transaction. # https://www.postgresql.org/docs/current/static/wal-async-commit.html if not self.synchronous_commit: - cursor = db_conn.cursor() cursor.execute("SET synchronous_commit TO OFF") - cursor.execute("SET bytea_output TO escape") - cursor.close() + + cursor.close() def is_deadlock(self, error): if isinstance(error, self.module.DatabaseError): From 6dd16ef1df795a58495c6dae4709d4ef1a016d3e Mon Sep 17 00:00:00 2001 From: Amber Brown Date: Wed, 29 Aug 2018 22:39:22 +1000 Subject: [PATCH 09/14] unit tests --- synapse/storage/events.py | 2 +- tests/rest/client/v1/utils.py | 14 ++++-- tests/storage/test_purge.py | 93 +++++++++++++++++++++++++++++++++++ tests/unittest.py | 11 +++++ 4 files changed, 115 insertions(+), 5 deletions(-) create mode 100644 tests/storage/test_purge.py diff --git a/synapse/storage/events.py b/synapse/storage/events.py index ee21c3fc36ad..8bf87f38f7d6 100644 --- a/synapse/storage/events.py +++ b/synapse/storage/events.py @@ -1912,7 +1912,7 @@ def _purge_history_txn( rows = txn.fetchall() max_depth = max(row[1] for row in rows) - if max_depth <= token.topological: + if max_depth < token.topological: # We need to ensure we don't delete all the events from the database # otherwise we wouldn't be able to send any events (due to not # having any backwards extremeties) diff --git a/tests/rest/client/v1/utils.py b/tests/rest/client/v1/utils.py index 40dc4ea256da..530dc8ba6d92 100644 --- a/tests/rest/client/v1/utils.py +++ b/tests/rest/client/v1/utils.py @@ -240,7 +240,6 @@ def register(self, user_id): self.assertEquals(200, code) defer.returnValue(response) - @defer.inlineCallbacks def send(self, room_id, body=None, txn_id=None, tok=None, expect_code=200): if txn_id is None: txn_id = "m%s" % (str(time.time())) @@ -248,9 +247,16 @@ def send(self, room_id, body=None, txn_id=None, tok=None, expect_code=200): body = "body_text_here" path = "/_matrix/client/r0/rooms/%s/send/m.room.message/%s" % (room_id, txn_id) - content = '{"msgtype":"m.text","body":"%s"}' % body + content = {"msgtype": "m.text", "body": body} if tok: path = path + "?access_token=%s" % tok - (code, response) = yield self.mock_resource.trigger("PUT", path, content) - self.assertEquals(expect_code, code, msg=str(response)) + request, channel = make_request("PUT", path, json.dumps(content).encode('utf8')) + render(request, self.resource, self.hs.get_reactor()) + + assert int(channel.result["code"]) == expect_code, ( + "Expected: %d, got: %d, resp: %r" + % (expect_code, int(channel.result["code"]), channel.result["body"]) + ) + + return channel.json_body diff --git a/tests/storage/test_purge.py b/tests/storage/test_purge.py new file mode 100644 index 000000000000..bdf16c04c86b --- /dev/null +++ b/tests/storage/test_purge.py @@ -0,0 +1,93 @@ +# -*- coding: utf-8 -*- +# Copyright 2018 New Vector Ltd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from tests.unittest import HomeserverTestCase + +from synapse.rest.client.v1 import room + + +class PurgeTests(HomeserverTestCase): + + user_id = "@red:server" + servlets = [room.register_servlets] + + def make_homeserver(self, reactor, clock): + hs = self.setup_test_homeserver("server", http_client=None) + return hs + + def prepare(self, reactor, clock, hs): + self.room_id = self.helper.create_room_as(self.user_id) + + def test_purge(self): + """ + Purging a room will delete everything before the topological point. + """ + # Send four messages to the room + first = self.helper.send(self.room_id, body="test1") + second = self.helper.send(self.room_id, body="test2") + third = self.helper.send(self.room_id, body="test3") + last = self.helper.send(self.room_id, body="test4") + + storage = self.hs.get_datastore() + + # Get the topological token + event = storage.get_topological_token_for_event(last["event_id"]) + self.pump() + event = self.successResultOf(event) + + # Purge everything before this topological token + purge = storage.purge_history(self.room_id, event, True) + self.pump() + self.assertEqual(self.successResultOf(purge), None) + + # Try and get the events + get_first = storage.get_event(first["event_id"]) + get_second = storage.get_event(second["event_id"]) + get_third = storage.get_event(third["event_id"]) + get_last = storage.get_event(last["event_id"]) + self.pump() + + # 1-3 should fail and last will succeed, meaning that 1-3 are deleted + # and last is not. + self.failureResultOf(get_first) + self.failureResultOf(get_second) + self.failureResultOf(get_third) + self.successResultOf(get_last) + + def test_purge_wont_delete_extrems(self): + """ + Purging a room will delete everything before the topological point. + """ + # Send four messages to the room + first = self.helper.send(self.room_id, body="test1") + second = self.helper.send(self.room_id, body="test2") + third = self.helper.send(self.room_id, body="test3") + last = self.helper.send(self.room_id, body="test4") + + storage = self.hs.get_datastore() + + # Set the topological token higher than it should be + event = storage.get_topological_token_for_event(last["event_id"]) + self.pump() + event = self.successResultOf(event) + event = "t{}-{}".format( + *list(map(lambda x: x + 1, map(int, event[1:].split("-")))) + ) + + # Purge everything before this topological token + purge = storage.purge_history(self.room_id, event, True) + self.pump() + f = self.failureResultOf(purge) + self.assertIn("greater than forward", f.value.args[0]) diff --git a/tests/unittest.py b/tests/unittest.py index d852e2465a06..8b513bb32b28 100644 --- a/tests/unittest.py +++ b/tests/unittest.py @@ -151,6 +151,7 @@ class HomeserverTestCase(TestCase): hijack_auth (bool): Whether to hijack auth to return the user specified in user_id. """ + servlets = [] hijack_auth = True @@ -279,3 +280,13 @@ def setup_test_homeserver(self, *args, **kwargs): kwargs = dict(kwargs) kwargs.update(self._hs_args) return setup_test_homeserver(self.addCleanup, *args, **kwargs) + + def pump(self): + """ + Pump the reactor enough that Deferreds will fire. + """ + self.reactor.pump([0.0] * 100) + + def get_success(self, d): + self.pump() + return self.successResultOf(d) From ef1fb3c535feb7af561576c6aa680fb1ecb14ba6 Mon Sep 17 00:00:00 2001 From: Amber Brown Date: Wed, 29 Aug 2018 23:19:01 +1000 Subject: [PATCH 10/14] isort & cleanups --- tests/storage/test_purge.py | 4 ++-- tests/utils.py | 2 ++ 2 files changed, 4 insertions(+), 2 deletions(-) diff --git a/tests/storage/test_purge.py b/tests/storage/test_purge.py index bdf16c04c86b..0f2d63d1988b 100644 --- a/tests/storage/test_purge.py +++ b/tests/storage/test_purge.py @@ -13,10 +13,10 @@ # See the License for the specific language governing permissions and # limitations under the License. -from tests.unittest import HomeserverTestCase - from synapse.rest.client.v1 import room +from tests.unittest import HomeserverTestCase + class PurgeTests(HomeserverTestCase): diff --git a/tests/utils.py b/tests/utils.py index e8ef10445c72..78de026ca7af 100644 --- a/tests/utils.py +++ b/tests/utils.py @@ -143,6 +143,8 @@ def setup_test_homeserver( config.max_mau_value = 50 config.mau_limits_reserved_threepids = [] config.admin_contact = None + config.rc_messages_per_second = 10000 + config.rc_message_burst_count = 10000 # we need a sane default_room_version, otherwise attempts to create rooms will # fail. From 05c6872503f3c66449a068073f884a0965b5c269 Mon Sep 17 00:00:00 2001 From: Amber Brown Date: Wed, 29 Aug 2018 23:20:01 +1000 Subject: [PATCH 11/14] fix other tests --- tests/storage/test_purge.py | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/tests/storage/test_purge.py b/tests/storage/test_purge.py index 0f2d63d1988b..1573f5e4006a 100644 --- a/tests/storage/test_purge.py +++ b/tests/storage/test_purge.py @@ -91,3 +91,9 @@ def test_purge_wont_delete_extrems(self): self.pump() f = self.failureResultOf(purge) self.assertIn("greater than forward", f.value.args[0]) + + # Nothing is deleted. + self.successResultOf(get_first) + self.successResultOf(get_second) + self.successResultOf(get_third) + self.successResultOf(get_last) From 8dd3c20b0a9e39ebbfad40240aa7ec33b440c38a Mon Sep 17 00:00:00 2001 From: Amber Brown Date: Wed, 29 Aug 2018 23:27:19 +1000 Subject: [PATCH 12/14] fix for real --- tests/storage/test_purge.py | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/tests/storage/test_purge.py b/tests/storage/test_purge.py index 1573f5e4006a..f671599cb89a 100644 --- a/tests/storage/test_purge.py +++ b/tests/storage/test_purge.py @@ -92,6 +92,13 @@ def test_purge_wont_delete_extrems(self): f = self.failureResultOf(purge) self.assertIn("greater than forward", f.value.args[0]) + # Try and get the events + get_first = storage.get_event(first["event_id"]) + get_second = storage.get_event(second["event_id"]) + get_third = storage.get_event(third["event_id"]) + get_last = storage.get_event(last["event_id"]) + self.pump() + # Nothing is deleted. self.successResultOf(get_first) self.successResultOf(get_second) From 8c7af46216d89354440b8f137425307b1480c10d Mon Sep 17 00:00:00 2001 From: Amber Brown Date: Thu, 30 Aug 2018 23:53:51 +1000 Subject: [PATCH 13/14] remove version check --- synapse/python_dependencies.py | 2 +- synapse/storage/engines/postgres.py | 11 ----------- 2 files changed, 1 insertion(+), 12 deletions(-) diff --git a/synapse/python_dependencies.py b/synapse/python_dependencies.py index dabb563b0b45..942d7c721f0e 100644 --- a/synapse/python_dependencies.py +++ b/synapse/python_dependencies.py @@ -79,7 +79,7 @@ "affinity": ["affinity"], }, "postgres": { - "psycopg2>=2.7": ["psycopg2"] + "psycopg2>=2.6": ["psycopg2"] } } diff --git a/synapse/storage/engines/postgres.py b/synapse/storage/engines/postgres.py index a6d2101bc415..0296453f2950 100644 --- a/synapse/storage/engines/postgres.py +++ b/synapse/storage/engines/postgres.py @@ -23,17 +23,6 @@ class PostgresEngine(object): def __init__(self, database_module, database_config): self.module = database_module - - # Check the version. This will raise if it's less than the supported - # 9.1. - version = database_module.extensions.libpq_version() - - if version < 100000: - warnings.warn( - "Consider upgrading your client to PostgreSQL 10.0+.", - DeprecationWarning - ) - self.module.extensions.register_type(self.module.extensions.UNICODE) self.synchronous_commit = database_config.get("synchronous_commit", True) From 876838ff2fd32357a5d5dc98f43eb47ab475d128 Mon Sep 17 00:00:00 2001 From: Amber Brown Date: Fri, 31 Aug 2018 00:02:18 +1000 Subject: [PATCH 14/14] pep8 nit --- synapse/storage/engines/postgres.py | 2 -- 1 file changed, 2 deletions(-) diff --git a/synapse/storage/engines/postgres.py b/synapse/storage/engines/postgres.py index 0296453f2950..42225f8a2a7e 100644 --- a/synapse/storage/engines/postgres.py +++ b/synapse/storage/engines/postgres.py @@ -13,8 +13,6 @@ # See the License for the specific language governing permissions and # limitations under the License. -import warnings - from ._base import IncorrectDatabaseSetup