Merge pull request #3709 from matrix-org/rav/logcontext_for_replication_commands

Logcontexts for replication command handlers
This commit is contained in:
Richard van der Hoff 2018-08-17 16:22:07 +01:00 committed by GitHub
commit 3cef867cc1
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
9 changed files with 54 additions and 20 deletions

1
changelog.d/3709.misc Normal file
View file

@ -0,0 +1 @@
Logcontexts for replication command handlers

View file

@ -117,8 +117,9 @@ class ASReplicationHandler(ReplicationClientHandler):
super(ASReplicationHandler, self).__init__(hs.get_datastore())
self.appservice_handler = hs.get_application_service_handler()
@defer.inlineCallbacks
def on_rdata(self, stream_name, token, rows):
super(ASReplicationHandler, self).on_rdata(stream_name, token, rows)
yield super(ASReplicationHandler, self).on_rdata(stream_name, token, rows)
if stream_name == "events":
max_stream_id = self.store.get_room_max_stream_ordering()

View file

@ -144,8 +144,9 @@ class FederationSenderReplicationHandler(ReplicationClientHandler):
super(FederationSenderReplicationHandler, self).__init__(hs.get_datastore())
self.send_handler = FederationSenderHandler(hs, self)
@defer.inlineCallbacks
def on_rdata(self, stream_name, token, rows):
super(FederationSenderReplicationHandler, self).on_rdata(
yield super(FederationSenderReplicationHandler, self).on_rdata(
stream_name, token, rows
)
self.send_handler.process_replication_rows(stream_name, token, rows)

View file

@ -148,8 +148,9 @@ class PusherReplicationHandler(ReplicationClientHandler):
self.pusher_pool = hs.get_pusherpool()
@defer.inlineCallbacks
def on_rdata(self, stream_name, token, rows):
super(PusherReplicationHandler, self).on_rdata(stream_name, token, rows)
yield super(PusherReplicationHandler, self).on_rdata(stream_name, token, rows)
run_in_background(self.poke_pushers, stream_name, token, rows)
@defer.inlineCallbacks

View file

@ -338,8 +338,9 @@ class SyncReplicationHandler(ReplicationClientHandler):
self.presence_handler = hs.get_presence_handler()
self.notifier = hs.get_notifier()
@defer.inlineCallbacks
def on_rdata(self, stream_name, token, rows):
super(SyncReplicationHandler, self).on_rdata(stream_name, token, rows)
yield super(SyncReplicationHandler, self).on_rdata(stream_name, token, rows)
run_in_background(self.process_and_notify, stream_name, token, rows)
def get_streams_to_replicate(self):

View file

@ -169,8 +169,9 @@ class UserDirectoryReplicationHandler(ReplicationClientHandler):
super(UserDirectoryReplicationHandler, self).__init__(hs.get_datastore())
self.user_directory = hs.get_user_directory_handler()
@defer.inlineCallbacks
def on_rdata(self, stream_name, token, rows):
super(UserDirectoryReplicationHandler, self).on_rdata(
yield super(UserDirectoryReplicationHandler, self).on_rdata(
stream_name, token, rows
)
if stream_name == "current_state_deltas":

View file

@ -107,7 +107,7 @@ class ReplicationClientHandler(object):
Can be overriden in subclasses to handle more.
"""
logger.info("Received rdata %s -> %s", stream_name, token)
self.store.process_replication_rows(stream_name, token, rows)
return self.store.process_replication_rows(stream_name, token, rows)
def on_position(self, stream_name, token):
"""Called when we get new position data. By default this just pokes
@ -115,7 +115,7 @@ class ReplicationClientHandler(object):
Can be overriden in subclasses to handle more.
"""
self.store.process_replication_rows(stream_name, token, [])
return self.store.process_replication_rows(stream_name, token, [])
def on_sync(self, data):
"""When we received a SYNC we wake up any deferreds that were waiting

View file

@ -59,6 +59,12 @@ class Command(object):
"""
return self.data
def get_logcontext_id(self):
"""Get a suitable string for the logcontext when processing this command"""
# by default, we just use the command name.
return self.NAME
class ServerCommand(Command):
"""Sent by the server on new connection and includes the server_name.
@ -116,6 +122,9 @@ class RdataCommand(Command):
_json_encoder.encode(self.row),
))
def get_logcontext_id(self):
return "RDATA-" + self.stream_name
class PositionCommand(Command):
"""Sent by the client to tell the client the stream postition without
@ -190,6 +199,9 @@ class ReplicateCommand(Command):
def to_line(self):
return " ".join((self.stream_name, str(self.token),))
def get_logcontext_id(self):
return "REPLICATE-" + self.stream_name
class UserSyncCommand(Command):
"""Sent by the client to inform the server that a user has started or

