Skip to content
This repository was archived by the owner on Apr 26, 2024. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion synapse/handlers/federation.py
Original file line number Diff line number Diff line change
Expand Up @@ -1413,7 +1413,7 @@ def do_auth(self, origin, event, context, auth_events):
local_view = dict(auth_events)
remote_view = dict(auth_events)
remote_view.update({
(d.type, d.state_key): d for d in different_events
(d.type, d.state_key): d for d in different_events if d
})

new_state, prev_state = self.state_handler.resolve_events(
Expand Down
77 changes: 46 additions & 31 deletions synapse/handlers/message.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
UserID, RoomAlias, RoomStreamToken, StreamToken, get_domain_from_id
)
from synapse.util import unwrapFirstError
from synapse.util.async import concurrently_execute, run_on_reactor
from synapse.util.async import concurrently_execute, run_on_reactor, ReadWriteLock
from synapse.util.caches.snapshot_cache import SnapshotCache
from synapse.util.logcontext import preserve_fn
from synapse.visibility import filter_events_for_client
Expand All @@ -50,6 +50,20 @@ def __init__(self, hs):
self.validator = EventValidator()
self.snapshot_cache = SnapshotCache()

self.pagination_lock = ReadWriteLock()

@defer.inlineCallbacks
def purge_history(self, room_id, event_id):
event = yield self.store.get_event(event_id)

if event.room_id != room_id:
raise SynapseError(400, "Event is for wrong room.")

depth = event.depth

with (yield self.pagination_lock.write(room_id)):
yield self.store.delete_old_state(room_id, depth)

