Merge remote-tracking branch 'origin/master' into shhs

This commit is contained in:
Amber Brown 2019-06-04 20:41:59 +10:00
commit e7c1171935
35 changed files with 836 additions and 211 deletions

View file

@ -1,3 +1,62 @@
Synapse 0.99.5.2 (2019-05-30)
=============================
Bugfixes
--------
- Fix bug where we leaked extremities when we soft failed events, leading to performance degradation. ([\#5274](https://github.com/matrix-org/synapse/issues/5274), [\#5278](https://github.com/matrix-org/synapse/issues/5278), [\#5291](https://github.com/matrix-org/synapse/issues/5291))
Synapse 0.99.5.1 (2019-05-22)
=============================
0.99.5.1 supersedes 0.99.5 due to malformed debian changelog - no functional changes.
Synapse 0.99.5 (2019-05-22)
===========================
No significant changes.
Synapse 0.99.5rc1 (2019-05-21)
==============================
Features
--------
- Add ability to blacklist IP ranges for the federation client. ([\#5043](https://github.com/matrix-org/synapse/issues/5043))
- Ratelimiting configuration for clients sending messages and the federation server has been altered to match login ratelimiting. The old configuration names will continue working. Check the sample config for details of the new names. ([\#5181](https://github.com/matrix-org/synapse/issues/5181))
- Drop support for the undocumented /_matrix/client/v2_alpha API prefix. ([\#5190](https://github.com/matrix-org/synapse/issues/5190))
- Add an option to disable per-room profiles. ([\#5196](https://github.com/matrix-org/synapse/issues/5196))
- Stick an expiration date to any registered user missing one at startup if account validity is enabled. ([\#5204](https://github.com/matrix-org/synapse/issues/5204))
- Add experimental support for relations (aka reactions and edits). ([\#5209](https://github.com/matrix-org/synapse/issues/5209), [\#5211](https://github.com/matrix-org/synapse/issues/5211), [\#5203](https://github.com/matrix-org/synapse/issues/5203), [\#5212](https://github.com/matrix-org/synapse/issues/5212))
- Add a room version 4 which uses a new event ID format, as per [MSC2002](https://github.com/matrix-org/matrix-doc/pull/2002). ([\#5210](https://github.com/matrix-org/synapse/issues/5210), [\#5217](https://github.com/matrix-org/synapse/issues/5217))
Bugfixes
--------
- Fix image orientation when generating thumbnails (needs pillow>=4.3.0). Contributed by Pau Rodriguez-Estivill. ([\#5039](https://github.com/matrix-org/synapse/issues/5039))
- Exclude soft-failed events from forward-extremity candidates: fixes "No forward extremities left!" error. ([\#5146](https://github.com/matrix-org/synapse/issues/5146))
- Re-order stages in registration flows such that msisdn and email verification are done last. ([\#5174](https://github.com/matrix-org/synapse/issues/5174))
- Fix 3pid guest invites. ([\#5177](https://github.com/matrix-org/synapse/issues/5177))
- Fix a bug where the register endpoint would fail with M_THREEPID_IN_USE instead of returning an account previously registered in the same session. ([\#5187](https://github.com/matrix-org/synapse/issues/5187))
- Prevent registration for user ids that are too long to fit into a state key. Contributed by Reid Anderson. ([\#5198](https://github.com/matrix-org/synapse/issues/5198))
- Fix incompatibility between ACME support and Python 3.5.2. ([\#5218](https://github.com/matrix-org/synapse/issues/5218))
- Fix error handling for rooms whose versions are unknown. ([\#5219](https://github.com/matrix-org/synapse/issues/5219))
Internal Changes
----------------
- Make /sync attempt to return device updates for both joined and invited users. Note that this doesn't currently work correctly due to other bugs. ([\#3484](https://github.com/matrix-org/synapse/issues/3484))
- Update tests to consistently be configured via the same code that is used when loading from configuration files. ([\#5171](https://github.com/matrix-org/synapse/issues/5171), [\#5185](https://github.com/matrix-org/synapse/issues/5185))
- Allow client event serialization to be async. ([\#5183](https://github.com/matrix-org/synapse/issues/5183))
- Expose DataStore._get_events as get_events_as_list. ([\#5184](https://github.com/matrix-org/synapse/issues/5184))
- Make generating SQL bounds for pagination generic. ([\#5191](https://github.com/matrix-org/synapse/issues/5191))
- Stop telling people to install the optional dependencies by default. ([\#5197](https://github.com/matrix-org/synapse/issues/5197))
Synapse 0.99.4 (2019-05-15) Synapse 0.99.4 (2019-05-15)
=========================== ===========================

View file

@ -1 +0,0 @@
Make /sync attempt to return device updates for both joined and invited users. Note that this doesn't currently work correctly due to other bugs.

View file

@ -1 +0,0 @@
Fix image orientation when generating thumbnails (needs pillow>=4.3.0). Contributed by Pau Rodriguez-Estivill.

View file

@ -1 +0,0 @@
Add ability to blacklist IP ranges for the federation client.

View file

@ -1 +0,0 @@
Exclude soft-failed events from forward-extremity candidates: fixes "No forward extremities left!" error.

View file

@ -1 +0,0 @@
Update tests to consistently be configured via the same code that is used when loading from configuration files.

View file

@ -1 +0,0 @@
Re-order stages in registration flows such that msisdn and email verification are done last.

View file

@ -1 +0,0 @@
Fix 3pid guest invites.

View file

@ -1 +0,0 @@
Ratelimiting configuration for clients sending messages and the federation server has been altered to match login ratelimiting. The old configuration names will continue working. Check the sample config for details of the new names.

View file

@ -1 +0,0 @@
Allow client event serialization to be async.

View file

@ -1 +0,0 @@
Expose DataStore._get_events as get_events_as_list.

View file

@ -1 +0,0 @@
Update tests to consistently be configured via the same code that is used when loading from configuration files.

View file

@ -1 +0,0 @@
Fix a bug where the register endpoint would fail with M_THREEPID_IN_USE instead of returning an account previously registered in the same session.

View file

@ -1 +0,0 @@
Drop support for the undocumented /_matrix/client/v2_alpha API prefix.

View file

@ -1 +0,0 @@
Make generating SQL bounds for pagination generic.

View file

@ -1 +0,0 @@
Add an option to disable per-room profiles.

View file

@ -1 +0,0 @@
Stop telling people to install the optional dependencies by default.

View file

@ -1 +0,0 @@
Prevent registration for user ids that are to long to fit into a state key. Contributed by Reid Anderson.

View file

@ -1 +0,0 @@
Add experimental support for relations (aka reactions and edits).

View file

@ -1 +0,0 @@
Stick an expiration date to any registered user missing one at startup if account validity is enabled.

View file

@ -1 +0,0 @@
Add experimental support for relations (aka reactions and edits).

View file

@ -1 +0,0 @@
Add a room version 4 which uses a new event ID format, as per [MSC2002](https://github.com/matrix-org/matrix-doc/pull/2002).

View file

@ -1 +0,0 @@
Add experimental support for relations (aka reactions and edits).

View file

@ -1 +0,0 @@
Add experimental support for relations (aka reactions and edits).

View file

@ -1 +0,0 @@
Add a room version 4 which uses a new event ID format, as per [MSC2002](https://github.com/matrix-org/matrix-doc/pull/2002).

View file

@ -1 +0,0 @@
Fix incompatibility between ACME support and Python 3.5.2.

View file

@ -1 +0,0 @@
Fix error handling for rooms whose versions are unknown.

12
debian/changelog vendored
View file

@ -1,3 +1,15 @@
matrix-synapse-py3 (0.99.5.2) stable; urgency=medium
* New synapse release 0.99.5.2.
-- Synapse Packaging team <packages@matrix.org> Thu, 30 May 2019 16:28:07 +0100
matrix-synapse-py3 (0.99.5.1) stable; urgency=medium
* New synapse release 0.99.5.1.
-- Synapse Packaging team <packages@matrix.org> Wed, 22 May 2019 16:22:24 +0000
matrix-synapse-py3 (0.99.4) stable; urgency=medium matrix-synapse-py3 (0.99.4) stable; urgency=medium
[ Christoph Müller ] [ Christoph Müller ]

View file

@ -27,4 +27,4 @@ try:
except ImportError: except ImportError:
pass pass
__version__ = "0.99.4" __version__ = "0.99.5.2"

View file

@ -36,6 +36,7 @@ from .engines import PostgresEngine
from .event_federation import EventFederationStore from .event_federation import EventFederationStore
from .event_push_actions import EventPushActionsStore from .event_push_actions import EventPushActionsStore
from .events import EventsStore from .events import EventsStore
from .events_bg_updates import EventsBackgroundUpdatesStore
from .filtering import FilteringStore from .filtering import FilteringStore
from .group_server import GroupServerStore from .group_server import GroupServerStore
from .keys import KeyStore from .keys import KeyStore
@ -66,6 +67,7 @@ logger = logging.getLogger(__name__)
class DataStore( class DataStore(
EventsBackgroundUpdatesStore,
RoomMemberStore, RoomMemberStore,
RoomStore, RoomStore,
RegistrationStore, RegistrationStore,

View file

@ -1261,7 +1261,8 @@ class SQLBaseStore(object):
" AND ".join("%s = ?" % (k,) for k in keyvalues), " AND ".join("%s = ?" % (k,) for k in keyvalues),
) )
return txn.execute(sql, list(keyvalues.values())) txn.execute(sql, list(keyvalues.values()))
return txn.rowcount
def _simple_delete_many(self, table, column, iterable, keyvalues, desc): def _simple_delete_many(self, table, column, iterable, keyvalues, desc):
return self.runInteraction( return self.runInteraction(
@ -1280,9 +1281,12 @@ class SQLBaseStore(object):
column : column name to test for inclusion against `iterable` column : column name to test for inclusion against `iterable`
iterable : list iterable : list
keyvalues : dict of column names and values to select the rows with keyvalues : dict of column names and values to select the rows with
Returns:
int: Number rows deleted
""" """
if not iterable: if not iterable:
return return 0
sql = "DELETE FROM %s" % table sql = "DELETE FROM %s" % table
@ -1297,7 +1301,9 @@ class SQLBaseStore(object):
if clauses: if clauses:
sql = "%s WHERE %s" % (sql, " AND ".join(clauses)) sql = "%s WHERE %s" % (sql, " AND ".join(clauses))
return txn.execute(sql, values) txn.execute(sql, values)
return txn.rowcount
def _get_cache_dict( def _get_cache_dict(
self, db_conn, table, entity_column, stream_column, max_value, limit=100000 self, db_conn, table, entity_column, stream_column, max_value, limit=100000

View file

@ -1,6 +1,7 @@
# -*- coding: utf-8 -*- # -*- coding: utf-8 -*-
# Copyright 2014-2016 OpenMarket Ltd # Copyright 2014-2016 OpenMarket Ltd
# Copyright 2018 New Vector Ltd # Copyright 2018-2019 New Vector Ltd
# Copyright 2019 The Matrix.org Foundation C.I.C.
# #
# Licensed under the Apache License, Version 2.0 (the "License"); # Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License. # you may not use this file except in compliance with the License.
@ -219,41 +220,11 @@ class EventsStore(
EventsWorkerStore, EventsWorkerStore,
BackgroundUpdateStore, BackgroundUpdateStore,
): ):
EVENT_ORIGIN_SERVER_TS_NAME = "event_origin_server_ts"
EVENT_FIELDS_SENDER_URL_UPDATE_NAME = "event_fields_sender_url"
def __init__(self, db_conn, hs): def __init__(self, db_conn, hs):
super(EventsStore, self).__init__(db_conn, hs) super(EventsStore, self).__init__(db_conn, hs)
self.register_background_update_handler(
self.EVENT_ORIGIN_SERVER_TS_NAME, self._background_reindex_origin_server_ts
)
self.register_background_update_handler(
self.EVENT_FIELDS_SENDER_URL_UPDATE_NAME,
self._background_reindex_fields_sender,
)
self.register_background_index_update(
"event_contains_url_index",
index_name="event_contains_url_index",
table="events",
columns=["room_id", "topological_ordering", "stream_ordering"],
where_clause="contains_url = true AND outlier = false",
)
# an event_id index on event_search is useful for the purge_history
# api. Plus it means we get to enforce some integrity with a UNIQUE
# clause
self.register_background_index_update(
"event_search_event_id_idx",
index_name="event_search_event_id_idx",
table="event_search",
columns=["event_id"],
unique=True,
psql_only=True,
)
self._event_persist_queue = _EventPeristenceQueue() self._event_persist_queue = _EventPeristenceQueue()
self._state_resolution_handler = hs.get_state_resolution_handler() self._state_resolution_handler = hs.get_state_resolution_handler()
@defer.inlineCallbacks @defer.inlineCallbacks
@ -554,10 +525,18 @@ class EventsStore(
e_id for event in new_events for e_id in event.prev_event_ids() e_id for event in new_events for e_id in event.prev_event_ids()
) )
# Finally, remove any events which are prev_events of any existing events. # Remove any events which are prev_events of any existing events.
existing_prevs = yield self._get_events_which_are_prevs(result) existing_prevs = yield self._get_events_which_are_prevs(result)
result.difference_update(existing_prevs) result.difference_update(existing_prevs)
# Finally handle the case where the new events have soft-failed prev
# events. If they do we need to remove them and their prev events,
# otherwise we end up with dangling extremities.
existing_prevs = yield self._get_prevs_before_rejected(
e_id for event in new_events for e_id in event.prev_event_ids()
)
result.difference_update(existing_prevs)
defer.returnValue(result) defer.returnValue(result)
@defer.inlineCallbacks @defer.inlineCallbacks
@ -573,7 +552,7 @@ class EventsStore(
""" """
results = [] results = []
def _get_events(txn, batch): def _get_events_which_are_prevs_txn(txn, batch):
sql = """ sql = """
SELECT prev_event_id, internal_metadata SELECT prev_event_id, internal_metadata
FROM event_edges FROM event_edges
@ -596,10 +575,78 @@ class EventsStore(
) )
for chunk in batch_iter(event_ids, 100): for chunk in batch_iter(event_ids, 100):
yield self.runInteraction("_get_events_which_are_prevs", _get_events, chunk) yield self.runInteraction(
"_get_events_which_are_prevs",
_get_events_which_are_prevs_txn,
chunk,
)
defer.returnValue(results) defer.returnValue(results)
@defer.inlineCallbacks
def _get_prevs_before_rejected(self, event_ids):
"""Get soft-failed ancestors to remove from the extremities.
Given a set of events, find all those that have been soft-failed or
rejected. Returns those soft failed/rejected events and their prev
events (whether soft-failed/rejected or not), and recurses up the
prev-event graph until it finds no more soft-failed/rejected events.
This is used to find extremities that are ancestors of new events, but
are separated by soft failed events.
Args:
event_ids (Iterable[str]): Events to find prev events for. Note
that these must have already been persisted.
Returns:
Deferred[set[str]]
"""
# The set of event_ids to return. This includes all soft-failed events
# and their prev events.
existing_prevs = set()
def _get_prevs_before_rejected_txn(txn, batch):
to_recursively_check = batch
while to_recursively_check:
sql = """
SELECT
event_id, prev_event_id, internal_metadata,
rejections.event_id IS NOT NULL
FROM event_edges
INNER JOIN events USING (event_id)
LEFT JOIN rejections USING (event_id)
LEFT JOIN event_json USING (event_id)
WHERE
event_id IN (%s)
AND NOT events.outlier
""" % (
",".join("?" for _ in to_recursively_check),
)
txn.execute(sql, to_recursively_check)
to_recursively_check = []
for event_id, prev_event_id, metadata, rejected in txn:
if prev_event_id in existing_prevs:
continue
soft_failed = json.loads(metadata).get("soft_failed")
if soft_failed or rejected:
to_recursively_check.append(prev_event_id)
existing_prevs.add(prev_event_id)
for chunk in batch_iter(event_ids, 100):
yield self.runInteraction(
"_get_prevs_before_rejected",
_get_prevs_before_rejected_txn,
chunk,
)
defer.returnValue(existing_prevs)
@defer.inlineCallbacks @defer.inlineCallbacks
def _get_new_state_after_events( def _get_new_state_after_events(
self, room_id, events_context, old_latest_event_ids, new_latest_event_ids self, room_id, events_context, old_latest_event_ids, new_latest_event_ids
@ -1503,153 +1550,6 @@ class EventsStore(
ret = yield self.runInteraction("count_daily_active_rooms", _count) ret = yield self.runInteraction("count_daily_active_rooms", _count)
defer.returnValue(ret) defer.returnValue(ret)
@defer.inlineCallbacks
def _background_reindex_fields_sender(self, progress, batch_size):
target_min_stream_id = progress["target_min_stream_id_inclusive"]
max_stream_id = progress["max_stream_id_exclusive"]
rows_inserted = progress.get("rows_inserted", 0)
INSERT_CLUMP_SIZE = 1000
def reindex_txn(txn):
sql = (
"SELECT stream_ordering, event_id, json FROM events"
" INNER JOIN event_json USING (event_id)"
" WHERE ? <= stream_ordering AND stream_ordering < ?"
" ORDER BY stream_ordering DESC"
" LIMIT ?"
)
txn.execute(sql, (target_min_stream_id, max_stream_id, batch_size))
rows = txn.fetchall()
if not rows:
return 0
min_stream_id = rows[-1][0]
update_rows = []
for row in rows:
try:
event_id = row[1]
event_json = json.loads(row[2])
sender = event_json["sender"]
content = event_json["content"]
contains_url = "url" in content
if contains_url:
contains_url &= isinstance(content["url"], text_type)
except (KeyError, AttributeError):
# If the event is missing a necessary field then
# skip over it.
continue
update_rows.append((sender, contains_url, event_id))
sql = "UPDATE events SET sender = ?, contains_url = ? WHERE event_id = ?"
for index in range(0, len(update_rows), INSERT_CLUMP_SIZE):
clump = update_rows[index : index + INSERT_CLUMP_SIZE]
txn.executemany(sql, clump)
progress = {
"target_min_stream_id_inclusive": target_min_stream_id,
"max_stream_id_exclusive": min_stream_id,
"rows_inserted": rows_inserted + len(rows),
}
self._background_update_progress_txn(
txn, self.EVENT_FIELDS_SENDER_URL_UPDATE_NAME, progress
)
return len(rows)
result = yield self.runInteraction(
self.EVENT_FIELDS_SENDER_URL_UPDATE_NAME, reindex_txn
)
if not result:
yield self._end_background_update(self.EVENT_FIELDS_SENDER_URL_UPDATE_NAME)
defer.returnValue(result)
@defer.inlineCallbacks
def _background_reindex_origin_server_ts(self, progress, batch_size):
target_min_stream_id = progress["target_min_stream_id_inclusive"]
max_stream_id = progress["max_stream_id_exclusive"]
rows_inserted = progress.get("rows_inserted", 0)
INSERT_CLUMP_SIZE = 1000
def reindex_search_txn(txn):
sql = (
"SELECT stream_ordering, event_id FROM events"
" WHERE ? <= stream_ordering AND stream_ordering < ?"
" ORDER BY stream_ordering DESC"
" LIMIT ?"
)
txn.execute(sql, (target_min_stream_id, max_stream_id, batch_size))
rows = txn.fetchall()
if not rows:
return 0
min_stream_id = rows[-1][0]
event_ids = [row[1] for row in rows]
rows_to_update = []
chunks = [event_ids[i : i + 100] for i in range(0, len(event_ids), 100)]
for chunk in chunks:
ev_rows = self._simple_select_many_txn(
txn,
table="event_json",
column="event_id",
iterable=chunk,
retcols=["event_id", "json"],
keyvalues={},
)
for row in ev_rows:
event_id = row["event_id"]
event_json = json.loads(row["json"])
try:
origin_server_ts = event_json["origin_server_ts"]
except (KeyError, AttributeError):
# If the event is missing a necessary field then
# skip over it.
continue
rows_to_update.append((origin_server_ts, event_id))
sql = "UPDATE events SET origin_server_ts = ? WHERE event_id = ?"
for index in range(0, len(rows_to_update), INSERT_CLUMP_SIZE):
clump = rows_to_update[index : index + INSERT_CLUMP_SIZE]
txn.executemany(sql, clump)
progress = {
"target_min_stream_id_inclusive": target_min_stream_id,
"max_stream_id_exclusive": min_stream_id,
"rows_inserted": rows_inserted + len(rows_to_update),
}
self._background_update_progress_txn(
txn, self.EVENT_ORIGIN_SERVER_TS_NAME, progress
)
return len(rows_to_update)
result = yield self.runInteraction(
self.EVENT_ORIGIN_SERVER_TS_NAME, reindex_search_txn
)
if not result:
yield self._end_background_update(self.EVENT_ORIGIN_SERVER_TS_NAME)
defer.returnValue(result)
def get_current_backfill_token(self): def get_current_backfill_token(self):
"""The current minimum token that backfilled events have reached""" """The current minimum token that backfilled events have reached"""
return -self._backfill_id_gen.get_current_token() return -self._backfill_id_gen.get_current_token()

View file

@ -0,0 +1,401 @@
# -*- coding: utf-8 -*-
# Copyright 2019 The Matrix.org Foundation C.I.C.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
from six import text_type
from canonicaljson import json
from twisted.internet import defer
from synapse.storage.background_updates import BackgroundUpdateStore
logger = logging.getLogger(__name__)
class EventsBackgroundUpdatesStore(BackgroundUpdateStore):
EVENT_ORIGIN_SERVER_TS_NAME = "event_origin_server_ts"
EVENT_FIELDS_SENDER_URL_UPDATE_NAME = "event_fields_sender_url"
DELETE_SOFT_FAILED_EXTREMITIES = "delete_soft_failed_extremities"
def __init__(self, db_conn, hs):
super(EventsBackgroundUpdatesStore, self).__init__(db_conn, hs)
self.register_background_update_handler(
self.EVENT_ORIGIN_SERVER_TS_NAME, self._background_reindex_origin_server_ts
)
self.register_background_update_handler(
self.EVENT_FIELDS_SENDER_URL_UPDATE_NAME,
self._background_reindex_fields_sender,
)
self.register_background_index_update(
"event_contains_url_index",
index_name="event_contains_url_index",
table="events",
columns=["room_id", "topological_ordering", "stream_ordering"],
where_clause="contains_url = true AND outlier = false",
)
# an event_id index on event_search is useful for the purge_history
# api. Plus it means we get to enforce some integrity with a UNIQUE
# clause
self.register_background_index_update(
"event_search_event_id_idx",
index_name="event_search_event_id_idx",
table="event_search",
columns=["event_id"],
unique=True,
psql_only=True,
)
self.register_background_update_handler(
self.DELETE_SOFT_FAILED_EXTREMITIES,
self._cleanup_extremities_bg_update,
)
@defer.inlineCallbacks
def _background_reindex_fields_sender(self, progress, batch_size):
target_min_stream_id = progress["target_min_stream_id_inclusive"]
max_stream_id = progress["max_stream_id_exclusive"]
rows_inserted = progress.get("rows_inserted", 0)
INSERT_CLUMP_SIZE = 1000
def reindex_txn(txn):
sql = (
"SELECT stream_ordering, event_id, json FROM events"
" INNER JOIN event_json USING (event_id)"
" WHERE ? <= stream_ordering AND stream_ordering < ?"
" ORDER BY stream_ordering DESC"
" LIMIT ?"
)
txn.execute(sql, (target_min_stream_id, max_stream_id, batch_size))
rows = txn.fetchall()
if not rows:
return 0
min_stream_id = rows[-1][0]
update_rows = []
for row in rows:
try:
event_id = row[1]
event_json = json.loads(row[2])
sender = event_json["sender"]
content = event_json["content"]
contains_url = "url" in content
if contains_url:
contains_url &= isinstance(content["url"], text_type)
except (KeyError, AttributeError):
# If the event is missing a necessary field then
# skip over it.
continue
update_rows.append((sender, contains_url, event_id))
sql = "UPDATE events SET sender = ?, contains_url = ? WHERE event_id = ?"
for index in range(0, len(update_rows), INSERT_CLUMP_SIZE):
clump = update_rows[index : index + INSERT_CLUMP_SIZE]
txn.executemany(sql, clump)
progress = {
"target_min_stream_id_inclusive": target_min_stream_id,
"max_stream_id_exclusive": min_stream_id,
"rows_inserted": rows_inserted + len(rows),
}
self._background_update_progress_txn(
txn, self.EVENT_FIELDS_SENDER_URL_UPDATE_NAME, progress
)
return len(rows)
result = yield self.runInteraction(
self.EVENT_FIELDS_SENDER_URL_UPDATE_NAME, reindex_txn
)
if not result:
yield self._end_background_update(self.EVENT_FIELDS_SENDER_URL_UPDATE_NAME)
defer.returnValue(result)
@defer.inlineCallbacks
def _background_reindex_origin_server_ts(self, progress, batch_size):
target_min_stream_id = progress["target_min_stream_id_inclusive"]
max_stream_id = progress["max_stream_id_exclusive"]
rows_inserted = progress.get("rows_inserted", 0)
INSERT_CLUMP_SIZE = 1000
def reindex_search_txn(txn):
sql = (
"SELECT stream_ordering, event_id FROM events"
" WHERE ? <= stream_ordering AND stream_ordering < ?"
" ORDER BY stream_ordering DESC"
" LIMIT ?"
)
txn.execute(sql, (target_min_stream_id, max_stream_id, batch_size))
rows = txn.fetchall()
if not rows:
return 0
min_stream_id = rows[-1][0]
event_ids = [row[1] for row in rows]
rows_to_update = []
chunks = [event_ids[i : i + 100] for i in range(0, len(event_ids), 100)]
for chunk in chunks:
ev_rows = self._simple_select_many_txn(
txn,
table="event_json",
column="event_id",
iterable=chunk,
retcols=["event_id", "json"],
keyvalues={},
)
for row in ev_rows:
event_id = row["event_id"]
event_json = json.loads(row["json"])
try:
origin_server_ts = event_json["origin_server_ts"]
except (KeyError, AttributeError):
# If the event is missing a necessary field then
# skip over it.
continue
rows_to_update.append((origin_server_ts, event_id))
sql = "UPDATE events SET origin_server_ts = ? WHERE event_id = ?"
for index in range(0, len(rows_to_update), INSERT_CLUMP_SIZE):
clump = rows_to_update[index : index + INSERT_CLUMP_SIZE]
txn.executemany(sql, clump)
progress = {
"target_min_stream_id_inclusive": target_min_stream_id,
"max_stream_id_exclusive": min_stream_id,
"rows_inserted": rows_inserted + len(rows_to_update),
}
self._background_update_progress_txn(
txn, self.EVENT_ORIGIN_SERVER_TS_NAME, progress
)
return len(rows_to_update)
result = yield self.runInteraction(
self.EVENT_ORIGIN_SERVER_TS_NAME, reindex_search_txn
)
if not result:
yield self._end_background_update(self.EVENT_ORIGIN_SERVER_TS_NAME)
defer.returnValue(result)
@defer.inlineCallbacks
def _cleanup_extremities_bg_update(self, progress, batch_size):
"""Background update to clean out extremities that should have been
deleted previously.
Mainly used to deal with the aftermath of #5269.
"""
# This works by first copying all existing forward extremities into the
# `_extremities_to_check` table at start up, and then checking each
# event in that table whether we have any descendants that are not
# soft-failed/rejected. If that is the case then we delete that event
# from the forward extremities table.
#
# For efficiency, we do this in batches by recursively pulling out all
# descendants of a batch until we find the non soft-failed/rejected
# events, i.e. the set of descendants whose chain of prev events back
# to the batch of extremities are all soft-failed or rejected.
# Typically, we won't find any such events as extremities will rarely
# have any descendants, but if they do then we should delete those
# extremities.
def _cleanup_extremities_bg_update_txn(txn):
# The set of extremity event IDs that we're checking this round
original_set = set()
# A dict[str, set[str]] of event ID to their prev events.
graph = {}
# The set of descendants of the original set that are not rejected
# nor soft-failed. Ancestors of these events should be removed
# from the forward extremities table.
non_rejected_leaves = set()
# Set of event IDs that have been soft failed, and for which we
# should check if they have descendants which haven't been soft
# failed.
soft_failed_events_to_lookup = set()
# First, we get `batch_size` events from the table, pulling out
# their successor events, if any, and the successor events'
# rejection status.
txn.execute(
"""SELECT prev_event_id, event_id, internal_metadata,
rejections.event_id IS NOT NULL, events.outlier
FROM (
SELECT event_id AS prev_event_id
FROM _extremities_to_check
LIMIT ?
) AS f
LEFT JOIN event_edges USING (prev_event_id)
LEFT JOIN events USING (event_id)
LEFT JOIN event_json USING (event_id)
LEFT JOIN rejections USING (event_id)
""", (batch_size,)
)
for prev_event_id, event_id, metadata, rejected, outlier in txn:
original_set.add(prev_event_id)
if not event_id or outlier:
# Common case where the forward extremity doesn't have any
# descendants.
continue
graph.setdefault(event_id, set()).add(prev_event_id)
soft_failed = False
if metadata:
soft_failed = json.loads(metadata).get("soft_failed")
if soft_failed or rejected:
soft_failed_events_to_lookup.add(event_id)
else:
non_rejected_leaves.add(event_id)
# Now we recursively check all the soft-failed descendants we
# found above in the same way, until we have nothing left to
# check.
while soft_failed_events_to_lookup:
# We only want to do 100 at a time, so we split given list
# into two.
batch = list(soft_failed_events_to_lookup)
to_check, to_defer = batch[:100], batch[100:]
soft_failed_events_to_lookup = set(to_defer)
sql = """SELECT prev_event_id, event_id, internal_metadata,
rejections.event_id IS NOT NULL
FROM event_edges
INNER JOIN events USING (event_id)
INNER JOIN event_json USING (event_id)
LEFT JOIN rejections USING (event_id)
WHERE
prev_event_id IN (%s)
AND NOT events.outlier
""" % (
",".join("?" for _ in to_check),
)
txn.execute(sql, to_check)
for prev_event_id, event_id, metadata, rejected in txn:
if event_id in graph:
# Already handled this event previously, but we still
# want to record the edge.
graph[event_id].add(prev_event_id)
continue
graph[event_id] = {prev_event_id}
soft_failed = json.loads(metadata).get("soft_failed")
if soft_failed or rejected:
soft_failed_events_to_lookup.add(event_id)
else:
non_rejected_leaves.add(event_id)
# We have a set of non-soft-failed descendants, so we recurse up
# the graph to find all ancestors and add them to the set of event
# IDs that we can delete from forward extremities table.
to_delete = set()
while non_rejected_leaves:
event_id = non_rejected_leaves.pop()
prev_event_ids = graph.get(event_id, set())
non_rejected_leaves.update(prev_event_ids)
to_delete.update(prev_event_ids)
to_delete.intersection_update(original_set)
deleted = self._simple_delete_many_txn(
txn=txn,
table="event_forward_extremities",
column="event_id",
iterable=to_delete,
keyvalues={},
)
logger.info(
"Deleted %d forward extremities of %d checked, to clean up #5269",
deleted,
len(original_set),
)
if deleted:
# We now need to invalidate the caches of these rooms
rows = self._simple_select_many_txn(
txn,
table="events",
column="event_id",
iterable=to_delete,
keyvalues={},
retcols=("room_id",)
)
room_ids = set(row["room_id"] for row in rows)
for room_id in room_ids:
txn.call_after(
self.get_latest_event_ids_in_room.invalidate,
(room_id,)
)
self._simple_delete_many_txn(
txn=txn,
table="_extremities_to_check",
column="event_id",
iterable=original_set,
keyvalues={},
)
return len(original_set)
num_handled = yield self.runInteraction(
"_cleanup_extremities_bg_update", _cleanup_extremities_bg_update_txn,
)
if not num_handled:
yield self._end_background_update(self.DELETE_SOFT_FAILED_EXTREMITIES)
def _drop_table_txn(txn):
txn.execute("DROP TABLE _extremities_to_check")
yield self.runInteraction(
"_cleanup_extremities_bg_update_drop_table",
_drop_table_txn,
)
defer.returnValue(num_handled)

View file

@ -0,0 +1,23 @@
/* Copyright 2019 The Matrix.org Foundation C.I.C.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-- Start a background job to cleanup extremities that were incorrectly added
-- by bug #5269.
INSERT INTO background_updates (update_name, progress_json) VALUES
('delete_soft_failed_extremities', '{}');
DROP TABLE IF EXISTS _extremities_to_check; -- To make this delta schema file idempotent.
CREATE TABLE _extremities_to_check AS SELECT event_id FROM event_forward_extremities;
CREATE INDEX _extremities_to_check_id ON _extremities_to_check(event_id);

View file

@ -0,0 +1,248 @@
# -*- coding: utf-8 -*-
# Copyright 2019 The Matrix.org Foundation C.I.C.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import os.path
from synapse.api.constants import EventTypes
from synapse.storage import prepare_database
from synapse.types import Requester, UserID
from tests.unittest import HomeserverTestCase
class CleanupExtremBackgroundUpdateStoreTestCase(HomeserverTestCase):
"""Test the background update to clean forward extremities table.
"""
def prepare(self, reactor, clock, homeserver):
self.store = homeserver.get_datastore()
self.event_creator = homeserver.get_event_creation_handler()
self.room_creator = homeserver.get_room_creation_handler()
# Create a test user and room
self.user = UserID("alice", "test")
self.requester = Requester(self.user, None, False, None, None)
info = self.get_success(self.room_creator.create_room(self.requester, {}))
self.room_id = info["room_id"]
def create_and_send_event(self, soft_failed=False, prev_event_ids=None):
"""Create and send an event.
Args:
soft_failed (bool): Whether to create a soft failed event or not
prev_event_ids (list[str]|None): Explicitly set the prev events,
or if None just use the default
Returns:
str: The new event's ID.
"""
prev_events_and_hashes = None
if prev_event_ids:
prev_events_and_hashes = [[p, {}, 0] for p in prev_event_ids]
event, context = self.get_success(
self.event_creator.create_event(
self.requester,
{
"type": EventTypes.Message,
"room_id": self.room_id,
"sender": self.user.to_string(),
"content": {"body": "", "msgtype": "m.text"},
},
prev_events_and_hashes=prev_events_and_hashes,
)
)
if soft_failed:
event.internal_metadata.soft_failed = True
self.get_success(
self.event_creator.send_nonmember_event(self.requester, event, context)
)
return event.event_id
def add_extremity(self, event_id):
"""Add the given event as an extremity to the room.
"""
self.get_success(
self.store._simple_insert(
table="event_forward_extremities",
values={"room_id": self.room_id, "event_id": event_id},
desc="test_add_extremity",
)
)
self.store.get_latest_event_ids_in_room.invalidate((self.room_id,))
def run_background_update(self):
"""Re run the background update to clean up the extremities.
"""
# Make sure we don't clash with in progress updates.
self.assertTrue(self.store._all_done, "Background updates are still ongoing")
schema_path = os.path.join(
prepare_database.dir_path,
"schema",
"delta",
"54",
"delete_forward_extremities.sql",
)
def run_delta_file(txn):
prepare_database.executescript(txn, schema_path)
self.get_success(
self.store.runInteraction("test_delete_forward_extremities", run_delta_file)
)
# Ugh, have to reset this flag
self.store._all_done = False
while not self.get_success(self.store.has_completed_background_updates()):
self.get_success(self.store.do_next_background_update(100), by=0.1)
def test_soft_failed_extremities_handled_correctly(self):
"""Test that extremities are correctly calculated in the presence of
soft failed events.
Tests a graph like:
A <- SF1 <- SF2 <- B
Where SF* are soft failed.
"""
# Create the room graph
event_id_1 = self.create_and_send_event()
event_id_2 = self.create_and_send_event(True, [event_id_1])
event_id_3 = self.create_and_send_event(True, [event_id_2])
event_id_4 = self.create_and_send_event(False, [event_id_3])
# Check the latest events are as expected
latest_event_ids = self.get_success(
self.store.get_latest_event_ids_in_room(self.room_id)
)
self.assertEqual(latest_event_ids, [event_id_4])
def test_basic_cleanup(self):
"""Test that extremities are correctly calculated in the presence of
soft failed events.
Tests a graph like:
A <- SF1 <- B
Where SF* are soft failed, and with extremities of A and B
"""
# Create the room graph
event_id_a = self.create_and_send_event()
event_id_sf1 = self.create_and_send_event(True, [event_id_a])
event_id_b = self.create_and_send_event(False, [event_id_sf1])
# Add the new extremity and check the latest events are as expected
self.add_extremity(event_id_a)
latest_event_ids = self.get_success(
self.store.get_latest_event_ids_in_room(self.room_id)
)
self.assertEqual(set(latest_event_ids), set((event_id_a, event_id_b)))
# Run the background update and check it did the right thing
self.run_background_update()
latest_event_ids = self.get_success(
self.store.get_latest_event_ids_in_room(self.room_id)
)
self.assertEqual(latest_event_ids, [event_id_b])
def test_chain_of_fail_cleanup(self):
"""Test that extremities are correctly calculated in the presence of
soft failed events.
Tests a graph like:
A <- SF1 <- SF2 <- B
Where SF* are soft failed, and with extremities of A and B
"""
# Create the room graph
event_id_a = self.create_and_send_event()
event_id_sf1 = self.create_and_send_event(True, [event_id_a])
event_id_sf2 = self.create_and_send_event(True, [event_id_sf1])
event_id_b = self.create_and_send_event(False, [event_id_sf2])
# Add the new extremity and check the latest events are as expected
self.add_extremity(event_id_a)
latest_event_ids = self.get_success(
self.store.get_latest_event_ids_in_room(self.room_id)
)
self.assertEqual(set(latest_event_ids), set((event_id_a, event_id_b)))
# Run the background update and check it did the right thing
self.run_background_update()
latest_event_ids = self.get_success(
self.store.get_latest_event_ids_in_room(self.room_id)
)
self.assertEqual(latest_event_ids, [event_id_b])
def test_forked_graph_cleanup(self):
r"""Test that extremities are correctly calculated in the presence of
soft failed events.
Tests a graph like, where time flows down the page:
A B
/ \ /
/ \ /
SF1 SF2
| |
SF3 |
/ \ |
| \ |
C SF4
Where SF* are soft failed, and with them A, B and C marked as
extremities. This should resolve to B and C being marked as extremity.
"""
# Create the room graph
event_id_a = self.create_and_send_event()
event_id_b = self.create_and_send_event()
event_id_sf1 = self.create_and_send_event(True, [event_id_a])
event_id_sf2 = self.create_and_send_event(True, [event_id_a, event_id_b])
event_id_sf3 = self.create_and_send_event(True, [event_id_sf1])
self.create_and_send_event(True, [event_id_sf2, event_id_sf3]) # SF4
event_id_c = self.create_and_send_event(False, [event_id_sf3])
# Add the new extremity and check the latest events are as expected
self.add_extremity(event_id_a)
latest_event_ids = self.get_success(
self.store.get_latest_event_ids_in_room(self.room_id)
)
self.assertEqual(
set(latest_event_ids), set((event_id_a, event_id_b, event_id_c))
)
# Run the background update and check it did the right thing
self.run_background_update()
latest_event_ids = self.get_success(
self.store.get_latest_event_ids_in_room(self.room_id)
)
self.assertEqual(set(latest_event_ids), set([event_id_b, event_id_c]))