Skip to content
This repository was archived by the owner on Apr 26, 2024. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changelog.d/6245.misc
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Split out state storage into separate data store.
8 changes: 4 additions & 4 deletions scripts/synapse_port_db
Original file line number Diff line number Diff line change
Expand Up @@ -51,11 +51,12 @@ from synapse.storage.data_stores.main.registration import (
from synapse.storage.data_stores.main.room import RoomBackgroundUpdateStore
from synapse.storage.data_stores.main.roommember import RoomMemberBackgroundUpdateStore
from synapse.storage.data_stores.main.search import SearchBackgroundUpdateStore
from synapse.storage.data_stores.main.state import StateBackgroundUpdateStore
from synapse.storage.data_stores.main.state import MainStateBackgroundUpdateStore
from synapse.storage.data_stores.main.stats import StatsStore
from synapse.storage.data_stores.main.user_directory import (
UserDirectoryBackgroundUpdateStore,
)
from synapse.storage.data_stores.state.bg_updates import StateBackgroundUpdateStore
from synapse.storage.database import Database, make_conn
from synapse.storage.engines import create_engine
from synapse.storage.prepare_database import prepare_database
Expand Down Expand Up @@ -138,6 +139,7 @@ class Store(
RoomMemberBackgroundUpdateStore,
SearchBackgroundUpdateStore,
StateBackgroundUpdateStore,
MainStateBackgroundUpdateStore,
UserDirectoryBackgroundUpdateStore,
StatsStore,
):
Expand Down Expand Up @@ -496,9 +498,7 @@ class Porter(object):
def run(self):
try:
self.sqlite_store = yield self.build_db_store(
DatabaseConnectionConfig(
"master", self.sqlite_config, data_stores=["main"]
)
DatabaseConnectionConfig("master-sqlite", self.sqlite_config)
)

# Check if all background updates are done, abort if not.
Expand Down
10 changes: 5 additions & 5 deletions synapse/config/database.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,10 +34,12 @@ class DatabaseConnectionConfig:
module name, and `args` for the args to give to the database
connector.
data_stores: The list of data stores that should be provisioned on the
database.
database. Defaults to all data stores.
"""

def __init__(self, name: str, db_config: dict, data_stores: List[str]):
def __init__(
self, name: str, db_config: dict, data_stores: List[str] = ["main", "state"]
):
if db_config["name"] not in ("sqlite3", "psycopg2"):
raise ConfigError("Unsupported database type %r" % (db_config["name"],))

Expand All @@ -62,9 +64,7 @@ def read_config(self, config, **kwargs):
if database_config is None:
database_config = {"name": "sqlite3", "args": {}}

self.databases = [
DatabaseConnectionConfig("master", database_config, data_stores=["main"])
]
self.databases = [DatabaseConnectionConfig("master", database_config)]

self.set_databasepath(config.get("database_path"))

Expand Down
5 changes: 5 additions & 0 deletions synapse/storage/data_stores/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@

import logging

from synapse.storage.data_stores.state import StateGroupDataStore
from synapse.storage.database import Database, make_conn
from synapse.storage.engines import create_engine
from synapse.storage.prepare_database import prepare_database
Expand Down Expand Up @@ -55,6 +56,10 @@ def __init__(self, main_store_class, hs):
logger.info("Starting 'main' data store")
self.main = main_store_class(database, db_conn, hs)

if "state" in database_config.data_stores:
logger.info("Starting 'state' data store")
self.state = StateGroupDataStore(database, db_conn, hs)

db_conn.commit()

self.databases.append(database)
Expand Down
157 changes: 0 additions & 157 deletions synapse/storage/data_stores/main/events.py
Original file line number Diff line number Diff line change
Expand Up @@ -1757,163 +1757,6 @@ def _purge_room_txn(self, txn, room_id):

return state_groups

def purge_unreferenced_state_groups(
    self, room_id: str, state_groups_to_delete
) -> defer.Deferred:
    """Remove state groups that are no longer referenced.

    Surviving state groups that point at one of the removed groups are
    de-delta'd by the same interaction.

    Args:
        room_id: The room that all of the state groups belong to (they
            must all be in the same room).
        state_groups_to_delete (Collection[int]): Every state group that
            should be deleted.

    Returns:
        The result of `self.db.runInteraction` for the scheduled
        interaction.
    """
    run_interaction = self.db.runInteraction
    txn_callback = self._purge_unreferenced_state_groups

    return run_interaction(
        "purge_unreferenced_state_groups",
        txn_callback,
        room_id,
        state_groups_to_delete,
    )

def _purge_unreferenced_state_groups(self, txn, room_id, state_groups_to_delete):
    """Transaction body for `purge_unreferenced_state_groups`.

    Steps, in order:
      1. Find the state groups whose `prev_state_group` is one of the groups
         being deleted, excluding groups that are themselves being deleted.
      2. For each such group, resolve its state, drop its rows from
         `state_groups_state` and `state_group_edges`, and re-insert the
         resolved state as plain (non-delta) rows.
      3. Delete the requested groups from `state_groups_state` and
         `state_groups`.

    Args:
        txn: The database transaction to run the statements on.
        room_id: Written into the `room_id` column of the re-inserted rows.
        state_groups_to_delete (Collection[int]): The state groups to delete.
    """
    logger.info(
        "[purge] found %i state groups to delete", len(state_groups_to_delete)
    )

    # Edges whose previous group is about to disappear.
    edge_rows = self.db.simple_select_many_txn(
        txn,
        table="state_group_edges",
        column="prev_state_group",
        iterable=state_groups_to_delete,
        keyvalues={},
        retcols=("state_group",),
    )

    # Groups that stay behind but currently reference a to-be-deleted group.
    survivors = {
        edge["state_group"]
        for edge in edge_rows
        if edge["state_group"] not in state_groups_to_delete
    }

    logger.info(
        "[purge] de-delta-ing %i remaining state groups",
        len(survivors),
    )

    # Rewrite each surviving group as a non-delta version so that it no
    # longer depends on a group that is about to be removed.
    for group in survivors:
        logger.info("[purge] de-delta-ing remaining state group %s", group)
        resolved_state = self._get_state_groups_from_groups_txn(txn, [group])[group]

        # Clear the old delta rows first, then the edge.
        for table_name in ("state_groups_state", "state_group_edges"):
            self.db.simple_delete_txn(
                txn, table=table_name, keyvalues={"state_group": group}
            )

        full_rows = [
            {
                "state_group": group,
                "room_id": room_id,
                "type": state_key_pair[0],
                "state_key": state_key_pair[1],
                "event_id": event_id,
            }
            for state_key_pair, event_id in iteritems(resolved_state)
        ]
        self.db.simple_insert_many_txn(
            txn, table="state_groups_state", values=full_rows
        )

    logger.info("[purge] removing redundant state groups")
    for delete_sql in (
        "DELETE FROM state_groups_state WHERE state_group = ?",
        "DELETE FROM state_groups WHERE id = ?",
    ):
        txn.executemany(delete_sql, ((sg,) for sg in state_groups_to_delete))

@defer.inlineCallbacks
def get_previous_state_groups(self, state_groups):
    """Look up the previous-group edges for the given state groups.

    Args:
        state_groups (Iterable[int])

    Returns:
        Deferred[dict[int, int]]: mapping from state group to previous
        state group.
    """
    # NOTE(review): the given ids are matched against the
    # `prev_state_group` column of `state_group_edges`; confirm against
    # callers that this is the intended lookup direction.
    edge_rows = yield self.db.simple_select_many_batch(
        table="state_group_edges",
        column="prev_state_group",
        iterable=state_groups,
        keyvalues={},
        retcols=("prev_state_group", "state_group"),
        desc="get_previous_state_groups",
    )

    return {edge["state_group"]: edge["prev_state_group"] for edge in edge_rows}

def purge_room_state(self, room_id, state_groups_to_delete):
    """Remove every record of a room from the state tables.

    Args:
        room_id (str):
        state_groups_to_delete (list[int]): State groups to delete

    Returns:
        The result of `self.db.runInteraction` for the scheduled
        interaction.
    """
    run_interaction = self.db.runInteraction
    txn_callback = self._purge_room_state_txn

    return run_interaction(
        "purge_room_state", txn_callback, room_id, state_groups_to_delete
    )

def _purge_room_state_txn(self, txn, room_id, state_groups_to_delete):
    """Transaction body for `purge_room_state`.

    Deletes the rows for the given state groups from `state_groups_state`,
    then `state_group_edges`, then `state_groups`, in that order.

    Args:
        txn: The database transaction to run the deletes on.
        room_id (str): Only used in the log messages here.
        state_groups_to_delete (list[int]): State groups to delete.
    """
    # (log message, table, column matched against the state group ids).
    # The state group states go first, then the edges, then the groups.
    purge_steps = (
        (
            "[purge] removing %s from state_groups_state",
            "state_groups_state",
            "state_group",
        ),
        (
            "[purge] removing %s from state_group_edges",
            "state_group_edges",
            "state_group",
        ),
        (
            "[purge] removing %s from state_groups",
            "state_groups",
            "id",
        ),
    )

    for log_format, table_name, key_column in purge_steps:
        logger.info(log_format, room_id)

        self.db.simple_delete_many_txn(
            txn,
            table=table_name,
            column=key_column,
            iterable=state_groups_to_delete,
            keyvalues={},
        )

async def is_event_after(self, event_id1, event_id2):
"""Returns True if event_id1 is after event_id2 in the stream
"""
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ DROP INDEX IF EXISTS events_room_id; -- Prefix of events_room_stream
DROP INDEX IF EXISTS events_order; -- Prefix of events_order_topo_stream_room
DROP INDEX IF EXISTS events_topological_ordering; -- Prefix of events_order_topo_stream_room
DROP INDEX IF EXISTS events_stream_ordering; -- Duplicate of PRIMARY KEY
DROP INDEX IF EXISTS state_groups_id; -- Duplicate of PRIMARY KEY
DROP INDEX IF EXISTS event_to_state_groups_id; -- Duplicate of PRIMARY KEY
DROP INDEX IF EXISTS event_push_actions_room_id_event_id_user_id_profile_tag; -- Duplicate of UNIQUE CONSTRAINT

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -975,40 +975,6 @@ CREATE TABLE state_events (



CREATE TABLE state_group_edges (
state_group bigint NOT NULL,
prev_state_group bigint NOT NULL
);



CREATE SEQUENCE state_group_id_seq
START WITH 1
INCREMENT BY 1
NO MINVALUE
NO MAXVALUE
CACHE 1;



CREATE TABLE state_groups (
id bigint NOT NULL,
room_id text NOT NULL,
event_id text NOT NULL
);



CREATE TABLE state_groups_state (
state_group bigint NOT NULL,
room_id text NOT NULL,
type text NOT NULL,
state_key text NOT NULL,
event_id text NOT NULL
);



CREATE TABLE stats_stream_pos (
lock character(1) DEFAULT 'X'::bpchar NOT NULL,
stream_id bigint,
Expand Down Expand Up @@ -1482,12 +1448,6 @@ ALTER TABLE ONLY state_events
ADD CONSTRAINT state_events_event_id_key UNIQUE (event_id);



ALTER TABLE ONLY state_groups
ADD CONSTRAINT state_groups_pkey PRIMARY KEY (id);



ALTER TABLE ONLY stats_stream_pos
ADD CONSTRAINT stats_stream_pos_lock_key UNIQUE (lock);

Expand Down Expand Up @@ -1928,18 +1888,6 @@ CREATE UNIQUE INDEX room_stats_room_ts ON room_stats USING btree (room_id, ts);



CREATE INDEX state_group_edges_idx ON state_group_edges USING btree (state_group);



CREATE INDEX state_group_edges_prev_idx ON state_group_edges USING btree (prev_state_group);



CREATE INDEX state_groups_state_type_idx ON state_groups_state USING btree (state_group, type, state_key);



CREATE INDEX stream_ordering_to_exterm_idx ON stream_ordering_to_exterm USING btree (stream_ordering);


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,8 +42,6 @@ CREATE INDEX ev_edges_id ON event_edges(event_id);
CREATE INDEX ev_edges_prev_id ON event_edges(prev_event_id);
CREATE TABLE room_depth( room_id TEXT NOT NULL, min_depth INTEGER NOT NULL, UNIQUE (room_id) );
CREATE INDEX room_depth_room ON room_depth(room_id);
CREATE TABLE state_groups( id BIGINT PRIMARY KEY, room_id TEXT NOT NULL, event_id TEXT NOT NULL );
CREATE TABLE state_groups_state( state_group BIGINT NOT NULL, room_id TEXT NOT NULL, type TEXT NOT NULL, state_key TEXT NOT NULL, event_id TEXT NOT NULL );
CREATE TABLE event_to_state_groups( event_id TEXT NOT NULL, state_group BIGINT NOT NULL, UNIQUE (event_id) );
CREATE TABLE local_media_repository ( media_id TEXT, media_type TEXT, media_length INTEGER, created_ts BIGINT, upload_name TEXT, user_id TEXT, quarantined_by TEXT, url_cache TEXT, last_access_ts BIGINT, UNIQUE (media_id) );
CREATE TABLE local_media_repository_thumbnails ( media_id TEXT, thumbnail_width INTEGER, thumbnail_height INTEGER, thumbnail_type TEXT, thumbnail_method TEXT, thumbnail_length INTEGER, UNIQUE ( media_id, thumbnail_width, thumbnail_height, thumbnail_type ) );
Expand Down Expand Up @@ -120,9 +118,6 @@ CREATE TABLE device_max_stream_id ( stream_id BIGINT NOT NULL );
CREATE TABLE public_room_list_stream ( stream_id BIGINT NOT NULL, room_id TEXT NOT NULL, visibility BOOLEAN NOT NULL , appservice_id TEXT, network_id TEXT);
CREATE INDEX public_room_list_stream_idx on public_room_list_stream( stream_id );
CREATE INDEX public_room_list_stream_rm_idx on public_room_list_stream( room_id, stream_id );
CREATE TABLE state_group_edges( state_group BIGINT NOT NULL, prev_state_group BIGINT NOT NULL );
CREATE INDEX state_group_edges_idx ON state_group_edges(state_group);
CREATE INDEX state_group_edges_prev_idx ON state_group_edges(prev_state_group);
CREATE TABLE stream_ordering_to_exterm ( stream_ordering BIGINT NOT NULL, room_id TEXT NOT NULL, event_id TEXT NOT NULL );
CREATE INDEX stream_ordering_to_exterm_idx on stream_ordering_to_exterm( stream_ordering );
CREATE INDEX stream_ordering_to_exterm_rm_idx on stream_ordering_to_exterm( room_id, stream_ordering );
Expand Down Expand Up @@ -254,6 +249,5 @@ CREATE INDEX user_ips_last_seen_only ON user_ips (last_seen);
CREATE INDEX users_creation_ts ON users (creation_ts);
CREATE INDEX event_to_state_groups_sg_index ON event_to_state_groups (state_group);
CREATE UNIQUE INDEX device_lists_remote_cache_unique_id ON device_lists_remote_cache (user_id, device_id);
CREATE INDEX state_groups_state_type_idx ON state_groups_state(state_group, type, state_key);
CREATE UNIQUE INDEX device_lists_remote_extremeties_unique_idx ON device_lists_remote_extremeties (user_id);
CREATE UNIQUE INDEX user_ips_user_token_ip_unique_index ON user_ips (user_id, access_token, ip);
Loading