From 3223f415e2e088ea1f8eff3af4d3415dc9e5531a Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 23 Nov 2018 16:00:32 +0000 Subject: [PATCH] Add server health apis and server presence --- synapse/event_auth.py | 3 ++ synapse/handlers/sync.py | 25 +++++++++- synapse/rest/client/v1/admin.py | 87 +++++++++++++++++++++++++++++++++ synapse/storage/registration.py | 9 ---- synapse/storage/roommember.py | 17 +++++++ 5 files changed, 131 insertions(+), 10 deletions(-) diff --git a/synapse/event_auth.py b/synapse/event_auth.py index c81d8e6729..2f5f8819c1 100644 --- a/synapse/event_auth.py +++ b/synapse/event_auth.py @@ -103,6 +103,9 @@ def check(event, auth_events, do_sig_check=True, do_size_check=True): "No create event in auth events", ) + if event.type == "org.matrix.server_presence": + return + creating_domain = get_domain_from_id(event.room_id) originating_domain = get_domain_from_id(event.sender) if creating_domain != originating_domain: diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py index 09739f2862..675e6c8e02 100644 --- a/synapse/handlers/sync.py +++ b/synapse/handlers/sync.py @@ -35,6 +35,7 @@ from synapse.util.caches.lrucache import LruCache from synapse.util.caches.response_cache import ResponseCache from synapse.util.logcontext import LoggingContext from synapse.util.metrics import Measure, measure_func +from synapse.util.stringutils import random_string from synapse.visibility import filter_events_for_client logger = logging.getLogger(__name__) @@ -211,6 +212,7 @@ class SyncHandler(object): self.response_cache = ResponseCache(hs, "sync") self.state = hs.get_state_handler() self.auth = hs.get_auth() + self.builder_factory = hs.get_event_builder_factory() # ExpiringCache((User, Device)) -> LruCache(state_key => event_id) self.lazy_loaded_members_cache = ExpiringCache( @@ -709,7 +711,6 @@ class SyncHandler(object): # TODO(mjark) Check for new redactions in the state events. with Measure(self.clock, "compute_state_delta"): - members_to_fetch = None lazy_load_members = sync_config.filter_collection.lazy_load_members() @@ -858,6 +859,28 @@ class SyncHandler(object): if state_ids: state = yield self.store.get_events(list(state_ids.values())) + hosts_in_room = yield self.store.get_hosts_in_room(room_id) + destination_states = yield self.store.get_destination_states() + + for host in hosts_in_room: + if host not in destination_states: + continue + + if ("org.matrix.server_presence", host) in timeline_state: + continue + + state[("org.matrix.server_presence", host)] = self.builder_factory.new({ + "type": "org.matrix.server_presence", + "content": { + "state": "connected" if destination_states[host] else "disconnected", + }, + "state_key": host, + "event_id": random_string(24), + "origin_server_ts": self.clock.time_msec(), + "sender": "@server:server", + "room_id": room_id, + }) + defer.returnValue({ (e.type, e.state_key): e for e in sync_config.filter_collection.filter_room_state(list(state.values())) diff --git a/synapse/rest/client/v1/admin.py b/synapse/rest/client/v1/admin.py index 41534b8c2a..c729cdbd30 100644 --- a/synapse/rest/client/v1/admin.py +++ b/synapse/rest/client/v1/admin.py @@ -32,6 +32,7 @@ from synapse.http.servlet import ( parse_string, ) from synapse.types import UserID, create_requester +from synapse.util.stringutils import random_string from .base import ClientV1RestServlet, client_path_patterns @@ -740,6 +741,91 @@ class SearchUsersRestServlet(ClientV1RestServlet): defer.returnValue((200, ret)) +class ServerHealth(ClientV1RestServlet): + PATTERNS = client_path_patterns("/admin/server_health") + + def __init__(self, hs): + super(ServerHealth, self).__init__(hs) + self.event_creation_handler = hs.get_event_creation_handler() + self.store = hs.get_datastore() + self.builder_factory = hs.get_event_builder_factory() + self.clock = hs.get_clock() + + def on_GET(self, request): + return self.do_update() + + def on_POST(self, request): + return self.do_update() + + @defer.inlineCallbacks + def do_update(self): + hosts = yield self.store.get_all_destination_healths() + + up_servers = set(h for h, c in hosts.items() if c is not None) + down_servers = set(h for h, c in hosts.items() if c is None) + + rooms_to_hosts = yield self.store.get_all_hosts_and_room() + + requester = create_requester(UserID("server", "server")), + + state = yield self.store.get_destination_states() + + new_up = set() + new_down = set() + + for host in up_servers: + if state.get(host, True): + continue + new_up.add(host) + + yield self.store.store_destination_state(host, True) + + for host in down_servers: + if not state.get(host, True): + continue + new_down.add(host) + + yield self.store.store_destination_state(host, False) + + for room_id, hosts in rooms_to_hosts.items(): + for host in hosts: + if host in new_up: + new_state = "connected" + elif host in new_down: + new_state = "disconnected" + else: + continue + + logger.info("Marking %s as %r", host, new_state) + + builder = self.builder_factory.new({ + "type": "org.matrix.server_presence", + "content": { + "state": new_state, + }, + "state_key": host, + "event_id": random_string(24), + "origin_server_ts": self.clock.time_msec(), + "sender": "@server:server", + "room_id": room_id, + }) + + event, context = yield self.event_creation_handler.create_new_client_event( + builder=builder, + ) + event.internal_metadata.internal_event = True + yield self.event_creation_handler.handle_new_client_event( + requester, + event, + context, + ratelimit=False, + extra_users=[], + do_auth=False, + ) + + defer.returnValue((200, {})) + + def register_servlets(hs, http_server): WhoisRestServlet(hs).register(http_server) PurgeMediaCacheRestServlet(hs).register(http_server) @@ -754,3 +840,4 @@ def register_servlets(hs, http_server): QuarantineMediaInRoom(hs).register(http_server) ListMediaInRoom(hs).register(http_server) UserRegisterServlet(hs).register(http_server) + ServerHealth(hs).register(http_server) diff --git a/synapse/storage/registration.py b/synapse/storage/registration.py index 80d76bf9d7..ac5e1656fe 100644 --- a/synapse/storage/registration.py +++ b/synapse/storage/registration.py @@ -406,11 +406,6 @@ class RegistrationStore(RegistrationWorkerStore, ) tokens_and_devices = [(r[0], r[1], r[2]) for r in txn] - for token, _, _ in tokens_and_devices: - self._invalidate_cache_and_stream( - txn, self.get_user_by_access_token, (token,) - ) - txn.execute( "DELETE FROM access_tokens WHERE %s" % where_clause, values @@ -432,10 +427,6 @@ class RegistrationStore(RegistrationWorkerStore, }, ) - self._invalidate_cache_and_stream( - txn, self.get_user_by_access_token, (access_token,) - ) - return self.runInteraction("delete_access_token", f) @cachedInlineCallbacks() diff --git a/synapse/storage/roommember.py b/synapse/storage/roommember.py index 0707f9a86a..d0db8528ee 100644 --- a/synapse/storage/roommember.py +++ b/synapse/storage/roommember.py @@ -88,6 +88,23 @@ class RoomMemberWorkerStore(EventsWorkerStore): return [to_ascii(r[0]) for r in txn] return self.runInteraction("get_users_in_room", f) + def get_all_hosts_and_room(self): + def f(txn): + sql = """ + SELECT DISTINCT room_id, regexp_replace(state_key, '^[^:]*:', '') AS host + FROM current_state_events + INNER JOIN room_memberships USING (event_id, room_id) + WHERE + type = 'm.room.member' AND membership = 'join' + """ + + txn.execute(sql) + results = {} + for r in txn: + results.setdefault(to_ascii(r[0]), set()).add(to_ascii(r[1])) + return results + return self.runInteraction("get_users_in_room", f) + @cached(max_entries=100000) def get_room_summary(self, room_id): """ Get the details of a room roughly suitable for use by the room