Merge remote-tracking branch 'origin/release-v1.20.0' into develop

This commit is contained in:
Richard van der Hoff 2020-09-08 09:58:07 +01:00
commit 8d6f97f932
11 changed files with 122 additions and 36 deletions

1
changelog.d/8264.misc Normal file
View file

@ -0,0 +1 @@
Add more logging to debug slow startup.

1
changelog.d/8266.misc Normal file
View file

@ -0,0 +1 @@
Do not attempt to upgrade upgrade database schema on worker processes.

1
changelog.d/8270.feature Normal file
View file

@ -0,0 +1 @@
Add unread messages count to sync responses, as specified in [MSC2654](https://github.com/matrix-org/matrix-doc/pull/2654).

1
changelog.d/8271.bugfix Normal file
View file

@ -0,0 +1 @@
Fix slow start times for large servers by removing a table scan of the `users` table from startup code.

1
changelog.d/8274.feature Normal file
View file

@ -0,0 +1 @@
Add unread messages count to sync responses, as specified in [MSC2654](https://github.com/matrix-org/matrix-doc/pull/2654).

View file

@ -219,6 +219,11 @@ class BulkPushRuleEvaluator:
if event.type == EventTypes.Member and event.state_key == uid: if event.type == EventTypes.Member and event.state_key == uid:
display_name = event.content.get("displayname", None) display_name = event.content.get("displayname", None)
if count_as_unread:
# Add an element for the current user if the event needs to be marked as
# unread, so that add_push_actions_to_staging iterates over it.
# If the event shouldn't be marked as unread but should notify the
# current user, it'll be added to the dict later.
actions_by_user[uid] = [] actions_by_user[uid] = []
for rule in rules: for rule in rules:

View file

@ -47,9 +47,14 @@ class Databases:
engine = create_engine(database_config.config) engine = create_engine(database_config.config)
with make_conn(database_config, engine) as db_conn: with make_conn(database_config, engine) as db_conn:
logger.info("Preparing database %r...", db_name) logger.info("[database config %r]: Checking database server", db_name)
engine.check_database(db_conn) engine.check_database(db_conn)
logger.info(
"[database config %r]: Preparing for databases %r",
db_name,
database_config.databases,
)
prepare_database( prepare_database(
db_conn, engine, hs.config, databases=database_config.databases, db_conn, engine, hs.config, databases=database_config.databases,
) )
@ -57,7 +62,9 @@ class Databases:
database = DatabasePool(hs, database_config, engine) database = DatabasePool(hs, database_config, engine)
if "main" in database_config.databases: if "main" in database_config.databases:
logger.info("Starting 'main' data store") logger.info(
"[database config %r]: Starting 'main' database", db_name
)
# Sanity check we don't try and configure the main store on # Sanity check we don't try and configure the main store on
# multiple databases. # multiple databases.
@ -72,7 +79,9 @@ class Databases:
persist_events = PersistEventsStore(hs, database, main) persist_events = PersistEventsStore(hs, database, main)
if "state" in database_config.databases: if "state" in database_config.databases:
logger.info("Starting 'state' data store") logger.info(
"[database config %r]: Starting 'state' database", db_name
)
# Sanity check we don't try and configure the state store on # Sanity check we don't try and configure the state store on
# multiple databases. # multiple databases.
@ -85,7 +94,7 @@ class Databases:
self.databases.append(database) self.databases.append(database)
logger.info("Database %r prepared", db_name) logger.info("[database config %r]: prepared", db_name)
# Closing the context manager doesn't close the connection. # Closing the context manager doesn't close the connection.
# psycopg will close the connection when the object gets GCed, but *only* # psycopg will close the connection when the object gets GCed, but *only*
@ -98,10 +107,10 @@ class Databases:
# Sanity check that we have actually configured all the required stores. # Sanity check that we have actually configured all the required stores.
if not main: if not main:
raise Exception("No 'main' data store configured") raise Exception("No 'main' database configured")
if not state: if not state:
raise Exception("No 'state' data store configured") raise Exception("No 'state' database configured")
# We use local variables here to ensure that the databases do not have # We use local variables here to ensure that the databases do not have
# optional types. # optional types.

View file

@ -29,6 +29,7 @@ from synapse.storage.util.id_generators import (
MultiWriterIdGenerator, MultiWriterIdGenerator,
StreamIdGenerator, StreamIdGenerator,
) )
from synapse.types import get_domain_from_id
from synapse.util.caches.stream_change_cache import StreamChangeCache from synapse.util.caches.stream_change_cache import StreamChangeCache
from .account_data import AccountDataStore from .account_data import AccountDataStore
@ -591,21 +592,24 @@ def check_database_before_upgrade(cur, database_engine, config: HomeServerConfig
"""Called before upgrading an existing database to check that it is broadly sane """Called before upgrading an existing database to check that it is broadly sane
compared with the configuration. compared with the configuration.
""" """
domain = config.server_name logger.info("Checking database for consistency with configuration...")
sql = database_engine.convert_param_style( # if there are any users in the database, check that the username matches our
"SELECT COUNT(*) FROM users WHERE name NOT LIKE ?" # configured server name.
)
pat = "%:" + domain cur.execute("SELECT name FROM users LIMIT 1")
cur.execute(sql, (pat,)) rows = cur.fetchall()
num_not_matching = cur.fetchall()[0][0] if not rows:
if num_not_matching == 0: return
user_domain = get_domain_from_id(rows[0][0])
if user_domain == config.server_name:
return return
raise Exception( raise Exception(
"Found users in database not native to %s!\n" "Found users in database not native to %s!\n"
"You cannot changed a synapse server_name after it's been configured" "You cannot change a synapse server_name after it's been configured"
% (domain,) % (config.server_name,)
) )

View file

@ -177,6 +177,11 @@ class EventPushActionsWorkerStore(SQLBaseStore):
if row: if row:
notif_count += row[0] notif_count += row[0]
if row[1] is not None:
# The unread_count column of event_push_summary is NULLable, so we need
# to make sure we don't try increasing the unread counts if it's NULL
# for this row.
unread_count += row[1] unread_count += row[1]
return { return {

View file

@ -50,6 +50,22 @@ class UpgradeDatabaseException(PrepareDatabaseException):
pass pass
OUTDATED_SCHEMA_ON_WORKER_ERROR = (
"Expected database schema version %i but got %i: run the main synapse process to "
"upgrade the database schema before starting worker processes."
)
EMPTY_DATABASE_ON_WORKER_ERROR = (
"Uninitialised database: run the main synapse process to prepare the database "
"schema before starting worker processes."
)
UNAPPLIED_DELTA_ON_WORKER_ERROR = (
"Database schema delta %s has not been applied: run the main synapse process to "
"upgrade the database schema before starting worker processes."
)
def prepare_database( def prepare_database(
db_conn: Connection, db_conn: Connection,
database_engine: BaseDatabaseEngine, database_engine: BaseDatabaseEngine,
@ -83,20 +99,32 @@ def prepare_database(
# at all, so this is redundant but harmless there.) # at all, so this is redundant but harmless there.)
cur.execute("BEGIN TRANSACTION") cur.execute("BEGIN TRANSACTION")
logger.info("%r: Checking existing schema version", databases)
version_info = _get_or_create_schema_state(cur, database_engine) version_info = _get_or_create_schema_state(cur, database_engine)
if version_info: if version_info:
user_version, delta_files, upgraded = version_info user_version, delta_files, upgraded = version_info
logger.info(
if config is None: "%r: Existing schema is %i (+%i deltas)",
if user_version != SCHEMA_VERSION: databases,
# If we don't pass in a config file then we are expecting to user_version,
# have already upgraded the DB. len(delta_files),
raise UpgradeDatabaseException(
"Expected database schema version %i but got %i"
% (SCHEMA_VERSION, user_version)
) )
else:
# config should only be None when we are preparing an in-memory SQLite db,
# which should be empty.
if config is None:
raise ValueError(
"config==None in prepare_database, but databse is not empty"
)
# if it's a worker app, refuse to upgrade the database, to avoid multiple
# workers doing it at once.
if config.worker_app is not None and user_version != SCHEMA_VERSION:
raise UpgradeDatabaseException(
OUTDATED_SCHEMA_ON_WORKER_ERROR % (SCHEMA_VERSION, user_version)
)
_upgrade_existing_database( _upgrade_existing_database(
cur, cur,
user_version, user_version,
@ -107,6 +135,13 @@ def prepare_database(
databases=databases, databases=databases,
) )
else: else:
logger.info("%r: Initialising new database", databases)
# if it's a worker app, refuse to upgrade the database, to avoid multiple
# workers doing it at once.
if config and config.worker_app is not None:
raise UpgradeDatabaseException(EMPTY_DATABASE_ON_WORKER_ERROR)
_setup_new_database(cur, database_engine, databases=databases) _setup_new_database(cur, database_engine, databases=databases)
# check if any of our configured dynamic modules want a database # check if any of our configured dynamic modules want a database
@ -312,6 +347,8 @@ def _upgrade_existing_database(
else: else:
assert config assert config
is_worker = config and config.worker_app is not None
if current_version > SCHEMA_VERSION: if current_version > SCHEMA_VERSION:
raise ValueError( raise ValueError(
"Cannot use this database as it is too " "Cannot use this database as it is too "
@ -339,7 +376,7 @@ def _upgrade_existing_database(
specific_engine_extensions = (".sqlite", ".postgres") specific_engine_extensions = (".sqlite", ".postgres")
for v in range(start_ver, SCHEMA_VERSION + 1): for v in range(start_ver, SCHEMA_VERSION + 1):
logger.info("Upgrading schema to v%d", v) logger.info("Applying schema deltas for v%d", v)
# We need to search both the global and per data store schema # We need to search both the global and per data store schema
# directories for schema updates. # directories for schema updates.
@ -399,9 +436,15 @@ def _upgrade_existing_database(
continue continue
root_name, ext = os.path.splitext(file_name) root_name, ext = os.path.splitext(file_name)
if ext == ".py": if ext == ".py":
# This is a python upgrade module. We need to import into some # This is a python upgrade module. We need to import into some
# package and then execute its `run_upgrade` function. # package and then execute its `run_upgrade` function.
if is_worker:
raise PrepareDatabaseException(
UNAPPLIED_DELTA_ON_WORKER_ERROR % relative_path
)
module_name = "synapse.storage.v%d_%s" % (v, root_name) module_name = "synapse.storage.v%d_%s" % (v, root_name)
with open(absolute_path) as python_file: with open(absolute_path) as python_file:
module = imp.load_source(module_name, absolute_path, python_file) module = imp.load_source(module_name, absolute_path, python_file)
@ -416,10 +459,18 @@ def _upgrade_existing_database(
continue continue
elif ext == ".sql": elif ext == ".sql":
# A plain old .sql file, just read and execute it # A plain old .sql file, just read and execute it
if is_worker:
raise PrepareDatabaseException(
UNAPPLIED_DELTA_ON_WORKER_ERROR % relative_path
)
logger.info("Applying schema %s", relative_path) logger.info("Applying schema %s", relative_path)
executescript(cur, absolute_path) executescript(cur, absolute_path)
elif ext == specific_engine_extension and root_name.endswith(".sql"): elif ext == specific_engine_extension and root_name.endswith(".sql"):
# A .sql file specific to our engine; just read and execute it # A .sql file specific to our engine; just read and execute it
if is_worker:
raise PrepareDatabaseException(
UNAPPLIED_DELTA_ON_WORKER_ERROR % relative_path
)
logger.info("Applying engine-specific schema %s", relative_path) logger.info("Applying engine-specific schema %s", relative_path)
executescript(cur, absolute_path) executescript(cur, absolute_path)
elif ext in specific_engine_extensions and root_name.endswith(".sql"): elif ext in specific_engine_extensions and root_name.endswith(".sql"):
@ -449,6 +500,8 @@ def _upgrade_existing_database(
(v, True), (v, True),
) )
logger.info("Schema now up to date")
def _apply_module_schemas(txn, database_engine, config): def _apply_module_schemas(txn, database_engine, config):
"""Apply the module schemas for the dynamic modules, if any """Apply the module schemas for the dynamic modules, if any

View file

@ -15,6 +15,7 @@
import contextlib import contextlib
import heapq import heapq
import logging
import threading import threading
from collections import deque from collections import deque
from typing import Dict, List, Set from typing import Dict, List, Set
@ -24,6 +25,8 @@ from typing_extensions import Deque
from synapse.storage.database import DatabasePool, LoggingTransaction from synapse.storage.database import DatabasePool, LoggingTransaction
from synapse.storage.util.sequence import PostgresSequenceGenerator from synapse.storage.util.sequence import PostgresSequenceGenerator
logger = logging.getLogger(__name__)
class IdGenerator: class IdGenerator:
def __init__(self, db_conn, table, column): def __init__(self, db_conn, table, column):
@ -48,6 +51,8 @@ def _load_current_id(db_conn, table, column, step=1):
Returns: Returns:
int int
""" """
# debug logging for https://github.com/matrix-org/synapse/issues/7968
logger.info("initialising stream generator for %s(%s)", table, column)
cur = db_conn.cursor() cur = db_conn.cursor()
if step == 1: if step == 1:
cur.execute("SELECT MAX(%s) FROM %s" % (column, table)) cur.execute("SELECT MAX(%s) FROM %s" % (column, table))