dkjfhsdklfhsdlkjf

This commit is contained in:
Erik Johnston 2020-03-25 14:55:02 +00:00
parent 0473f87a17
commit 83ecaeecbf
13 changed files with 90 additions and 60 deletions

View file

@ -604,7 +604,7 @@ class GenericWorkerReplicationHandler(ReplicationDataHandler):
self.pusher_pool = hs.get_pusherpool()
if hs.config.send_federation:
self.send_handler = FederationSenderHandler(hs, self)
self.send_handler = FederationSenderHandler(hs)
else:
self.send_handler = None
@ -624,7 +624,9 @@ class GenericWorkerReplicationHandler(ReplicationDataHandler):
async def process_and_notify(self, stream_name, instance_name, token, rows):
try:
if self.send_handler:
self.send_handler.process_replication_rows(stream_name, token, rows)
self.send_handler.process_replication_rows(
stream_name, instance_name, token, rows
)
if stream_name == EventsStream.NAME:
# We shouldn't get multiple rows per token for events stream, so
@ -724,13 +726,14 @@ class FederationSenderHandler(object):
to the federation sender.
"""
def __init__(self, hs: GenericWorkerServer, replication_client):
def __init__(self, hs: GenericWorkerServer):
self.hs = hs
self.store = hs.get_datastore()
self._is_mine_id = hs.is_mine_id
self.federation_sender = hs.get_federation_sender()
self.replication_client = replication_client
# self.replication_client = hs.get_tcp_replication()
self.federation_position = self.store.federation_out_pos_startup
self.federation_position = {"master": self.store.federation_out_pos_startup}
self._fed_position_linearizer = Linearizer(name="_fed_position_linearizer")
self._last_ack = self.federation_position
@ -749,14 +752,14 @@ class FederationSenderHandler(object):
self.federation_sender.wake_destination(server)
def stream_positions(self):
return {"federation": {"master": self.federation_position}}
return {"federation": self.federation_position}
def process_replication_rows(self, stream_name, token, rows):
def process_replication_rows(self, stream_name, instance_name, token, rows):
# The federation stream contains things that we want to send out, e.g.
# presence, typing, etc.
if stream_name == "federation":
send_queue.process_rows_for_federation(self.federation_sender, rows)
run_in_background(self.update_token, token)
run_in_background(self.update_token, instance_name, token)
# We also need to poke the federation sender when new events happen
elif stream_name == "events":
@ -804,9 +807,12 @@ class FederationSenderHandler(object):
)
await self.federation_sender.send_read_receipt(receipt_info)
async def update_token(self, token):
async def update_token(self, instance_name, token):
try:
self.federation_position = token
self.federation_position[instance_name] = token
return
# FIXME
# We linearize here to ensure we don't have races updating the token
with (await self._fed_position_linearizer.queue(None)):
@ -817,7 +823,7 @@ class FederationSenderHandler(object):
# We ACK this token over replication so that the master can drop
# its in memory queues
self.replication_client.send_federation_ack(
self.hs.get_tcp_replication().send_federation_ack(
self.federation_position
)
self._last_ack = self.federation_position

View file

@ -48,6 +48,8 @@ class WorkerConfig(Config):
self.worker_main_http_uri = config.get("worker_main_http_uri", None)
self.instance_http_map = config.get("instance_http_map", {})
# This option is really only here to support `--manhole` command line
# argument.
manhole = config.get("worker_manhole")

View file

@ -819,7 +819,16 @@ class ReplicationFederationHandlerRegistry(FederationHandlerRegistry):
edu_type, origin, content
)
return await self._send_edu(edu_type=edu_type, origin=origin, content=content)
if edu_type == "m.typing":
instance_name = "synapse.app.client_reader"
else:
instance_name = "master"
return await self._send_edu(
instance_name=instance_name,
edu_type=edu_type,
origin=origin,
content=content,
)
async def on_query(self, query_type, args):
"""Overrides FederationHandlerRegistry

View file

