Merged
12 changes: 12 additions & 0 deletions synapse/storage/registration.py
@@ -869,6 +869,17 @@ def _register_user(
                 (user_id_obj.localpart, create_profile_with_displayname),
             )
 
+        if self.hs.config.stats_enabled:
+            # we create a new completed user statistics row
+
+            # we don't strictly need current_token since this user really can't
+            # have any state deltas before now (as it is a new user), but still,
+            # we include it for completeness.
+            current_token = self._get_max_stream_id_in_current_state_deltas_txn(txn)
+            self._update_stats_delta_txn(
+                txn, now, "user", user_id, {}, complete_with_stream_id=current_token
+            )
+
Member:
My concern here is that registration may happen on a different worker than the stats loop. Can we not just insert a new row for a user if we haven't seen them before?

Contributor Author:
It will already do that – but this empty delta is required to mark the row as completed, so that the stats regenerator doesn't pick it up and so we can start collecting historical stats.
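For illustration: with `complete_with_stream_id` set, the empty delta boils down to an upsert that merely stamps the row's completeness marker. A minimal sketch — the `user_stats_current` table and `completed_delta_stream_id` column names are assumptions inferred from the surrounding diff, not taken verbatim from it:

```python
def mark_user_stats_complete(txn, user_id, stream_id):
    # Sketch only: an empty delta with complete_with_stream_id=stream_id
    # reduces to upserting the completeness marker and changing nothing else.
    txn.execute(
        """
        INSERT INTO user_stats_current (user_id, completed_delta_stream_id)
        VALUES (?, ?)
        ON CONFLICT (user_id)
        DO UPDATE SET completed_delta_stream_id = EXCLUDED.completed_delta_stream_id
        """,
        (user_id, stream_id),
    )
```

The regenerator can then skip any row whose marker is already set.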

Member:
Hmm, can we do the reverse and mark all existing users as needing stats regen?

Contributor Author:
That sounds like it could be painful to do. Much like we mark a room as completed either on its stats regen or on witnessing its creation event, the plan was to do the same here: only mark a user as complete when we witness their creation or do a stats regen on that user.

What issue do you see with this current approach?

Member:
It should just be a case of adding a bg update that adds all existing users to the table with completed set to false?

The issue is that now you're writing to the stats table from multiple places and multiple workers, which is probably fine, but it means the logic is no longer "this gets updated in one place, and that's during the processing loop".
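A rough sketch of the background update being suggested, assuming Synapse's `register_background_update_handler` / `_end_background_update` / `runInteraction` machinery; the update name `populate_user_stats`, the `user_stats_current` table, and the NULL completeness marker are all hypothetical, not code from this PR:

```python
from twisted.internet import defer

# Registered on the store at start-up, e.g.:
#     self.register_background_update_handler(
#         "populate_user_stats", self._populate_user_stats
#     )

@defer.inlineCallbacks
def _populate_user_stats(self, progress, batch_size):
    def _populate_txn(txn):
        # Insert a placeholder row for every existing user, leaving the
        # completeness marker NULL so the regenerator will pick them up.
        txn.execute(
            """
            INSERT INTO user_stats_current (user_id)
            SELECT name FROM users
            ON CONFLICT (user_id) DO NOTHING
            """
        )

    yield self.runInteraction("populate_user_stats", _populate_txn)
    yield self._end_background_update("populate_user_stats")
    return 1
```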

Contributor Author:
That can be done, but it needs consideration — I'll look into it in a bit.

Writes to the stats table will already happen in multiple places (after all, the stats regenerator writes to it too). The upsert logic must ensure that everything is kept consistent; indeed, I have been writing it with this in mind, so if it doesn't do that, then that's an issue that should be addressed in its own right.

Assuming that something is complete just because it's the first time the incremental processor has seen it could easily lead to a bug when stats regenerations are performed.

Member:
I was imagining that the stats regenerator would run before we started the normal writing?

Contributor Author (@reivilibre, Aug 29, 2019):
The stats regenerator will run at the same time as the normal writing, because blocking normal writing on its completion would build up a backlog that would be painful to deal with.

This is why we have 'complete' and 'incomplete' current stats rows.
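Concretely, the split lets the two run concurrently: the incremental processor upserts freely, while the regenerator only targets rows not yet stamped complete. A sketch, again assuming a `completed_delta_stream_id`-style marker column rather than the PR's exact schema:

```python
def get_incomplete_user_stats_rows(txn, limit=100):
    # Sketch: rows with a NULL completeness marker were created by the
    # incremental processor without backfilled history; only these need regen.
    txn.execute(
        """
        SELECT user_id FROM user_stats_current
        WHERE completed_delta_stream_id IS NULL
        LIMIT ?
        """,
        (limit,),
    )
    return [row[0] for row in txn.fetchall()]
```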

Member:
Hmm, OK. Well, I guess it's fine for now, and we can see how the regen works in a later PR.


         self._invalidate_cache_and_stream(txn, self.get_user_by_id, (user_id,))
         txn.call_after(self.is_guest.invalidate, (user_id,))

@@ -1140,6 +1151,7 @@ def validate_threepid_session(self, session_id, client_secret, token, current_ts
             deferred str|None: A str representing a link to redirect the user
                 to if there is one.
         """
+
         # Insert everything into a transaction in order to run atomically
         def validate_threepid_session_txn(txn):
             row = self._simple_select_one_txn(
58 changes: 42 additions & 16 deletions synapse/storage/stats.py
@@ -260,6 +260,10 @@ def _update_stats_delta_txn(
             (i.e. not deltas) of absolute fields.
             Does not work with per-slice fields.
         """
+
+        if absolute_field_overrides is None:
+            absolute_field_overrides = {}
+
         table, id_col = TYPE_TO_TABLE[stats_type]
 
         quantised_ts = self.quantise_stats_time(int(ts))
@@ -290,9 +294,6 @@ def _update_stats_delta_txn(
             if key not in absolute_field_overrides
         }
 
-        if absolute_field_overrides is None:
-            absolute_field_overrides = {}
-
         if complete_with_stream_id is not None:
             absolute_field_overrides = absolute_field_overrides.copy()
             absolute_field_overrides[
@@ -321,10 +322,8 @@ def _update_stats_delta_txn(
             txn=txn,
             into_table=table + "_historical",
             keyvalues={id_col: stats_id},
-            extra_dst_keyvalues={
-                "end_ts": end_ts,
-                "bucket_size": self.stats_bucket_size,
-            },
+            extra_dst_insvalues={"bucket_size": self.stats_bucket_size},
+            extra_dst_keyvalues={"end_ts": end_ts},
             additive_relatives=per_slice_additive_relatives,
             src_table=table + "_current",
             copy_columns=abs_field_names,
@@ -357,7 +356,7 @@ def _upsert_with_additive_relatives_txn(
         ]
 
         insert_cols = []
-        qargs = [table]
+        qargs = []
 
         for (key, val) in chain(
             keyvalues.items(), absolutes.items(), additive_relatives.items()
@@ -368,20 +367,21 @@ def _upsert_with_additive_relatives_txn(
             sql = """
                 INSERT INTO %(table)s (%(insert_cols_cs)s)
                 VALUES (%(insert_vals_qs)s)
-                ON CONFLICT DO UPDATE SET %(updates)s
+                ON CONFLICT (%(key_columns)s) DO UPDATE SET %(updates)s
             """ % {
                 "table": table,
                 "insert_cols_cs": ", ".join(insert_cols),
                 "insert_vals_qs": ", ".join(
                     ["?"] * (len(keyvalues) + len(absolutes) + len(additive_relatives))
                 ),
+                "key_columns": ", ".join(keyvalues),
                 "updates": ", ".join(chain(absolute_updates, relative_updates)),
             }
 
             txn.execute(sql, qargs)
         else:
             self.database_engine.lock_table(txn, table)
-            retcols = chain(absolutes.keys(), additive_relatives.keys())
+            retcols = list(chain(absolutes.keys(), additive_relatives.keys()))
             current_row = self._simple_select_one_txn(
                 txn, table, keyvalues, retcols, allow_none=True
             )
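Worth noting why the `ON CONFLICT` change in this hunk matters: PostgreSQL only allows omitting the conflict target for `DO NOTHING`; an `ON CONFLICT ... DO UPDATE` must name the unique columns, so the old form was a syntax error on Postgres. With the fix, the generated statement renders along these lines — an illustrative example for a hypothetical table and additive column, following the `EXCLUDED.<col> + <table>.<col>` pattern used elsewhere in this file:

```python
# Illustrative rendering of the upsert template after the fix; the table
# and column names here are hypothetical, not captured from a real run.
rendered_sql = """
    INSERT INTO room_stats_current (room_id, total_events)
    VALUES (?, ?)
    ON CONFLICT (room_id) DO UPDATE SET
        total_events = EXCLUDED.total_events + room_stats_current.total_events
"""
```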
@@ -400,6 +400,7 @@ def _upsert_copy_from_table_with_additive_relatives_txn(
         into_table,
         keyvalues,
         extra_dst_keyvalues,
+        extra_dst_insvalues,
         additive_relatives,
         src_table,
         copy_columns,
@@ -412,6 +413,8 @@ def _upsert_copy_from_table_with_additive_relatives_txn(
             keyvalues (dict[str, any]): Row-identifying key values
             extra_dst_keyvalues (dict[str, any]): Additional keyvalues
                 for `into_table`.
+            extra_dst_insvalues (dict[str, any]): Additional values to insert
+                on new row creation for `into_table`.
             additive_relatives (dict[str, any]): Fields that will be added onto
                 if existing row present. (Must be disjoint from copy_columns.)
             src_table (str): The source table to copy from
@@ -421,18 +424,28 @@ def _upsert_copy_from_table_with_additive_relatives_txn(
"""
if self.database_engine.can_native_upsert:
ins_columns = chain(
keyvalues, copy_columns, additive_relatives, extra_dst_keyvalues
keyvalues,
copy_columns,
additive_relatives,
extra_dst_keyvalues,
extra_dst_insvalues,
)
sel_exprs = chain(
keyvalues,
copy_columns,
("?" for _ in chain(additive_relatives, extra_dst_keyvalues)),
(
"?"
for _ in chain(
additive_relatives, extra_dst_keyvalues, extra_dst_insvalues
)
),
)
keyvalues_where = ("%s = ?" % f for f in keyvalues)

sets_cc = ("%s = EXCLUDED.%s" % (f, f) for f in copy_columns)
sets_ar = (
"%s = EXCLUDED.%s + %s.%s" % (f, f, into_table, f) for f in copy_columns
"%s = EXCLUDED.%s + %s.%s" % (f, f, into_table, f)
for f in additive_relatives
)

sql = """
@@ -455,7 +468,14 @@ def _upsert_copy_from_table_with_additive_relatives_txn(
"additional_where": additional_where,
}

qargs = chain(additive_relatives.values(), keyvalues.values())
qargs = list(
chain(
additive_relatives.values(),
extra_dst_keyvalues.values(),
extra_dst_insvalues.values(),
keyvalues.values(),
)
)
txn.execute(sql, qargs)
else:
self.database_engine.lock_table(txn, into_table)
@@ -466,12 +486,18 @@ def _upsert_copy_from_table_with_additive_relatives_txn(
                 txn,
                 into_table,
                 keyvalues,
-                chain(additive_relatives.keys(), copy_columns),
+                retcols=list(chain(additive_relatives.keys(), copy_columns)),
                 allow_none=True,
             )
 
             if dest_current_row is None:
-                merged_dict = {**keyvalues, **src_row, **additive_relatives}
+                merged_dict = {
+                    **keyvalues,
+                    **extra_dst_keyvalues,
+                    **extra_dst_insvalues,
+                    **src_row,
+                    **additive_relatives,
+                }
                 self._simple_insert_txn(txn, into_table, merged_dict)
             else:
                 for (key, val) in additive_relatives.items():