mirror of
https://github.com/element-hq/synapse.git
synced 2024-11-25 02:55:46 +03:00
23740eaa3d
During the migration the automated script to update the copyright headers accidentally got rid of some of the existing copyright lines. Reinstate them.
374 lines
12 KiB
Python
374 lines
12 KiB
Python
#
|
|
# This file is licensed under the Affero General Public License (AGPL) version 3.
|
|
#
|
|
# Copyright 2021 The Matrix.org Foundation C.I.C.
|
|
# Copyright (C) 2023 New Vector, Ltd
|
|
#
|
|
# This program is free software: you can redistribute it and/or modify
|
|
# it under the terms of the GNU Affero General Public License as
|
|
# published by the Free Software Foundation, either version 3 of the
|
|
# License, or (at your option) any later version.
|
|
#
|
|
# See the GNU Affero General Public License for more details:
|
|
# <https://www.gnu.org/licenses/agpl-3.0.html>.
|
|
#
|
|
# Originally licensed under the Apache License, Version 2.0:
|
|
# <http://www.apache.org/licenses/LICENSE-2.0>.
|
|
#
|
|
# [This file includes modifications made by New Vector Limited]
|
|
#
|
|
#
|
|
from typing import Collection
|
|
|
|
from parameterized import parameterized
|
|
|
|
from twisted.test.proto_helpers import MemoryReactor
|
|
|
|
import synapse.rest.admin
|
|
from synapse.api.errors import Codes
|
|
from synapse.rest.client import login
|
|
from synapse.server import HomeServer
|
|
from synapse.storage.background_updates import BackgroundUpdater
|
|
from synapse.types import JsonDict
|
|
from synapse.util import Clock
|
|
|
|
from tests import unittest
|
|
|
|
|
|
class BackgroundUpdatesTestCase(unittest.HomeserverTestCase):
|
|
servlets = [
|
|
synapse.rest.admin.register_servlets,
|
|
login.register_servlets,
|
|
]
|
|
|
|
def prepare(self, reactor: MemoryReactor, clock: Clock, hs: HomeServer) -> None:
|
|
self.store = hs.get_datastores().main
|
|
self.admin_user = self.register_user("admin", "pass", admin=True)
|
|
self.admin_user_tok = self.login("admin", "pass")
|
|
self.updater = BackgroundUpdater(hs, self.store.db_pool)
|
|
|
|
@parameterized.expand(
|
|
[
|
|
("GET", "/_synapse/admin/v1/background_updates/enabled"),
|
|
("POST", "/_synapse/admin/v1/background_updates/enabled"),
|
|
("GET", "/_synapse/admin/v1/background_updates/status"),
|
|
("POST", "/_synapse/admin/v1/background_updates/start_job"),
|
|
]
|
|
)
|
|
def test_requester_is_no_admin(self, method: str, url: str) -> None:
|
|
"""
|
|
If the user is not a server admin, an error 403 is returned.
|
|
"""
|
|
|
|
self.register_user("user", "pass", admin=False)
|
|
other_user_tok = self.login("user", "pass")
|
|
|
|
channel = self.make_request(
|
|
method,
|
|
url,
|
|
content={},
|
|
access_token=other_user_tok,
|
|
)
|
|
|
|
self.assertEqual(403, channel.code, msg=channel.json_body)
|
|
self.assertEqual(Codes.FORBIDDEN, channel.json_body["errcode"])
|
|
|
|
def test_invalid_parameter(self) -> None:
|
|
"""
|
|
If parameters are invalid, an error is returned.
|
|
"""
|
|
url = "/_synapse/admin/v1/background_updates/start_job"
|
|
|
|
# empty content
|
|
channel = self.make_request(
|
|
"POST",
|
|
url,
|
|
content={},
|
|
access_token=self.admin_user_tok,
|
|
)
|
|
|
|
self.assertEqual(400, channel.code, msg=channel.json_body)
|
|
self.assertEqual(Codes.MISSING_PARAM, channel.json_body["errcode"])
|
|
|
|
# job_name invalid
|
|
channel = self.make_request(
|
|
"POST",
|
|
url,
|
|
content={"job_name": "unknown"},
|
|
access_token=self.admin_user_tok,
|
|
)
|
|
|
|
self.assertEqual(400, channel.code, msg=channel.json_body)
|
|
self.assertEqual(Codes.UNKNOWN, channel.json_body["errcode"])
|
|
|
|
def _register_bg_update(self) -> None:
|
|
"Adds a bg update but doesn't start it"
|
|
|
|
async def _fake_update(progress: JsonDict, batch_size: int) -> int:
|
|
await self.clock.sleep(0.2)
|
|
return batch_size
|
|
|
|
self.store.db_pool.updates.register_background_update_handler(
|
|
"test_update",
|
|
_fake_update,
|
|
)
|
|
|
|
self.get_success(
|
|
self.store.db_pool.simple_insert(
|
|
table="background_updates",
|
|
values={
|
|
"update_name": "test_update",
|
|
"progress_json": "{}",
|
|
},
|
|
)
|
|
)
|
|
|
|
def test_status_empty(self) -> None:
|
|
"""Test the status API works."""
|
|
|
|
channel = self.make_request(
|
|
"GET",
|
|
"/_synapse/admin/v1/background_updates/status",
|
|
access_token=self.admin_user_tok,
|
|
)
|
|
self.assertEqual(200, channel.code, msg=channel.json_body)
|
|
|
|
# Background updates should be enabled, but none should be running.
|
|
self.assertDictEqual(
|
|
channel.json_body, {"current_updates": {}, "enabled": True}
|
|
)
|
|
|
|
def test_status_bg_update(self) -> None:
|
|
"""Test the status API works with a background update."""
|
|
|
|
# Create a new background update
|
|
self._register_bg_update()
|
|
|
|
self.store.db_pool.updates.start_doing_background_updates()
|
|
|
|
self.reactor.pump([1.0, 1.0, 1.0])
|
|
|
|
channel = self.make_request(
|
|
"GET",
|
|
"/_synapse/admin/v1/background_updates/status",
|
|
access_token=self.admin_user_tok,
|
|
)
|
|
self.assertEqual(200, channel.code, msg=channel.json_body)
|
|
|
|
# Background updates should be enabled, and one should be running.
|
|
self.assertDictEqual(
|
|
channel.json_body,
|
|
{
|
|
"current_updates": {
|
|
"master": {
|
|
"name": "test_update",
|
|
"average_items_per_ms": 0.1,
|
|
"total_duration_ms": 1000.0,
|
|
"total_item_count": (
|
|
self.updater.default_background_batch_size
|
|
),
|
|
}
|
|
},
|
|
"enabled": True,
|
|
},
|
|
)
|
|
|
|
def test_enabled(self) -> None:
|
|
"""Test the enabled API works."""
|
|
|
|
# Create a new background update
|
|
|
|
self._register_bg_update()
|
|
self.store.db_pool.updates.start_doing_background_updates()
|
|
|
|
# Test that GET works and returns enabled is True.
|
|
channel = self.make_request(
|
|
"GET",
|
|
"/_synapse/admin/v1/background_updates/enabled",
|
|
access_token=self.admin_user_tok,
|
|
)
|
|
self.assertEqual(200, channel.code, msg=channel.json_body)
|
|
self.assertDictEqual(channel.json_body, {"enabled": True})
|
|
|
|
# Disable the BG updates
|
|
channel = self.make_request(
|
|
"POST",
|
|
"/_synapse/admin/v1/background_updates/enabled",
|
|
content={"enabled": False},
|
|
access_token=self.admin_user_tok,
|
|
)
|
|
self.assertEqual(200, channel.code, msg=channel.json_body)
|
|
self.assertDictEqual(channel.json_body, {"enabled": False})
|
|
|
|
# Advance a bit and get the current status, note this will finish the in
|
|
# flight background update so we call it the status API twice and check
|
|
# there was no change.
|
|
self.reactor.pump([1.0, 1.0])
|
|
|
|
channel = self.make_request(
|
|
"GET",
|
|
"/_synapse/admin/v1/background_updates/status",
|
|
access_token=self.admin_user_tok,
|
|
)
|
|
self.assertEqual(200, channel.code, msg=channel.json_body)
|
|
self.assertDictEqual(
|
|
channel.json_body,
|
|
{
|
|
"current_updates": {
|
|
"master": {
|
|
"name": "test_update",
|
|
"average_items_per_ms": 0.1,
|
|
"total_duration_ms": 1000.0,
|
|
"total_item_count": (
|
|
self.updater.default_background_batch_size
|
|
),
|
|
}
|
|
},
|
|
"enabled": False,
|
|
},
|
|
)
|
|
|
|
# Run the reactor for a bit so the BG updates would have a chance to run
|
|
# if they were to.
|
|
self.reactor.pump([1.0, 1.0])
|
|
|
|
channel = self.make_request(
|
|
"GET",
|
|
"/_synapse/admin/v1/background_updates/status",
|
|
access_token=self.admin_user_tok,
|
|
)
|
|
self.assertEqual(200, channel.code, msg=channel.json_body)
|
|
|
|
# There should be no change from the previous /status response.
|
|
self.assertDictEqual(
|
|
channel.json_body,
|
|
{
|
|
"current_updates": {
|
|
"master": {
|
|
"name": "test_update",
|
|
"average_items_per_ms": 0.1,
|
|
"total_duration_ms": 1000.0,
|
|
"total_item_count": (
|
|
self.updater.default_background_batch_size
|
|
),
|
|
}
|
|
},
|
|
"enabled": False,
|
|
},
|
|
)
|
|
|
|
# Re-enable the background updates.
|
|
|
|
channel = self.make_request(
|
|
"POST",
|
|
"/_synapse/admin/v1/background_updates/enabled",
|
|
content={"enabled": True},
|
|
access_token=self.admin_user_tok,
|
|
)
|
|
self.assertEqual(200, channel.code, msg=channel.json_body)
|
|
|
|
self.assertDictEqual(channel.json_body, {"enabled": True})
|
|
|
|
self.reactor.pump([1.0, 1.0])
|
|
|
|
channel = self.make_request(
|
|
"GET",
|
|
"/_synapse/admin/v1/background_updates/status",
|
|
access_token=self.admin_user_tok,
|
|
)
|
|
self.assertEqual(200, channel.code, msg=channel.json_body)
|
|
|
|
# Background updates should be enabled and making progress.
|
|
self.assertDictEqual(
|
|
channel.json_body,
|
|
{
|
|
"current_updates": {
|
|
"master": {
|
|
"name": "test_update",
|
|
"average_items_per_ms": 0.05263157894736842,
|
|
"total_duration_ms": 2000.0,
|
|
"total_item_count": (110),
|
|
}
|
|
},
|
|
"enabled": True,
|
|
},
|
|
)
|
|
|
|
@parameterized.expand(
|
|
[
|
|
("populate_stats_process_rooms", ["populate_stats_process_rooms"]),
|
|
(
|
|
"regenerate_directory",
|
|
[
|
|
"populate_user_directory_createtables",
|
|
"populate_user_directory_process_rooms",
|
|
"populate_user_directory_process_users",
|
|
"populate_user_directory_cleanup",
|
|
],
|
|
),
|
|
]
|
|
)
|
|
def test_start_backround_job(self, job_name: str, updates: Collection[str]) -> None:
|
|
"""
|
|
Test that background updates add to database and be processed.
|
|
|
|
Args:
|
|
job_name: name of the job to call with API
|
|
updates: collection of background updates to be started
|
|
"""
|
|
|
|
# no background update is waiting
|
|
self.assertTrue(
|
|
self.get_success(
|
|
self.store.db_pool.updates.has_completed_background_updates()
|
|
)
|
|
)
|
|
|
|
channel = self.make_request(
|
|
"POST",
|
|
"/_synapse/admin/v1/background_updates/start_job",
|
|
content={"job_name": job_name},
|
|
access_token=self.admin_user_tok,
|
|
)
|
|
|
|
self.assertEqual(200, channel.code, msg=channel.json_body)
|
|
|
|
# test that each background update is waiting now
|
|
for update in updates:
|
|
self.assertFalse(
|
|
self.get_success(
|
|
self.store.db_pool.updates.has_completed_background_update(update)
|
|
)
|
|
)
|
|
|
|
self.wait_for_background_updates()
|
|
|
|
# background updates are done
|
|
self.assertTrue(
|
|
self.get_success(
|
|
self.store.db_pool.updates.has_completed_background_updates()
|
|
)
|
|
)
|
|
|
|
def test_start_backround_job_twice(self) -> None:
|
|
"""Test that add a background update twice return an error."""
|
|
|
|
# add job to database
|
|
self.get_success(
|
|
self.store.db_pool.simple_insert(
|
|
table="background_updates",
|
|
values={
|
|
"update_name": "populate_stats_process_rooms",
|
|
"progress_json": "{}",
|
|
},
|
|
)
|
|
)
|
|
|
|
channel = self.make_request(
|
|
"POST",
|
|
"/_synapse/admin/v1/background_updates/start_job",
|
|
content={"job_name": "populate_stats_process_rooms"},
|
|
access_token=self.admin_user_tok,
|
|
)
|
|
|
|
self.assertEqual(400, channel.code, msg=channel.json_body)
|