@ -56,7 +56,7 @@ class TypingHandler(object):
self.clock = hs.get_clock()
self.wheel_timer = WheelTimer(bucket_size=5000)
# self.federation = hs.get_federation_sender()
self.federation = hs.get_federation_sender()
hs.get_federation_registry().register_edu_handler("m.typing", self._recv_edu)
@ -203,16 +203,16 @@ class TypingHandler(object):
for domain in {get_domain_from_id(u) for u in users}:
if domain != self.server_name:
logger.debug("sending typing update to %s", domain)
# self.federation.build_and_send_edu(
# destination=domain,
# edu_type="m.typing",
# content={
# "room_id": member.room_id,
# "user_id": member.user_id,
# "typing": typing,
# },
# key=member,
# )
self.federation.build_and_send_edu(
destination=domain,
edu_type="m.typing",
content={
"room_id": member.room_id,
"user_id": member.user_id,
"typing": typing,
},
key=member,
)
except Exception:
logger.exception("Error pushing typing notif to remotes")
@ -309,7 +309,9 @@ class TypingSlaveHandler(object):
# We must update this typing token from the response of the previous
# sync. In particular, the stream id may "reset" back to zero/a low
# value which we *must* use for the next replication request.
return {"typing": {"master": self._latest_room_serial}}
return {
"typing": {"synapse.app.client_reader": self._latest_room_serial}
} # FIXME
def process_replication_rows(self, stream_name, token, rows):
if stream_name != TypingStream.NAME:

View file

@ -33,10 +33,12 @@ class ReplicationRestResource(JsonResource):
self.register_servlets(hs)
def register_servlets(self, hs):
send_event.register_servlets(hs, self)
membership.register_servlets(hs, self)
federation.register_servlets(hs, self)
login.register_servlets(hs, self)
register.register_servlets(hs, self)
devices.register_servlets(hs, self)
if hs.config.worker_app is None:
send_event.register_servlets(hs, self)
membership.register_servlets(hs, self)
login.register_servlets(hs, self)
register.register_servlets(hs, self)
devices.register_servlets(hs, self)
streams.register_servlets(hs, self)
federation.register_servlets(hs, self)

View file

@ -128,15 +128,23 @@ class ReplicationEndpoint(object):
Returns a callable that accepts the same parameters as `_serialize_payload`.
"""
clock = hs.get_clock()
host = hs.config.worker_replication_host
port = hs.config.worker_replication_http_port
master_host = hs.config.worker_replication_host
master_port = hs.config.worker_replication_http_port
instance_http_map = hs.config.instance_http_map
client = hs.get_simple_http_client()
@trace(opname="outgoing_replication_request")
@defer.inlineCallbacks
def send_request(instance_name="master", **kwargs):
if instance_name != "master":
if instance_name == "master":
host = master_host
port = master_port
elif instance_name in instance_http_map:
host = instance_http_map[instance_name]["host"]
port = instance_http_map[instance_name]["port"]
else:
raise Exception("Unknown instance")
data = yield cls._serialize_payload(**kwargs)

View file

@ -277,8 +277,10 @@ class ReplicationStoreRoomOnInviteRestServlet(ReplicationEndpoint):
def register_servlets(hs, http_server):
ReplicationFederationSendEventsRestServlet(hs).register(http_server)
if hs.config.worker_app is None:
ReplicationFederationSendEventsRestServlet(hs).register(http_server)
ReplicationGetQueryRestServlet(hs).register(http_server)
ReplicationCleanRoomRestServlet(hs).register(http_server)
ReplicationStoreRoomOnInviteRestServlet(hs).register(http_server)
ReplicationFederationSendEduRestServlet(hs).register(http_server)
ReplicationGetQueryRestServlet(hs).register(http_server)
ReplicationCleanRoomRestServlet(hs).register(http_server)
ReplicationStoreRoomOnInviteRestServlet(hs).register(http_server)

View file

@ -51,6 +51,8 @@ class ReplicationGetStreamUpdates(ReplicationEndpoint):
# them ourselves we end up in an import loop).
self.streams = hs.get_replication_streamer().get_streams()
self.instance_name = hs.config.worker_name or "master"
@staticmethod
def _serialize_payload(stream_name, from_token, upto_token, limit):
return {"from_token": from_token, "upto_token": upto_token, "limit": limit}

View file

@ -63,6 +63,8 @@ class ReplicationClientHandler:
self.presence_handler = hs.get_presence_handler()
self.instance_id = hs.get_instance_id()
self.instance_name = hs.config.worker.worker_name or "master"
self.connections = [] # type: List[Any]
self.streams = {
@ -134,7 +136,9 @@ class ReplicationClientHandler:
for stream_name, stream in self.streams.items():
current_token = stream.current_token()
self.send_command(PositionCommand(stream_name, "master", current_token))
self.send_command(
PositionCommand(stream_name, self.instance_name, current_token)
)
async def on_USER_SYNC(self, cmd: UserSyncCommand):
user_sync_counter.inc()
@ -232,17 +236,17 @@ class ReplicationClientHandler:
return
# Find where we previously streamed up to.
current_token = (
self.replication_data_handler.get_streams_to_replicate()
.get(cmd.stream_name, {})
.get(cmd.instance_name)
current_tokens = self.replication_data_handler.get_streams_to_replicate().get(
cmd.stream_name
)
if current_token is None:
if current_tokens is None:
logger.debug(
"Got POSITION for stream we're not subscribed to: %s", cmd.stream_name
)
return
current_token = current_tokens.get(cmd.instance_name, 0)
# Fetch all updates between then and now.
limited = cmd.token != current_token
while limited:
@ -335,7 +339,7 @@ class ReplicationClientHandler:
We need to check if the client is interested in the stream or not
"""
self.send_command(RdataCommand(stream_name, "master", token, data))
self.send_command(RdataCommand(stream_name, self.instance_name, token, data))
class ReplicationDataHandler:

View file

@ -74,9 +74,7 @@ class ReplicationStreamer(object):
self.streams = [] # type: List[Stream]
if hs.config.worker_app is None:
for stream in STREAMS_MAP.values():
if stream == FederationStream and hs.config.send_federation:
# We only support federation stream if federation sending
# hase been disabled on the master.
if stream == FederationStream:
continue
if stream == TypingStream:
@ -87,6 +85,9 @@ class ReplicationStreamer(object):
if hs.config.server.handle_typing:
self.streams.append(TypingStream(hs))
# We always add federation stream
self.streams.append(FederationStream(hs))
self.streams_by_name = {stream.NAME: stream for stream in self.streams}
self.notifier.add_replication_callback(self.on_notifier_poke)

View file

@ -256,7 +256,7 @@ class TypingStream(Stream):
self.current_token = typing_handler.get_current_token # type: ignore
if hs.config.worker_app is None:
if hs.config.handle_typing:
self.update_function = db_query_to_update_function(typing_handler.get_all_typing_updates) # type: ignore
else:
# Query master process

View file

@ -15,8 +15,6 @@
# limitations under the License.
from collections import namedtuple
from twisted.internet import defer
from synapse.replication.tcp.streams._base import Stream, db_query_to_update_function
@ -41,12 +39,8 @@ class FederationStream(Stream):
# Not all synapse instances will have a federation sender instance,
# whether that's a `FederationSender` or a `FederationRemoteSendQueue`,
# so we stub the stream out when that is the case.
if hs.config.worker_app is None or hs.should_send_federation():
federation_sender = hs.get_federation_sender()
self.current_token = federation_sender.get_current_token # type: ignore
self.update_function = db_query_to_update_function(federation_sender.get_replication_rows) # type: ignore
else:
self.current_token = lambda: 0 # type: ignore
self.update_function = lambda from_token, upto_token, limit: defer.succeed(([], upto_token, bool)) # type: ignore
federation_sender = hs.get_federation_sender()
self.current_token = federation_sender.get_current_token # type: ignore
self.update_function = db_query_to_update_function(federation_sender.get_replication_rows) # type: ignore
super(FederationStream, self).__init__(hs)

View file

@ -457,10 +457,8 @@ class HomeServer(object):
def build_federation_sender(self):
if self.should_send_federation():
return FederationSender(self)
elif not self.config.worker_app:
return FederationRemoteSendQueue(self)
else:
raise Exception("Workers cannot send federation traffic")
return FederationRemoteSendQueue(self)
def build_receipts_handler(self):
return ReceiptsHandler(self)