Fix non-strings in the event_search table (#12037)

Don't attempt to add non-string `value`s to `event_search` and add a
background update to clear out bad rows from `event_search` when
using sqlite.

Signed-off-by: Sean Quah <seanq@element.io>
This commit is contained in:
Sean Quah 2022-02-24 11:52:28 +00:00 committed by GitHub
parent c56bfb08bc
commit 41cf4c2cf6
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
5 changed files with 173 additions and 11 deletions

1
changelog.d/12037.bugfix Normal file
View file

@ -0,0 +1 @@
Properly fix a long-standing bug where wrong data could be inserted in the `event_search` table when using sqlite. This could block running `synapse_port_db` with an "argument of type 'int' is not iterable" error. This bug was partially fixed by a change in Synapse 1.44.0.

View file

@ -1473,10 +1473,10 @@ class PersistEventsStore:
def _update_metadata_tables_txn( def _update_metadata_tables_txn(
self, self,
txn, txn: LoggingTransaction,
*, *,
events_and_contexts, events_and_contexts: List[Tuple[EventBase, EventContext]],
all_events_and_contexts, all_events_and_contexts: List[Tuple[EventBase, EventContext]],
inhibit_local_membership_updates: bool = False, inhibit_local_membership_updates: bool = False,
): ):
"""Update all the miscellaneous tables for new events """Update all the miscellaneous tables for new events
@ -1953,20 +1953,20 @@ class PersistEventsStore:
txn, table="event_relations", keyvalues={"event_id": redacted_event_id} txn, table="event_relations", keyvalues={"event_id": redacted_event_id}
) )
def _store_room_topic_txn(self, txn, event): def _store_room_topic_txn(self, txn: LoggingTransaction, event: EventBase):
if hasattr(event, "content") and "topic" in event.content: if isinstance(event.content.get("topic"), str):
self.store_event_search_txn( self.store_event_search_txn(
txn, event, "content.topic", event.content["topic"] txn, event, "content.topic", event.content["topic"]
) )
def _store_room_name_txn(self, txn, event): def _store_room_name_txn(self, txn: LoggingTransaction, event: EventBase):
if hasattr(event, "content") and "name" in event.content: if isinstance(event.content.get("name"), str):
self.store_event_search_txn( self.store_event_search_txn(
txn, event, "content.name", event.content["name"] txn, event, "content.name", event.content["name"]
) )
def _store_room_message_txn(self, txn, event): def _store_room_message_txn(self, txn: LoggingTransaction, event: EventBase):
if hasattr(event, "content") and "body" in event.content: if isinstance(event.content.get("body"), str):
self.store_event_search_txn( self.store_event_search_txn(
txn, event, "content.body", event.content["body"] txn, event, "content.body", event.content["body"]
) )

View file

