From ba1a8be9300595104c580e2c8e652ba2c58afff3 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Mon, 23 Mar 2020 16:13:12 +0000 Subject: [PATCH] Review comments --- docs/tcp_replication.md | 5 ++--- synapse/replication/http/streams.py | 15 ++++++++++++++- synapse/replication/tcp/commands.py | 4 ++-- synapse/replication/tcp/protocol.py | 3 +-- synapse/replication/tcp/streams/_base.py | 16 ++++++++-------- 5 files changed, 27 insertions(+), 16 deletions(-) diff --git a/docs/tcp_replication.md b/docs/tcp_replication.md index 15a61f6fcf..5b26f70f88 100644 --- a/docs/tcp_replication.md +++ b/docs/tcp_replication.md @@ -174,9 +174,8 @@ client (C): #### POSITION (S) - The position of the stream has been updated. Sent to the client - after all missing updates for a stream have been sent to the client - and they're now up to date. + On receipt of a POSITION command clients should check if they have missed any + updates, and if so then fetch them out of band. #### ERROR (S, C) diff --git a/synapse/replication/http/streams.py b/synapse/replication/http/streams.py index 3889278b2a..141df68787 100644 --- a/synapse/replication/http/streams.py +++ b/synapse/replication/http/streams.py @@ -25,6 +25,19 @@ logger = logging.getLogger(__name__) class ReplicationGetStreamUpdates(ReplicationEndpoint): """Fetches stream updates from a server. Used for streams not persisted to the database, e.g. typing notifications. + + The API looks like: + + GET /_synapse/replication/get_repl_stream_updates/events?from_token=0&to_token=10&limit=100 + + 200 OK + + { + updates: [ ... ], + upto_token: 10, + limited: False, + } + """ NAME = "get_repl_stream_updates" @@ -32,7 +45,7 @@ class ReplicationGetStreamUpdates(ReplicationEndpoint): METHOD = "GET" def __init__(self, hs): - super(ReplicationGetStreamUpdates, self).__init__(hs) + super().__init__(hs) from synapse.replication.tcp.streams import STREAMS_MAP diff --git a/synapse/replication/tcp/commands.py b/synapse/replication/tcp/commands.py index b0f06c6d83..5a6b734094 100644 --- a/synapse/replication/tcp/commands.py +++ b/synapse/replication/tcp/commands.py @@ -136,8 +136,8 @@ class PositionCommand(Command): """Sent by the server to tell the client the stream postition without needing to send an RDATA. - Sent to the client after all missing updates for a stream have been sent - to the client and they're now up to date. + On receipt of a POSITION command clients should check if they have missed + any updates, and if so then fetch them out of band. """ NAME = "POSITION" diff --git a/synapse/replication/tcp/protocol.py b/synapse/replication/tcp/protocol.py index 13e5fa9b12..8aa749265c 100644 --- a/synapse/replication/tcp/protocol.py +++ b/synapse/replication/tcp/protocol.py @@ -638,8 +638,7 @@ class ClientReplicationStreamProtocol(BaseReplicationStreamProtocol): # We've now caught up to position sent to us, notify handler. await self.handler.on_position(cmd.stream_name, cmd.token) - # When we get a `POSITION` command it means we've finished getting - # missing updates for the given stream, and are now up to date. + # We're now up to date wit the stream self.streams_connecting.discard(cmd.stream_name) if not self.streams_connecting: self.handler.finished_connecting() diff --git a/synapse/replication/tcp/streams/_base.py b/synapse/replication/tcp/streams/_base.py index 97af6bf9e1..d5b9c2831b 100644 --- a/synapse/replication/tcp/streams/_base.py +++ b/synapse/replication/tcp/streams/_base.py @@ -79,10 +79,10 @@ class Stream(object): since the stream was constructed if it hadn't been called before). Returns: - Resolves to a pair `(updates, new_last_token, limited)`, where - `updates` is a list of `(token, row)` entries, `new_last_token` is - the new position in stream, and `limited` is whether there are - more updates to fetch. + A triplet `(updates, new_last_token, limited)`, where `updates` is + a list of `(token, row)` entries, `new_last_token` is the new + position in stream, and `limited` is whether there are more updates + to fetch. """ current_token = self.current_token() updates, current_token, limited = await self.get_updates_since( @@ -99,10 +99,10 @@ class Stream(object): stream updates Returns: - Resolves to a pair `(updates, new_last_token, limited)`, where - `updates` is a list of `(token, row)` entries, `new_last_token` is - the new position in stream, and `limited` is whether there are - more updates to fetch. + A triplet `(updates, new_last_token, limited)`, where `updates` is + a list of `(token, row)` entries, `new_last_token` is the new + position in stream, and `limited` is whether there are more updates + to fetch. """ if from_token in ("NOW", "now"):