View file

@ -63,6 +63,8 @@ from twisted.protocols.basic import LineOnlyReceiver
from twisted.python.failure import Failure
from synapse.metrics import LaterGauge
from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.util.logcontext import make_deferred_yieldable, run_in_background
from synapse.util.stringutils import random_string
from .commands import (
@ -222,7 +224,11 @@ class BaseReplicationStreamProtocol(LineOnlyReceiver):
# Now lets try and call on_<CMD_NAME> function
try:
getattr(self, "on_%s" % (cmd_name,))(cmd)
run_as_background_process(
"replication-" + cmd.get_logcontext_id(),
getattr(self, "on_%s" % (cmd_name,)),
cmd,
)
except Exception:
logger.exception("[%s] Failed to handle line: %r", self.id(), line)
@ -387,7 +393,7 @@ class ServerReplicationStreamProtocol(BaseReplicationStreamProtocol):
self.name = cmd.data
def on_USER_SYNC(self, cmd):
self.streamer.on_user_sync(
return self.streamer.on_user_sync(
self.conn_id, cmd.user_id, cmd.is_syncing, cmd.last_sync_ms,
)
@ -397,22 +403,33 @@ class ServerReplicationStreamProtocol(BaseReplicationStreamProtocol):
if stream_name == "ALL":
# Subscribe to all streams we're publishing to.
for stream in iterkeys(self.streamer.streams_by_name):
self.subscribe_to_stream(stream, token)
deferreds = [
run_in_background(
self.subscribe_to_stream,
stream, token,
)
for stream in iterkeys(self.streamer.streams_by_name)
]
return make_deferred_yieldable(
defer.gatherResults(deferreds, consumeErrors=True)
)
else:
self.subscribe_to_stream(stream_name, token)
return self.subscribe_to_stream(stream_name, token)
def on_FEDERATION_ACK(self, cmd):
self.streamer.federation_ack(cmd.token)
return self.streamer.federation_ack(cmd.token)
def on_REMOVE_PUSHER(self, cmd):
self.streamer.on_remove_pusher(cmd.app_id, cmd.push_key, cmd.user_id)
return self.streamer.on_remove_pusher(
cmd.app_id, cmd.push_key, cmd.user_id,
)
def on_INVALIDATE_CACHE(self, cmd):
self.streamer.on_invalidate_cache(cmd.cache_func, cmd.keys)
return self.streamer.on_invalidate_cache(cmd.cache_func, cmd.keys)
def on_USER_IP(self, cmd):
self.streamer.on_user_ip(
return self.streamer.on_user_ip(
cmd.user_id, cmd.access_token, cmd.ip, cmd.user_agent, cmd.device_id,
cmd.last_seen,
)
@ -542,14 +559,13 @@ class ClientReplicationStreamProtocol(BaseReplicationStreamProtocol):
# Check if this is the last of a batch of updates
rows = self.pending_batches.pop(stream_name, [])
rows.append(row)
self.handler.on_rdata(stream_name, cmd.token, rows)
return self.handler.on_rdata(stream_name, cmd.token, rows)
def on_POSITION(self, cmd):
self.handler.on_position(cmd.stream_name, cmd.token)
return self.handler.on_position(cmd.stream_name, cmd.token)
def on_SYNC(self, cmd):
self.handler.on_sync(cmd.data)
return self.handler.on_sync(cmd.data)
def replicate(self, stream_name, token):
"""Send the subscription request to the server