@defer.inlineCallbacks
def get_messages(self, requester, room_id=None, pagin_config=None,
as_client_event=True):
Expand Down Expand Up @@ -85,42 +99,43 @@ def get_messages(self, requester, room_id=None, pagin_config=None,

source_config = pagin_config.get_source_config("room")

membership, member_event_id = yield self._check_in_room_or_world_readable(
room_id, user_id
)
with (yield self.pagination_lock.read(room_id)):
membership, member_event_id = yield self._check_in_room_or_world_readable(
room_id, user_id
)

if source_config.direction == 'b':
# if we're going backwards, we might need to backfill. This
# requires that we have a topo token.
if room_token.topological:
max_topo = room_token.topological
else:
max_topo = yield self.store.get_max_topological_token_for_stream_and_room(
room_id, room_token.stream
)
if source_config.direction == 'b':
# if we're going backwards, we might need to backfill. This
# requires that we have a topo token.
if room_token.topological:
max_topo = room_token.topological
else:
max_topo = yield self.store.get_max_topological_token(
room_id, room_token.stream
)

if membership == Membership.LEAVE:
# If they have left the room then clamp the token to be before
# they left the room, to save the effort of loading from the
# database.
leave_token = yield self.store.get_topological_token_for_event(
member_event_id
)
leave_token = RoomStreamToken.parse(leave_token)
if leave_token.topological < max_topo:
source_config.from_key = str(leave_token)

if membership == Membership.LEAVE:
# If they have left the room then clamp the token to be before
# they left the room, to save the effort of loading from the
# database.
leave_token = yield self.store.get_topological_token_for_event(
member_event_id
yield self.hs.get_handlers().federation_handler.maybe_backfill(
room_id, max_topo
)
leave_token = RoomStreamToken.parse(leave_token)
if leave_token.topological < max_topo:
source_config.from_key = str(leave_token)

yield self.hs.get_handlers().federation_handler.maybe_backfill(
room_id, max_topo
events, next_key = yield data_source.get_pagination_rows(
requester.user, source_config, room_id
)

events, next_key = yield data_source.get_pagination_rows(
requester.user, source_config, room_id
)

next_token = pagin_config.from_token.copy_and_replace(
"room_key", next_key
)
next_token = pagin_config.from_token.copy_and_replace(
"room_key", next_key
)

if not events:
defer.returnValue({
Expand Down
19 changes: 19 additions & 0 deletions synapse/rest/client/v1/admin.py
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,24 @@ def on_POST(self, request):
defer.returnValue((200, ret))


class PurgeHistoryRestServlet(ClientV1RestServlet):
PATTERNS = client_path_patterns(
"/admin/purge_history/(?P<room_id>[^/]*)/(?P<event_id>[^/]*)"
)

@defer.inlineCallbacks
def on_POST(self, request, room_id, event_id):
requester = yield self.auth.get_user_by_req(request)
is_admin = yield self.auth.is_server_admin(requester.user)

if not is_admin:
raise AuthError(403, "You are not a server admin")

yield self.handlers.message_handler.purge_history(room_id, event_id)

defer.returnValue((200, {}))


class DeactivateAccountRestServlet(ClientV1RestServlet):
PATTERNS = client_path_patterns("/admin/deactivate/(?P<target_user_id>[^/]*)")

Expand Down Expand Up @@ -106,3 +124,4 @@ def register_servlets(hs, http_server):
WhoisRestServlet(hs).register(http_server)
PurgeMediaCacheRestServlet(hs).register(http_server)
DeactivateAccountRestServlet(hs).register(http_server)
PurgeHistoryRestServlet(hs).register(http_server)
151 changes: 151 additions & 0 deletions synapse/storage/events.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
from synapse.util.logcontext import preserve_fn, PreserveLoggingContext
from synapse.util.logutils import log_function
from synapse.api.constants import EventTypes
from synapse.api.errors import SynapseError

from canonicaljson import encode_canonical_json
from collections import deque, namedtuple
Expand Down Expand Up @@ -1281,6 +1282,156 @@ def get_all_new_events_txn(txn):
)
return self.runInteraction("get_all_new_events", get_all_new_events_txn)

def delete_old_state(self, room_id, topological_ordering):
return self.runInteraction(
"delete_old_state",
self._delete_old_state_txn, room_id, topological_ordering
)

def _delete_old_state_txn(self, txn, room_id, topological_ordering):
"""Deletes old room state
"""

# Tables that should be pruned:
# event_auth
# event_backward_extremities
# event_content_hashes
# event_destinations
# event_edge_hashes
# event_edges
# event_forward_extremities
# event_json
# event_push_actions
# event_reference_hashes
# event_search
# event_signatures
# event_to_state_groups
# events
# rejections
# room_depth
# state_groups
# state_groups_state

# First ensure that we're not about to delete all the forward extremeties
txn.execute(
"SELECT e.event_id, e.depth FROM events as e "
"INNER JOIN event_forward_extremities as f "
"ON e.event_id = f.event_id "
"AND e.room_id = f.room_id "
"WHERE f.room_id = ?",
(room_id,)
)
rows = txn.fetchall()
max_depth = max(row[0] for row in rows)

if max_depth <= topological_ordering:
# We need to ensure we don't delete all the events from the datanase
# otherwise we wouldn't be able to send any events (due to not
# having any backwards extremeties)
raise SynapseError(
400, "topological_ordering is greater than forward extremeties"
)

txn.execute(
"SELECT event_id, state_key FROM events"
" LEFT JOIN state_events USING (room_id, event_id)"
" WHERE room_id = ? AND topological_ordering < ?",
(room_id, topological_ordering,)
)
event_rows = txn.fetchall()

# We calculate the new entries for the backward extremeties by finding
# all events that point to events that are to be purged
txn.execute(
"SELECT e.event_id FROM events as e"
" INNER JOIN event_edges as ed ON e.event_id = ed.prev_event_id"
" INNER JOIN events as e2 ON e2.event_id = ed.event_id"
" WHERE e.room_id = ? AND e.topological_ordering < ?"
" AND e2.topological_ordering >= ?",
(room_id, topological_ordering, topological_ordering)
)
new_backwards_extrems = txn.fetchall()

# Get all state groups that are only referenced by events that are
# to be deleted.
txn.execute(
"SELECT state_group FROM event_to_state_groups"
" INNER JOIN events USING (event_id)"
" WHERE state_group IN ("
" SELECT DISTINCT state_group FROM events"
" INNER JOIN event_to_state_groups USING (event_id)"
" WHERE room_id = ? AND topological_ordering < ?"
" )"
" GROUP BY state_group HAVING MAX(topological_ordering) < ?",
(room_id, topological_ordering, topological_ordering)
)
state_rows = txn.fetchall()
txn.executemany(
"DELETE FROM state_groups_state WHERE state_group = ?",
state_rows
)
txn.executemany(
"DELETE FROM state_groups WHERE id = ?",
state_rows
)
# Delete all non-state
txn.executemany(
"DELETE FROM event_to_state_groups WHERE event_id = ?",
[(event_id,) for event_id, _ in event_rows]
)

txn.execute(
"UPDATE room_depth SET min_depth = ? WHERE room_id = ?",
(topological_ordering, room_id,)
)

# Delete all remote non-state events
to_delete = [
(event_id,) for event_id, state_key in event_rows
if state_key is None and not self.hs.is_mine_id(event_id)
]
for table in (
"events",
"event_json",
"event_auth",
"event_content_hashes",
"event_destinations",
"event_edge_hashes",
"event_edges",
"event_forward_extremities",
"event_push_actions",
"event_reference_hashes",
"event_search",
"event_signatures",
"rejections",
"event_backward_extremities",
):
txn.executemany(
"DELETE FROM %s WHERE event_id = ?" % (table,),
to_delete
)

# Update backward extremeties
txn.executemany(
"INSERT INTO event_backward_extremities (room_id, event_id)"
" VALUES (?, ?)",
[(room_id, event_id) for event_id, in new_backwards_extrems]
)

txn.executemany(
"DELETE FROM events WHERE event_id = ?",
to_delete
)
# Mark all state and own events as outliers
txn.executemany(
"UPDATE events SET outlier = ?"
" WHERE event_id = ?",
[
(True, event_id,) for event_id, state_key in event_rows
if state_key is not None or self.hs.is_mine_id(event_id)
]
)


AllNewEventsResult = namedtuple("AllNewEventsResult", [
"new_forward_events", "new_backfill_events",
Expand Down
4 changes: 2 additions & 2 deletions synapse/storage/stream.py
Original file line number Diff line number Diff line change
Expand Up @@ -510,13 +510,13 @@ def get_topological_token_for_event(self, event_id):
row["topological_ordering"], row["stream_ordering"],)
)

def get_max_topological_token_for_stream_and_room(self, room_id, stream_key):
def get_max_topological_token(self, room_id, stream_key):
sql = (
"SELECT max(topological_ordering) FROM events"
" WHERE room_id = ? AND stream_ordering < ?"
)
return self._execute(
"get_max_topological_token_for_stream_and_room", None,
"get_max_topological_token", None,
sql, room_id, stream_key,
).addCallback(
lambda r: r[0][0] if r else 0
Expand Down
Loading