This repository was archived by the owner on Apr 26, 2024. It is now read-only.
Merged
136 changes: 136 additions & 0 deletions docs/room_and_user_statistics.md
@@ -0,0 +1,136 @@
Room and User Statistics
========================

Synapse maintains room and user statistics, as well as a cache of room state,
in several database tables.

These can be used for administrative purposes but are also used when generating
the public room directory. If these tables get stale or out of sync (possibly
after database corruption), you may wish to regenerate them.


# Synapse Administrator Documentation

## Various SQL scripts that you may find useful

### Delete stats, including historical stats

```sql
DELETE FROM room_stats_current;
DELETE FROM room_stats_historical;
DELETE FROM user_stats_current;
DELETE FROM user_stats_historical;
```

### Regenerate stats (all subjects)

```sql
BEGIN;
DELETE FROM stats_incremental_position;
INSERT INTO stats_incremental_position (
    state_delta_stream_id,
    total_events_min_stream_ordering,
    total_events_max_stream_ordering,
    is_background_contract
) VALUES (NULL, NULL, NULL, FALSE), (NULL, NULL, NULL, TRUE);
COMMIT;

DELETE FROM room_stats_current;
DELETE FROM user_stats_current;
```

Then follow the steps below for **'Regenerate stats (missing subjects only)'**.

### Regenerate stats (missing subjects only)

```sql
-- Set up staging tables
-- we depend on current_state_events_membership because this is used
-- in our counting.
INSERT INTO background_updates (update_name, progress_json, depends_on) VALUES
    ('populate_stats_prepare', '{}', 'current_state_events_membership');

-- Run through each room and update stats
INSERT INTO background_updates (update_name, progress_json, depends_on) VALUES
('populate_stats_process_rooms', '{}', 'populate_stats_prepare');

-- Run through each user and update stats.
INSERT INTO background_updates (update_name, progress_json, depends_on) VALUES
('populate_stats_process_users', '{}', 'populate_stats_process_rooms');

-- Clean up staging tables
INSERT INTO background_updates (update_name, progress_json, depends_on) VALUES
('populate_stats_cleanup', '{}', 'populate_stats_process_users');
```

then **restart Synapse**.
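
The `depends_on` column is what orders the four updates above. The following is a minimal sketch of that chaining, run against an in-memory SQLite database. The table schema is a simplified stand-in, and `queue_stats_regeneration` and `resolve_run_order` are hypothetical helpers written for this illustration; they are not part of Synapse and do not reproduce its scheduler.

```python
import sqlite3


def queue_stats_regeneration(conn):
    """Insert the four stats updates, each depending on the previous one."""
    chain = [
        ("populate_stats_prepare", "current_state_events_membership"),
        ("populate_stats_process_rooms", "populate_stats_prepare"),
        ("populate_stats_process_users", "populate_stats_process_rooms"),
        ("populate_stats_cleanup", "populate_stats_process_users"),
    ]
    conn.executemany(
        "INSERT INTO background_updates (update_name, progress_json, depends_on)"
        " VALUES (?, '{}', ?)",
        chain,
    )


def resolve_run_order(conn):
    """Hypothetical helper: order updates so each runs after its dependency.

    A dependency that is not itself queued is treated as already complete.
    """
    pending = dict(
        conn.execute("SELECT update_name, depends_on FROM background_updates")
    )
    order = []
    while pending:
        ready = sorted(
            name
            for name, dep in pending.items()
            if dep is None or dep not in pending
        )
        if not ready:
            raise ValueError("dependency cycle in background_updates")
        for name in ready:
            order.append(name)
            del pending[name]
    return order


# Simplified stand-in schema (an assumption, not Synapse's real table).
conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE background_updates ("
    " update_name TEXT PRIMARY KEY,"
    " progress_json TEXT NOT NULL,"
    " depends_on TEXT)"
)
queue_stats_regeneration(conn)
run_order = resolve_run_order(conn)
print(run_order)
```

In this sketch, `populate_stats_prepare` is ready first because its dependency is not queued, and the remaining updates follow in insertion order.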


# Synapse Developer Documentation

## High-Level Concepts

### Definitions

* **subject**: Something we are tracking stats about – currently a room or user.
* **current row**: An entry for a subject in the appropriate current statistics
table. Each subject can have only one.
* **historical row**: An entry for a subject in the appropriate historical
statistics table. Each subject can have any number of these.

### Overview

Stats are maintained as time series. There are two kinds of column:

* absolute columns – where the value is correct for the time given by `end_ts`
  in the stats row. (Imagine a line graph for these values.)
    * They can also be thought of as 'gauges' in Prometheus, if you are
      familiar with it.
* per-slice columns – where the value counts the occurrences that fell within
  the time slice given by `(end_ts − bucket_size)…end_ts` or
  `start_ts…end_ts`. (Imagine a histogram for these values.)

Currently, only absolute columns are in use.
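
To make the distinction concrete, here is a small sketch that derives both kinds of value from a list of event timestamps. The column names `total_events` and `events_in_slice` are hypothetical, and treating each slice as the half-open interval `[end_ts − bucket_size, end_ts)` is an assumption made for this example.

```python
def bucket_stats(event_timestamps, bucket_size, num_buckets):
    """Build one row per time slice with an absolute and a per-slice value."""
    rows = []
    for i in range(1, num_buckets + 1):
        end_ts = i * bucket_size
        start_ts = end_ts - bucket_size
        # Absolute: everything seen so far, as of end_ts (a gauge).
        total_events = sum(1 for ts in event_timestamps if ts < end_ts)
        # Per-slice: only what happened inside this slice (a histogram bar).
        events_in_slice = sum(
            1 for ts in event_timestamps if start_ts <= ts < end_ts
        )
        rows.append(
            {
                "end_ts": end_ts,
                "bucket_size": bucket_size,
                "total_events": total_events,
                "events_in_slice": events_in_slice,
            }
        )
    return rows


rows = bucket_stats([5, 12, 18, 41], bucket_size=10, num_buckets=5)
for row in rows:
    print(row)
```

The absolute value never needs the earlier rows to be interpreted, whereas the per-slice values must be summed to recover a total.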

Stats are maintained in two tables (for each type): current and historical.

Current stats correspond to the present values. Each subject can only have one
entry.

Historical stats correspond to values in the past. Subjects may have multiple
entries.

## Concepts around the management of stats

### current rows

Current rows contain the most up-to-date statistics for a subject.
They only contain absolute columns.

#### incomplete current rows

There are also **incomplete** current rows, which are current rows that do not
contain a full count yet – this is because they are waiting for the regeneration
process to give them an initial count. Incomplete current rows DO NOT contain
correct and up-to-date values. As such, *incomplete rows are not old-collected*.
Instead, old incomplete rows will be extended so they are no longer old.

### historical rows

Historical rows can always be considered to be valid for the time slice and
end time specified. (This, of course, assumes a lack of defects in the code
to track the statistics, and assumes integrity of the database).

Even so, there are two considerations that we may need to bear in mind:

* historical rows will not exist for every time slice – they will be omitted
  if there were no changes. In this case, the following assumptions can be
  made to interpolate/recreate missing rows:
    - absolute fields have the same values as in the preceding row
    - per-slice fields are zero (`0`)
* historical rows will not be retained forever – rows older than a configurable
  time will be purged.
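
The interpolation rules above can be sketched as follows. This is an illustration only: `fill_missing_slices` is a hypothetical helper, the field names `joined_members` and `new_events` are made up for the example, and it assumes that all rows share one `bucket_size` and that their `end_ts` values are aligned to it.

```python
def fill_missing_slices(rows, absolute_fields, per_slice_fields):
    """Recreate omitted historical rows between the rows that do exist."""
    filled = []
    for row in sorted(rows, key=lambda r: r["end_ts"]):
        if filled:
            prev = filled[-1]
            next_end = prev["end_ts"] + row["bucket_size"]
            while next_end < row["end_ts"]:
                gap = {"end_ts": next_end, "bucket_size": row["bucket_size"]}
                # Absolute fields carry over from the preceding row.
                gap.update({field: prev[field] for field in absolute_fields})
                # Per-slice fields are zero: nothing happened in this slice.
                gap.update({field: 0 for field in per_slice_fields})
                filled.append(gap)
                next_end += row["bucket_size"]
        filled.append(dict(row))
    return filled


stored = [
    {"end_ts": 10, "bucket_size": 10, "joined_members": 3, "new_events": 2},
    {"end_ts": 40, "bucket_size": 10, "joined_members": 5, "new_events": 1},
]
filled = fill_missing_slices(stored, ["joined_members"], ["new_events"])
for row in filled:
    print(row)
```

Rows that were stored are passed through unchanged; only the gaps between them are synthesised.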

#### purge

The purging of historical rows is not yet implemented.

29 changes: 13 additions & 16 deletions synapse/handlers/stats.py
@@ -98,15 +98,16 @@ def _unsafe_process(self):
                 self.pos["state_delta_stream_id"]
             )

-            logger.debug("Handling %d state deltas", len(deltas))
-            yield self._handle_deltas(deltas)
+            if deltas:
+                logger.debug("Handling %d state deltas", len(deltas))
+                yield self._handle_deltas(deltas)

-            self.pos["state_delta_stream_id"] = deltas[-1]["stream_id"]
-            yield self.store.update_stats_positions(self.pos)
+                self.pos["state_delta_stream_id"] = deltas[-1]["stream_id"]
+                yield self.store.update_stats_positions(self.pos)

-            event_processing_positions.labels("stats").set(
-                self.pos["state_delta_stream_id"]
-            )
+                event_processing_positions.labels("stats").set(
+                    self.pos["state_delta_stream_id"]
+                )

         # Then count deltas for total_events and total_event_bytes.
         with Measure(self.clock, "stats_total_events_and_bytes"):
@@ -129,7 +130,6 @@ def _handle_deltas(self, deltas):
             event_id = delta["event_id"]
             stream_id = delta["stream_id"]
             prev_event_id = delta["prev_event_id"]
-            stream_pos = delta["stream_id"]

             logger.debug("Handling: %r %r, %s", typ, state_key, event_id)

@@ -158,12 +158,9 @@ def _handle_deltas(self, deltas):
                 if event:
                     event_content = event.content or {}

-            # We use stream_pos here rather than fetch by event_id as event_id
-            # may be None
-            stream_timestamp = yield self.store.get_received_ts_by_stream_pos(
-                stream_pos
-            )
-            stream_timestamp = int(stream_timestamp)
+            # We can't afford for this time to stray into the past, so we count
+            # it as now.
+            stream_timestamp = int(self.clock.time_msec())

             # All the values in this dict are deltas (RELATIVE changes)
             room_stats_delta = {}
@@ -261,7 +258,7 @@ def _handle_deltas(self, deltas):
                 is_newly_created = True

             elif typ == EventTypes.JoinRules:
-                old_room_state = yield self.store.get_room_state(room_id)
+                old_room_state = yield self.store.get_room_stats_state(room_id)
                 yield self.store.update_room_state(
                     room_id, {"join_rules": event_content.get("join_rule")}
                 )
@@ -282,7 +279,7 @@ def _handle_deltas(self, deltas):
                     )

             elif typ == EventTypes.RoomHistoryVisibility:
-                old_room_state = yield self.store.get_room_state(room_id)
+                old_room_state = yield self.store.get_room_stats_state(room_id)
                 yield self.store.update_room_state(
                     room_id,
                     {"history_visibility": event_content.get("history_visibility")},
84 changes: 79 additions & 5 deletions synapse/storage/stats.py
@@ -661,6 +661,79 @@ def update_room_state(self, room_id, fields):
             desc="update_room_state",
         )

+    def get_statistics_for_subject(self, stats_type, stats_id, start, size=100):
+        """
+        Get statistics for a given subject.
+
+        Args:
+            stats_type (str): The type of subject
+            stats_id (str): The ID of the subject (e.g. room_id or user_id)
+            start (int): Pagination start. Number of entries, not timestamp.
+            size (int): How many entries to return.
+
+        Returns:
+            Deferred[list[dict]], where the dict has the keys of
+            ABSOLUTE_STATS_FIELDS[stats_type], and "bucket_size" and "end_ts".
+        """
+        return self.runInteraction(
+            "get_statistics_for_subject",
+            self._get_statistics_for_subject_txn,
+            stats_type,
+            stats_id,
+            start,
+            size,
+        )
+
+    def _get_statistics_for_subject_txn(
+        self, txn, stats_type, stats_id, start, size=100
+    ):
+        """
+        Transaction-bound version of L{get_statistics_for_subject}.
+        """
+
+        table, id_col = TYPE_TO_TABLE[stats_type]
+        selected_columns = list(
+            ABSOLUTE_STATS_FIELDS[stats_type] + PER_SLICE_FIELDS[stats_type]
+        )
+
+        slice_list = self._simple_select_list_paginate_txn(
+            txn,
+            table + "_historical",
+            {id_col: stats_id},
+            "end_ts",
+            start,
+            size,
+            retcols=selected_columns + ["bucket_size", "end_ts"],
+            order_direction="DESC",
+        )
+
+        return slice_list
+
+    def get_room_stats_state(self, room_id):
+        """
+        Returns the current room_stats_state for a room.
+
+        Args:
+            room_id (str): The ID of the room to return state for.
+
+        Returns (dict):
+            Dictionary containing these keys:
+                "name", "topic", "canonical_alias", "avatar", "join_rules",
+                "history_visibility"
+        """
+        return self._simple_select_one(
+            "room_stats_state",
+            {"room_id": room_id},
+            retcols=(
+                "name",
+                "topic",
+                "canonical_alias",
+                "avatar",
+                "join_rules",
+                "history_visibility",
+            ),
+        )
+
     @cached()
     def get_earliest_token_for_stats(self, stats_type, id):
         """
@@ -948,10 +1021,11 @@ def _upsert_copy_from_table_with_additive_relatives_txn(
         src_row = self._simple_select_one_txn(
             txn, src_table, keyvalues, copy_columns
         )
+        all_dest_keyvalues = {**keyvalues, **extra_dst_keyvalues}
         dest_current_row = self._simple_select_one_txn(
             txn,
             into_table,
-            keyvalues,
+            keyvalues=all_dest_keyvalues,
             retcols=list(chain(additive_relatives.keys(), copy_columns)),
             allow_none=True,
         )
@@ -968,7 +1042,7 @@ def _upsert_copy_from_table_with_additive_relatives_txn(
         else:
             for (key, val) in additive_relatives.items():
                 src_row[key] = dest_current_row[key] + val
-            self._simple_update_txn(txn, into_table, keyvalues, src_row)
+            self._simple_update_txn(txn, into_table, all_dest_keyvalues, src_row)

     def incremental_update_room_total_events_and_bytes(self, in_positions):
         """
@@ -1059,14 +1133,14 @@ def update_total_event_and_bytes_count_between_txn(self, txn, low_pos, high_pos)
             new_bytes_expression = "LENGTH(CAST(json AS BLOB))"

         sql = """
-            SELECT room_id, COUNT(*) AS new_events, SUM(%s) AS new_bytes
+            SELECT events.room_id, COUNT(*) AS new_events, SUM(%s) AS new_bytes
             FROM events INNER JOIN event_json USING (event_id)
             WHERE ? %s stream_ordering AND stream_ordering %s ?
-            GROUP BY room_id
+            GROUP BY events.room_id
         """ % (
+            new_bytes_expression,
             low_comparator,
             high_comparator,
-            new_bytes_expression,
         )

         txn.execute(sql, (low_pos, high_pos))