Add server health apis and server presence

This commit is contained in:
Erik Johnston 2018-11-23 16:00:32 +00:00 committed by Andrew Morgan
parent f57e71645a
commit 3223f415e2
5 changed files with 131 additions and 10 deletions

View file

@ -103,6 +103,9 @@ def check(event, auth_events, do_sig_check=True, do_size_check=True):
"No create event in auth events",
)
if event.type == "org.matrix.server_presence":
return
creating_domain = get_domain_from_id(event.room_id)
originating_domain = get_domain_from_id(event.sender)
if creating_domain != originating_domain:

View file

@ -35,6 +35,7 @@ from synapse.util.caches.lrucache import LruCache
from synapse.util.caches.response_cache import ResponseCache
from synapse.util.logcontext import LoggingContext
from synapse.util.metrics import Measure, measure_func
from synapse.util.stringutils import random_string
from synapse.visibility import filter_events_for_client
logger = logging.getLogger(__name__)
@ -211,6 +212,7 @@ class SyncHandler(object):
self.response_cache = ResponseCache(hs, "sync")
self.state = hs.get_state_handler()
self.auth = hs.get_auth()
self.builder_factory = hs.get_event_builder_factory()
# ExpiringCache((User, Device)) -> LruCache(state_key => event_id)
self.lazy_loaded_members_cache = ExpiringCache(
@ -709,7 +711,6 @@ class SyncHandler(object):
# TODO(mjark) Check for new redactions in the state events.
with Measure(self.clock, "compute_state_delta"):
members_to_fetch = None
lazy_load_members = sync_config.filter_collection.lazy_load_members()
@ -858,6 +859,28 @@ class SyncHandler(object):
if state_ids:
state = yield self.store.get_events(list(state_ids.values()))
hosts_in_room = yield self.store.get_hosts_in_room(room_id)
destination_states = yield self.store.get_destination_states()
for host in hosts_in_room:
if host not in destination_states:
continue
if ("org.matrix.server_presence", host) in timeline_state:
continue
state[("org.matrix.server_presence", host)] = self.builder_factory.new({
"type": "org.matrix.server_presence",
"content": {
"state": "connected" if destination_states[host] else "disconnected",
},
"state_key": host,
"event_id": random_string(24),
"origin_server_ts": self.clock.time_msec(),
"sender": "@server:server",
"room_id": room_id,
})
defer.returnValue({
(e.type, e.state_key): e
for e in sync_config.filter_collection.filter_room_state(list(state.values()))

View file

@ -32,6 +32,7 @@ from synapse.http.servlet import (
parse_string,
)
from synapse.types import UserID, create_requester
from synapse.util.stringutils import random_string
from .base import ClientV1RestServlet, client_path_patterns
@ -740,6 +741,91 @@ class SearchUsersRestServlet(ClientV1RestServlet):
defer.returnValue((200, ret))
class ServerHealth(ClientV1RestServlet):
PATTERNS = client_path_patterns("/admin/server_health")
def __init__(self, hs):
super(ServerHealth, self).__init__(hs)
self.event_creation_handler = hs.get_event_creation_handler()
self.store = hs.get_datastore()
self.builder_factory = hs.get_event_builder_factory()
self.clock = hs.get_clock()
def on_GET(self, request):
return self.do_update()
def on_POST(self, request):
return self.do_update()
@defer.inlineCallbacks
def do_update(self):
hosts = yield self.store.get_all_destination_healths()
up_servers = set(h for h, c in hosts.items() if c is not None)
down_servers = set(h for h, c in hosts.items() if c is None)
rooms_to_hosts = yield self.store.get_all_hosts_and_room()
requester = create_requester(UserID("server", "server")),
state = yield self.store.get_destination_states()
new_up = set()
new_down = set()
for host in up_servers:
if state.get(host, True):
continue
new_up.add(host)
yield self.store.store_destination_state(host, True)
for host in down_servers:
if not state.get(host, True):
continue
new_down.add(host)
yield self.store.store_destination_state(host, False)
for room_id, hosts in rooms_to_hosts.items():
for host in hosts:
if host in new_up:
new_state = "connected"
elif host in new_down:
new_state = "disconnected"
else:
continue
logger.info("Marking %s as %r", host, new_state)
builder = self.builder_factory.new({
"type": "org.matrix.server_presence",
"content": {
"state": new_state,
},
"state_key": host,
"event_id": random_string(24),
"origin_server_ts": self.clock.time_msec(),
"sender": "@server:server",
"room_id": room_id,
})
event, context = yield self.event_creation_handler.create_new_client_event(
builder=builder,
)
event.internal_metadata.internal_event = True
yield self.event_creation_handler.handle_new_client_event(
requester,
event,
context,
ratelimit=False,
extra_users=[],
do_auth=False,
)
defer.returnValue((200, {}))
def register_servlets(hs, http_server):
WhoisRestServlet(hs).register(http_server)
PurgeMediaCacheRestServlet(hs).register(http_server)
@ -754,3 +840,4 @@ def register_servlets(hs, http_server):
QuarantineMediaInRoom(hs).register(http_server)
ListMediaInRoom(hs).register(http_server)
UserRegisterServlet(hs).register(http_server)
ServerHealth(hs).register(http_server)

View file

@ -406,11 +406,6 @@ class RegistrationStore(RegistrationWorkerStore,
)
tokens_and_devices = [(r[0], r[1], r[2]) for r in txn]
for token, _, _ in tokens_and_devices:
self._invalidate_cache_and_stream(
txn, self.get_user_by_access_token, (token,)
)
txn.execute(
"DELETE FROM access_tokens WHERE %s" % where_clause,
values
@ -432,10 +427,6 @@ class RegistrationStore(RegistrationWorkerStore,
},
)
self._invalidate_cache_and_stream(
txn, self.get_user_by_access_token, (access_token,)
)
return self.runInteraction("delete_access_token", f)
@cachedInlineCallbacks()

View file

@ -88,6 +88,23 @@ class RoomMemberWorkerStore(EventsWorkerStore):
return [to_ascii(r[0]) for r in txn]
return self.runInteraction("get_users_in_room", f)
def get_all_hosts_and_room(self):
def f(txn):
sql = """
SELECT DISTINCT room_id, regexp_replace(state_key, '^[^:]*:', '') AS host
FROM current_state_events
INNER JOIN room_memberships USING (event_id, room_id)
WHERE
type = 'm.room.member' AND membership = 'join'
"""
txn.execute(sql)
results = {}
for r in txn:
results.setdefault(to_ascii(r[0]), set()).add(to_ascii(r[1]))
return results
return self.runInteraction("get_users_in_room", f)
@cached(max_entries=100000)
def get_room_summary(self, room_id):
""" Get the details of a room roughly suitable for use by the room