@ -115,6 +115,7 @@ class SearchBackgroundUpdateStore(SearchWorkerStore):
EVENT_SEARCH_ORDER_UPDATE_NAME = "event_search_order" EVENT_SEARCH_ORDER_UPDATE_NAME = "event_search_order"
EVENT_SEARCH_USE_GIST_POSTGRES_NAME = "event_search_postgres_gist" EVENT_SEARCH_USE_GIST_POSTGRES_NAME = "event_search_postgres_gist"
EVENT_SEARCH_USE_GIN_POSTGRES_NAME = "event_search_postgres_gin" EVENT_SEARCH_USE_GIN_POSTGRES_NAME = "event_search_postgres_gin"
EVENT_SEARCH_DELETE_NON_STRINGS = "event_search_sqlite_delete_non_strings"
def __init__( def __init__(
self, self,
@ -147,6 +148,10 @@ class SearchBackgroundUpdateStore(SearchWorkerStore):
self.EVENT_SEARCH_USE_GIN_POSTGRES_NAME, self._background_reindex_gin_search self.EVENT_SEARCH_USE_GIN_POSTGRES_NAME, self._background_reindex_gin_search
) )
self.db_pool.updates.register_background_update_handler(
self.EVENT_SEARCH_DELETE_NON_STRINGS, self._background_delete_non_strings
)
async def _background_reindex_search(self, progress, batch_size): async def _background_reindex_search(self, progress, batch_size):
# we work through the events table from highest stream id to lowest # we work through the events table from highest stream id to lowest
target_min_stream_id = progress["target_min_stream_id_inclusive"] target_min_stream_id = progress["target_min_stream_id_inclusive"]
@ -372,6 +377,27 @@ class SearchBackgroundUpdateStore(SearchWorkerStore):
return num_rows return num_rows
async def _background_delete_non_strings(
self, progress: JsonDict, batch_size: int
) -> int:
"""Deletes rows with non-string `value`s from `event_search` if using sqlite.
Prior to Synapse 1.44.0, malformed events received over federation could cause integers
to be inserted into the `event_search` table when using sqlite.
"""
def delete_non_strings_txn(txn: LoggingTransaction) -> None:
txn.execute("DELETE FROM event_search WHERE typeof(value) != 'text'")
await self.db_pool.runInteraction(
self.EVENT_SEARCH_DELETE_NON_STRINGS, delete_non_strings_txn
)
await self.db_pool.updates._end_background_update(
self.EVENT_SEARCH_DELETE_NON_STRINGS
)
return 1
class SearchStore(SearchBackgroundUpdateStore): class SearchStore(SearchBackgroundUpdateStore):
def __init__( def __init__(

View file

@ -0,0 +1,22 @@
/* Copyright 2022 The Matrix.org Foundation C.I.C
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-- Delete rows with non-string `value`s from `event_search` if using sqlite.
--
-- Prior to Synapse 1.44.0, malformed events received over federation could
-- cause integers to be inserted into the `event_search` table.
INSERT INTO background_updates (ordering, update_name, progress_json) VALUES
(6805, 'event_search_sqlite_delete_non_strings', '{}');

View file

@ -13,13 +13,16 @@
# limitations under the License. # limitations under the License.
import synapse.rest.admin import synapse.rest.admin
from synapse.api.constants import EventTypes
from synapse.api.errors import StoreError
from synapse.rest.client import login, room from synapse.rest.client import login, room
from synapse.storage.engines import PostgresEngine from synapse.storage.engines import PostgresEngine
from tests.unittest import HomeserverTestCase from tests.unittest import HomeserverTestCase, skip_unless
from tests.utils import USE_POSTGRES_FOR_TESTS
class NullByteInsertionTest(HomeserverTestCase): class EventSearchInsertionTest(HomeserverTestCase):
servlets = [ servlets = [
synapse.rest.admin.register_servlets_for_client_rest_resource, synapse.rest.admin.register_servlets_for_client_rest_resource,
login.register_servlets, login.register_servlets,
@ -72,3 +75,113 @@ class NullByteInsertionTest(HomeserverTestCase):
) )
if isinstance(store.database_engine, PostgresEngine): if isinstance(store.database_engine, PostgresEngine):
self.assertIn("alice", result.get("highlights")) self.assertIn("alice", result.get("highlights"))
def test_non_string(self):
"""Test that non-string `value`s are not inserted into `event_search`.
This is particularly important when using sqlite, since a sqlite column can hold
both strings and integers. When using Postgres, integers are automatically
converted to strings.
Regression test for #11918.
"""
store = self.hs.get_datastores().main
# Register a user and create a room
user_id = self.register_user("alice", "password")
access_token = self.login("alice", "password")
room_id = self.helper.create_room_as("alice", tok=access_token)
room_version = self.get_success(store.get_room_version(room_id))
# Construct a message with a numeric body to be received over federation
# The message can't be sent using the client API, since Synapse's event
# validation will reject it.
prev_event_ids = self.get_success(store.get_prev_events_for_room(room_id))
prev_event = self.get_success(store.get_event(prev_event_ids[0]))
prev_state_map = self.get_success(
self.hs.get_storage().state.get_state_ids_for_event(prev_event_ids[0])
)
event_dict = {
"type": EventTypes.Message,
"content": {"msgtype": "m.text", "body": 2},
"room_id": room_id,
"sender": user_id,
"depth": prev_event.depth + 1,
"prev_events": prev_event_ids,
"origin_server_ts": self.clock.time_msec(),
}
builder = self.hs.get_event_builder_factory().for_room_version(
room_version, event_dict
)
event = self.get_success(
builder.build(
prev_event_ids=prev_event_ids,
auth_event_ids=self.hs.get_event_auth_handler().compute_auth_events(
builder,
prev_state_map,
for_verification=False,
),
depth=event_dict["depth"],
)
)
# Receive the event
self.get_success(
self.hs.get_federation_event_handler().on_receive_pdu(
self.hs.hostname, event
)
)
# The event should not have an entry in the `event_search` table
f = self.get_failure(
store.db_pool.simple_select_one_onecol(
"event_search",
{"room_id": room_id, "event_id": event.event_id},
"event_id",
),
StoreError,
)
self.assertEqual(f.value.code, 404)
@skip_unless(not USE_POSTGRES_FOR_TESTS, "requires sqlite")
def test_sqlite_non_string_deletion_background_update(self):
"""Test the background update to delete bad rows from `event_search`."""
store = self.hs.get_datastores().main
# Populate `event_search` with dummy data
self.get_success(
store.db_pool.simple_insert_many(
"event_search",
keys=["event_id", "room_id", "key", "value"],
values=[
("event1", "room_id", "content.body", "hi"),
("event2", "room_id", "content.body", "2"),
("event3", "room_id", "content.body", 3),
],
desc="populate_event_search",
)
)
# Run the background update
store.db_pool.updates._all_done = False
self.get_success(
store.db_pool.simple_insert(
"background_updates",
{
"update_name": "event_search_sqlite_delete_non_strings",
"progress_json": "{}",
},
)
)
self.wait_for_background_updates()
# The non-string `value`s ought to be gone now.
values = self.get_success(
store.db_pool.simple_select_onecol(
"event_search",
{"room_id": "room_id"},
"value",
),
)
self.assertCountEqual(values, ["hi", "2"])