Initial TCP protocol implementation

This defines the low level TCP replication protocol
This commit is contained in:
Erik Johnston 2017-03-27 15:40:37 +01:00
parent 8da6f0be48
commit 7450693435
4 changed files with 1148 additions and 0 deletions

174
docs/tcp_replication.rst Normal file
View file

@ -0,0 +1,174 @@
TCP Replication
===============
This describes the TCP replication protocol that replaces the HTTP protocol.
Motivation
----------
The HTTP API used long poll from the workers to the master, this has the problem
of causing a lot of duplicate work on the server. This TCP protocol aims to
solve.
Overview
--------
The protocol is based on fire and forget, line based commands. An example flow
would be (where '>' indicates master->worker and '<' worker->master flows)::
> SERVER example.com
< REPLICATE events 53
> RDATA events 54 ["$foo1:bar.com", ...]
> RDATA events 55 ["$foo4:bar.com", ...]
The example shows the server accepting a new connection and sending its identity
with the ``SERVER`` command, followed by the client asking to subscribe to the
``events`` stream from the token ``53``. The server then periodically sends ``RDATA``
commands which have the format ``RDATA <stream_name> <token> <row>```, where the
format of ``<row>`` is defined by the individual streams.
Error reporting happens by either the client or server sending an `ERROR`
command, and usually the connection will be closed.
Since the protocol is a simple line based, its possible to manually connect to
the server using a tool like netcat. A few things should be noted when manually
using the protocol:
* When subscribing to a stream using ``REPLICATE``, the special token ``NOW`` can
be used to get all future updates. The special stream name ``ALL`` can be used
with ``NOW`` to subscribe to all available streams.
* The federation stream is only available if federation sending has been
disabled on the main process.
* The server will only time connections out that have sent a ``PING`` command.
If a ping is sent then the connection will be closed if no further commands
are receieved within 15s. Both the client and server protocol implementations
will send an initial PING on connection and ensure at least one command every
5s is sent (not necessarily ``PING``).
* ``RDATA`` commands *usually* include a numeric token, however if the stream
has multiple rows to replicate per token the server will send multiple
``RDATA`` commands, with all but the last having a token of ``batch``. See
the documentation on ``commands.RdataCommand`` for further details.
Architecture
------------
The basic structure of the protocol is line based, where the initial word of
each line specifies the command. The rest of the line is parsed based on the
command. For example, the `RDATA` command is defined as::
RDATA <stream_name> <token> <row_json>
(Note that `<row_json>` may contains spaces, but cannot contain newlines.)
Blank lines are ignored.
Keep alives
~~~~~~~~~~~
Both sides are expected to send at least one command every 5s or so, and
should send a ``PING`` command if necessary. If either side do not receive a
command within e.g. 15s then the connection should be closed.
Because the server may be connected to manually using e.g. netcat, the timeouts
aren't enabled until an initial ``PING`` command is seen. Both the client and
server implementations below send a ``PING`` command immediately on connection to
ensure the timeouts are enabled.
This ensures that both sides can quickly realize if the tcp connection has gone
and handle the situation appropriately.
Start up
~~~~~~~~
When a new connection is made, the server:
* Sends a ``SERVER`` command, which includes the identity of the server, allowing
the client to detect if its connected to the expected server
* Sends a ``PING`` command as above, to enable the client to time out connections
promptly.
The client:
* Sends a ``NAME`` command, allowing the server to associate a human friendly
name with the connection. This is optional.
* Sends a ``PING`` as above
* For each stream the client wishes to subscribe to it sends a ``REPLICATE``
with the stream_name and token it wants to subscribe from.
* On receipt of a ``SERVER`` command, checks that the server name matches the
expected server name.
Error handling
~~~~~~~~~~~~~~
If either side detects an error it can send an ``ERROR`` command and close the
connection.
If the client side loses the connection to the server it should reconnect,
following the steps above.
Congestion
~~~~~~~~~~
If the server sends messages faster than the client can consume them the server
will first buffer a (fairly large) number of commands and then disconnect the
client. This ensures that we don't queue up an unbounded number of commands in
memory and gives us a potential oppurtunity to squawk loudly. When/if the client
recovers it can reconnect to the server and ask for missed messages.
Reliability
~~~~~~~~~~~
In general the replication stream should be consisdered an unreliable transport
since e.g. commands are not resent if the connection disappears.
The exception to that are the replication streams, i.e. RDATA commands, since
these include tokens which can be used to restart the stream on connection
errors.
The client should keep track of the token in the last RDATA command received
for each stream so that on reconneciton it can start streaming from the correct
place. Note: not all RDATA have valid tokens due to batching. See
``RdataCommand`` for more details.
Example
~~~~~~~
An example iteraction is shown below. Each line is prefixed with '>' or '<' to
indicate which side is sending, these are *not* included on the wire::
* connection established *
> SERVER localhost:8823
> PING 1490197665618
< NAME synapse.app.appservice
< PING 1490197665618
< REPLICATE events 1
< REPLICATE backfill 1
< REPLICATE caches 1
> POSITION events 1
> POSITION backfill 1
> POSITION caches 1
> RDATA caches 2 ["get_user_by_id",["@01register-user:localhost:8823"],1490197670513]
> RDATA events 14 ["$149019767112vOHxz:localhost:8823",
"!AFDCvgApUmpdfVjIXm:localhost:8823","m.room.guest_access","",null]
< PING 1490197675618
> ERROR server stopping
* connection closed by server *
The ``POSITION`` command sent by the server is used to set the clients position
without needing to send data with the ``RDATA`` command.
An example of a batched set of ``RDATA`` is::
> RDATA caches batch ["get_user_by_id",["@test:localhost:8823"],1490197670513]
> RDATA caches batch ["get_user_by_id",["@test2:localhost:8823"],1490197670513]
> RDATA caches batch ["get_user_by_id",["@test3:localhost:8823"],1490197670513]
> RDATA caches 54 ["get_user_by_id",["@test4:localhost:8823"],1490197670513]
In this case the client shouldn't advance their caches token until it sees the
the last ``RDATA``.

View file

@ -12,3 +12,35 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""This module implements the TCP replication protocol used by synapse to
communicate between the master process and its workers (when they're enabled).
The protocol is based on fire and forget, line based commands. An example flow
would be (where '>' indicates master->worker and '<' worker->master flows)::
> SERVER example.com
< REPLICATE events 53
> RDATA events 54 ["$foo1:bar.com", ...]
> RDATA events 55 ["$foo4:bar.com", ...]
The example shows the server accepting a new connection and sending its identity
with the `SERVER` command, followed by the client asking to subscribe to the
`events` stream from the token `53`. The server then periodically sends `RDATA`
commands which have the format `RDATA <stream_name> <token> <row>`, where the
format of `<row>` is defined by the individual streams.
Error reporting happens by either the client or server sending an `ERROR`
command, and usually the connection will be closed.
Structure of the module:
* client.py - the client classes used for workers to connect to master
* command.py - the definitions of all the valid commands
* protocol.py - contains bot the client and server protocol implementations,
these should not be used directly
* resource.py - the server classes that accepts and handle client connections
* streams.py - the definitons of all the valid streams
Further details can be found in docs/tcp_replication.rst
"""

View file

@ -0,0 +1,341 @@
# -*- coding: utf-8 -*-
# Copyright 2017 Vector Creations Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Defines the various valid commands
The VALID_SERVER_COMMANDS and VALID_CLIENT_COMMANDS define which commands are
allowed to be sent by which side.
"""
import logging
import ujson as json
logger = logging.getLogger(__name__)
class Command(object):
"""The base command class.
All subclasses must set the NAME variable which equates to the name of the
command on the wire.
A full command line on the wire is constructed from `NAME + " " + to_line()`
The default implementation creates a command of form `<NAME> <data>`
"""
NAME = None
def __init__(self, data):
self.data = data
@classmethod
def from_line(cls, line):
"""Deserialises a line from the wire into this command. `line` does not
include the command.
"""
return cls(line)
def to_line(self):
"""Serialises the comamnd for the wire. Does not include the command
prefix.
"""
return self.data
class ServerCommand(Command):
"""Sent by the server on new connection and includes the server_name.
Format::
SERVER <server_name>
"""
NAME = "SERVER"
class RdataCommand(Command):
"""Sent by server when a subscribed stream has an update.
Format::
RDATA <stream_name> <token> <row_json>
The `<token>` may either be a numeric stream id OR "batch". The latter case
is used to support sending multiple updates with the same stream ID. This
is done by sending an RDATA for each row, with all but the last RDATA having
a token of "batch" and the last having the final stream ID.
The client should batch all incoming RDATA with a token of "batch" (per
stream_name) until it sees an RDATA with a numeric stream ID.
`<token>` of "batch" maps to the instance variable `token` being None.
An example of a batched series of RDATA::
RDATA presence batch ["@foo:example.com", "online", ...]
RDATA presence batch ["@bar:example.com", "online", ...]
RDATA presence 59 ["@baz:example.com", "online", ...]
"""
NAME = "RDATA"
def __init__(self, stream_name, token, row):
self.stream_name = stream_name
self.token = token
self.row = row
@classmethod
def from_line(cls, line):
stream_name, token, row_json = line.split(" ", 2)
return cls(
stream_name,
None if token == "batch" else int(token),
json.loads(row_json)
)
def to_line(self):
return " ".join((
self.stream_name,
str(self.token) if self.token is not None else "batch",
json.dumps(self.row),
))
class PositionCommand(Command):
"""Sent by the client to tell the client the stream postition without
needing to send an RDATA.
"""
NAME = "POSITION"
def __init__(self, stream_name, token):
self.stream_name = stream_name
self.token = token
@classmethod
def from_line(cls, line):
stream_name, token = line.split(" ", 1)
return cls(stream_name, int(token))
def to_line(self):
return " ".join((self.stream_name, str(self.token),))
class ErrorCommand(Command):
"""Sent by either side if there was an ERROR. The data is a string describing
the error.
"""
NAME = "ERROR"
class PingCommand(Command):
"""Sent by either side as a keep alive. The data is arbitary (often timestamp)
"""
NAME = "PING"
class NameCommand(Command):
"""Sent by client to inform the server of the client's identity. The data
is the name
"""
NAME = "NAME"
class ReplicateCommand(Command):
"""Sent by the client to subscribe to the stream.
Format::
REPLICATE <stream_name> <token>
Where <token> may be either:
* a numeric stream_id to stream updates from
* "NOW" to stream all subsequent updates.
The <stream_name> can be "ALL" to subscribe to all known streams, in which
case the <token> must be set to "NOW", i.e.::
REPLICATE ALL NOW
"""
NAME = "REPLICATE"
def __init__(self, stream_name, token):
self.stream_name = stream_name
self.token = token
@classmethod
def from_line(cls, line):
stream_name, token = line.split(" ", 1)
if token in ("NOW", "now"):
token = "NOW"
else:
token = int(token)
return cls(stream_name, token)
def to_line(self):
return " ".join((self.stream_name, str(self.token),))
class UserSyncCommand(Command):
"""Sent by the client to inform the server that a user has started or
stopped syncing. Used to calculate presence on the master.
Format::
USER_SYNC <user_id> <state>
Where <state> is either "start" or "stop"
"""
NAME = "USER_SYNC"
def __init__(self, user_id, is_syncing):
self.user_id = user_id
self.is_syncing = is_syncing
@classmethod
def from_line(cls, line):
user_id, state = line.split(" ", 1)
if state not in ("start", "end"):
raise Exception("Invalid USER_SYNC state %r" % (state,))
return cls(user_id, state == "start")
def to_line(self):
return " ".join((self.user_id, "start" if self.is_syncing else "end"))
class FederationAckCommand(Command):
"""Sent by the client when it has processed up to a given point in the
federation stream. This allows the master to drop in-memory caches of the
federation stream.
This must only be sent from one worker (i.e. the one sending federation)
Format::
FEDERATION_ACK <token>
"""
NAME = "FEDERATION_ACK"
def __init__(self, token):
self.token = token
@classmethod
def from_line(cls, line):
return cls(int(line))
def to_line(self):
return str(self.token)
class SyncCommand(Command):
"""Used for testing. The client protocol implementation allows waiting
on a SYNC command with a specified data.
"""
NAME = "SYNC"
class RemovePusherCommand(Command):
"""Sent by the client to request the master remove the given pusher.
Format::
REMOVE_PUSHER <app_id> <push_key> <user_id>
"""
NAME = "REMOVE_PUSHER"
def __init__(self, app_id, push_key, user_id):
self.user_id = user_id
self.app_id = app_id
self.push_key = push_key
@classmethod
def from_line(cls, line):
app_id, push_key, user_id = line.split(" ", 2)
return cls(app_id, push_key, user_id)
def to_line(self):
return " ".join((self.app_id, self.push_key, self.user_id))
class InvalidateCacheCommand(Command):
"""Sent by the client to invalidate an upstream cache.
THIS IS NOT RELIABLE, AND SHOULD *NOT* BE USED ACCEPT FOR THINGS THAT ARE
NOT DISASTROUS IF WE DROP ON THE FLOOR.
Mainly used to invalidate destination retry timing caches.
Format::
INVALIDATE_CACHE <cache_func> <keys_json>
Where <keys_json> is a json list.
"""
NAME = "INVALIDATE_CACHE"
def __init__(self, cache_func, keys):
self.cache_func = cache_func
self.keys = keys
@classmethod
def from_line(cls, line):
cache_func, keys_json = line.split(" ", 1)
return cls(cache_func, json.loads(keys_json))
def to_line(self):
return " ".join((self.cache_func, json.dumps(self.keys)))
# Map of command name to command type.
COMMAND_MAP = {
cmd.NAME: cmd
for cmd in (
ServerCommand,
RdataCommand,
PositionCommand,
ErrorCommand,
PingCommand,
NameCommand,
ReplicateCommand,
UserSyncCommand,
FederationAckCommand,
SyncCommand,
RemovePusherCommand,
InvalidateCacheCommand,
)
}
# The commands the server is allowed to send
VALID_SERVER_COMMANDS = (
ServerCommand.NAME,
RdataCommand.NAME,
PositionCommand.NAME,
ErrorCommand.NAME,
PingCommand.NAME,
SyncCommand.NAME,
)
# The commands the client is allowed to send
VALID_CLIENT_COMMANDS = (
NameCommand.NAME,
ReplicateCommand.NAME,
PingCommand.NAME,
UserSyncCommand.NAME,
FederationAckCommand.NAME,
RemovePusherCommand.NAME,
InvalidateCacheCommand.NAME,
ErrorCommand.NAME,
)

View file

@ -0,0 +1,601 @@
# -*- coding: utf-8 -*-
# Copyright 2017 Vector Creations Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""This module contains the implementation of both the client and server
protocols.
The basic structure of the protocol is line based, where the initial word of
each line specifies the command. The rest of the line is parsed based on the
command. For example, the `RDATA` command is defined as::
RDATA <stream_name> <token> <row_json>
(Note that `<row_json>` may contains spaces, but cannot contain newlines.)
Blank lines are ignored.
# Example
An example iteraction is shown below. Each line is prefixed with '>' or '<' to
indicate which side is sending, these are *not* included on the wire::
* connection established *
> SERVER localhost:8823
> PING 1490197665618
< NAME synapse.app.appservice
< PING 1490197665618
< REPLICATE events 1
< REPLICATE backfill 1
< REPLICATE caches 1
> POSITION events 1
> POSITION backfill 1
> POSITION caches 1
> RDATA caches 2 ["get_user_by_id",["@01register-user:localhost:8823"],1490197670513]
> RDATA events 14 ["$149019767112vOHxz:localhost:8823",
"!AFDCvgApUmpdfVjIXm:localhost:8823","m.room.guest_access","",null]
< PING 1490197675618
> ERROR server stopping
* connection closed by server *
"""
from twisted.internet import defer
from twisted.protocols.basic import LineOnlyReceiver
from commands import (
COMMAND_MAP, VALID_CLIENT_COMMANDS, VALID_SERVER_COMMANDS,
ErrorCommand, ServerCommand, RdataCommand, PositionCommand, PingCommand,
NameCommand, ReplicateCommand, UserSyncCommand, SyncCommand,
)
from streams import STREAMS_MAP
from synapse.util.stringutils import random_string
import logging
import synapse.metrics
import struct
import fcntl
metrics = synapse.metrics.get_metrics_for(__name__)
inbound_commands_counter = metrics.register_counter(
"inbound_commands", labels=["command", "name", "conn_id"],
)
outbound_commands_counter = metrics.register_counter(
"outbound_commands", labels=["command", "name", "conn_id"],
)
# A list of all connected protocols. This allows us to send metrics about the
# connections.
connected_connections = []
logger = logging.getLogger(__name__)
PING_TIME = 5000
class ConnectionStates(object):
CONNECTING = "connecting"
ESTABLISHED = "established"
PAUSED = "paused"
CLOSED = "closed"
class BaseReplicationStreamProtocol(LineOnlyReceiver):
"""Base replication protocol shared between client and server.
Reads lines (ignoring blank ones) and parses them into command classes,
asserting that they are valid for the given direction, i.e. server commands
are only sent by the server.
On receiving a new command it calls `on_<COMMAND_NAME>` with the parsed
command.
It also sends `PING` periodically, and correctly times out remote connections
(if they send a `PING` command)
"""
delimiter = b'\n'
VALID_INBOUND_COMMANDS = [] # Valid commands we expect to receive
VALID_OUTBOUND_COMMANDS = [] # Valid commans we can send
max_line_buffer = 10000
def __init__(self, clock):
self.clock = clock
self.last_received_command = self.clock.time_msec()
self.last_sent_command = 0
self.time_we_closed = None # When we requested the connection be closed
self.received_ping = False # Have we reecived a ping from the other side
self.state = ConnectionStates.CONNECTING
self.name = "anon" # The name sent by a client.
self.conn_id = random_string(5) # To dedupe in case of name clashes.
# List of pending commands to send once we've established the connection
self.pending_commands = []
# The LoopingCall for sending pings.
self._send_ping_loop = None
def connectionMade(self):
logger.info("[%s] Connection established", self.id())
self.state = ConnectionStates.ESTABLISHED
connected_connections.append(self) # Register connection for metrics
self.transport.registerProducer(self, True) # For the *Producing callbacks
self._send_pending_commands()
# Starts sending pings
self._send_ping_loop = self.clock.looping_call(self.send_ping, 5000)
# Always send the initial PING so that the other side knows that they
# can time us out.
self.send_command(PingCommand(self.clock.time_msec()))
def send_ping(self):
"""Periodically sends a ping and checks if we should close the connection
due to the other side timing out.
"""
now = self.clock.time_msec()
if self.time_we_closed:
if now - self.time_we_closed > PING_TIME * 3:
logger.info(
"[%s] Failed to close connection gracefully, aborting", self.id()
)
self.transport.abortConnection()
else:
if now - self.last_sent_command >= PING_TIME:
self.send_command(PingCommand(now))
if self.received_ping and now - self.last_received_command > PING_TIME * 3:
logger.info(
"[%s] Connection hasn't received command in %r ms. Closing.",
self.id(), now - self.last_received_command
)
self.send_error("ping timeout")
def lineReceived(self, line):
"""Called when we've received a line
"""
if line.strip() == "":
# Ignore blank lines
return
line = line.decode("utf-8")
cmd_name, rest_of_line = line.split(" ", 1)
if cmd_name not in self.VALID_INBOUND_COMMANDS:
logger.error("[%s] invalid command %s", self.id(), cmd_name)
self.send_error("invalid command: %s", cmd_name)
return
self.last_received_command = self.clock.time_msec()
inbound_commands_counter.inc(cmd_name, self.name, self.conn_id)
cmd_cls = COMMAND_MAP[cmd_name]
try:
cmd = cmd_cls.from_line(rest_of_line)
except Exception as e:
logger.exception(
"[%s] failed to parse line %r: %r", self.id(), cmd_name, rest_of_line
)
self.send_error(
"failed to parse line for %r: %r (%r):" % (cmd_name, e, rest_of_line)
)
return
# Now lets try and call on_<CMD_NAME> function
try:
getattr(self, "on_%s" % (cmd_name,))(cmd)
except Exception:
logger.exception("[%s] Failed to handle line: %r", self.id(), line)
def close(self):
self.time_we_closed = self.clock.time_msec()
self.transport.loseConnection()
self.on_connection_closed()
def send_error(self, error_string, *args):
"""Send an error to remote and close the connection.
"""
self.send_command(ErrorCommand(error_string % args))
self.close()
def send_command(self, cmd, do_buffer=True):
"""Send a command if connection has been established.
Args:
cmd (Command)
do_buffer (bool): Whether to buffer the message or always attempt
to send the command. This is mostly used to send an error
message if we're about to close the connection due our buffers
becoming full.
"""
if self.state == ConnectionStates.CLOSED:
logger.info("[%s] Not sending, connection closed", self.id())
return
if do_buffer and self.state != ConnectionStates.ESTABLISHED:
self._queue_command(cmd)
return
outbound_commands_counter.inc(cmd.NAME, self.name, self.conn_id)
string = "%s %s" % (cmd.NAME, cmd.to_line(),)
if "\n" in string:
raise Exception("Unexpected newline in command: %r", string)
self.sendLine(string.encode("utf-8"))
self.last_sent_command = self.clock.time_msec()
def _queue_command(self, cmd):
"""Queue the command until the connection is ready to write to again.
"""
logger.info("[%s] Queing as conn %r, cmd: %r", self.id(), self.state, cmd)
self.pending_commands.append(cmd)
if len(self.pending_commands) > self.max_line_buffer:
# The other side is failing to keep up and out buffers are becoming
# full, so lets close the connection.
# XXX: should we squawk more loudly?
logger.error("[%s] Remote failed to keep up", self.id())
self.send_command(ErrorCommand("Failed to keep up"), do_buffer=False)
self.close()
def _send_pending_commands(self):
"""Send any queued commandes
"""
pending = self.pending_commands
self.pending_commands = []
for cmd in pending:
self.send_command(cmd)
def on_PING(self, line):
self.received_ping = True
def on_ERROR(self, cmd):
logger.error("[%s] Remote reported error: %r", self.id(), cmd.data)
def pauseProducing(self):
"""This is called when both the kernel send buffer and the twisted
tcp connection send buffers have become full.
We don't actually have any control over those sizes, so we buffer some
commands ourselves before knifing the connection due to the remote
failing to keep up.
"""
logger.info("[%s] Pause producing", self.id())
self.state = ConnectionStates.PAUSED
def resumeProducing(self):
"""The remote has caught up after we started buffering!
"""
logger.info("[%s] Resume producing", self.id())
self.state = ConnectionStates.ESTABLISHED
self._send_pending_commands()
def stopProducing(self):
"""We're never going to send any more data (normally because either
we or the remote has closed the connection)
"""
logger.info("[%s] Stop producing", self.id())
self.on_connection_closed()
def connectionLost(self, reason):
logger.info("[%s] Replication connection closed: %r", self.id(), reason)
try:
# Remove us from list of connections to be monitored
connected_connections.remove(self)
except ValueError:
pass
# Stop the looping call sending pings.
if self._send_ping_loop and self._send_ping_loop.running:
self._send_ping_loop.stop()
self.on_connection_closed()
def on_connection_closed(self):
logger.info("[%s] Connection was closed", self.id())
self.state = ConnectionStates.CLOSED
self.pending_commands = []
if self.transport:
self.transport.unregisterProducer()
def __str__(self):
return "ReplicationConnection<name=%s,conn_id=%s,addr=%s>" % (
self.name, self.conn_id, self.addr,
)
def id(self):
return "%s-%s" % (self.name, self.conn_id)
class ServerReplicationStreamProtocol(BaseReplicationStreamProtocol):
VALID_INBOUND_COMMANDS = VALID_CLIENT_COMMANDS
VALID_OUTBOUND_COMMANDS = VALID_SERVER_COMMANDS
def __init__(self, server_name, clock, streamer, addr):
BaseReplicationStreamProtocol.__init__(self, clock) # Old style class
self.server_name = server_name
self.streamer = streamer
self.addr = addr
# The streams the client has subscribed to and is up to date with
self.replication_streams = set()
# The streams the client is currently subscribing to.
self.connecting_streams = set()
# Map from stream name to list of updates to send once we've finished
# subscribing the client to the stream.
self.pending_rdata = {}
def connectionMade(self):
self.send_command(ServerCommand(self.server_name))
BaseReplicationStreamProtocol.connectionMade(self)
self.streamer.new_connection(self)
def on_NAME(self, cmd):
self.name = cmd.data
def on_USER_SYNC(self, cmd):
self.streamer.on_user_sync(self.conn_id, cmd.user_id, cmd.is_syncing)
def on_REPLICATE(self, cmd):
stream_name = cmd.stream_name
token = cmd.token
if stream_name == "ALL":
# Subscribe to all streams we're publishing to.
for stream in self.streamer.streams_by_name.iterkeys():
self.subscribe_to_stream(stream, token)
else:
self.subscribe_to_stream(stream_name, token)
def on_FEDERATION_ACK(self, cmd):
self.streamer.federation_ack(cmd.token)
def on_REMOVE_PUSHER(self, cmd):
self.streamer.on_remove_pusher(cmd.app_id, cmd.push_key, cmd.user_id)
def onINVALIDATE_CACHE(self, cmd):
self.streamer.on_invalidate_cache(cmd.cache_func, cmd.keys)
@defer.inlineCallbacks
def subscribe_to_stream(self, stream_name, token):
"""Subscribe the remote to a streams.
This invloves checking if they've missed anything and sending those
updates down if they have. During that time new updates for the stream
are queued and sent once we've sent down any missed updates.
"""
self.replication_streams.discard(stream_name)
self.connecting_streams.add(stream_name)
try:
# Get missing updates
updates, current_token = yield self.streamer.get_stream_updates(
stream_name, token,
)
# Send all the missing updates
for update in updates:
token, row = update[0], update[1]
self.send_command(RdataCommand(stream_name, token, row))
# Now we can send any updates that came in while we were subscribing
pending_rdata = self.pending_rdata.pop(stream_name, [])
for token, update in pending_rdata:
self.send_command(RdataCommand(stream_name, token, update))
# We send a POSITION command to ensure that they have an up to
# date token (especially useful if we didn't send any updates
# above)
self.send_command(PositionCommand(stream_name, current_token))
# They're now fully subscribed
self.replication_streams.add(stream_name)
except Exception as e:
logger.exception("[%s] Failed to handle REPLICATE command", self.id())
self.send_error("failed to handle replicate: %r", e)
finally:
self.connecting_streams.discard(stream_name)
def stream_update(self, stream_name, token, data):
"""Called when a new update is available to stream to clients.
We need to check if the client is interested in the stream or not
"""
if stream_name in self.replication_streams:
# The client is subscribed to the stream
self.send_command(RdataCommand(stream_name, token, data))
elif stream_name in self.connecting_streams:
# The client is being subscribed to the stream
logger.info("[%s] Queuing RDATA %r %r", self.id(), stream_name, token)
self.pending_rdata.setdefault(stream_name, []).append((token, data))
else:
# The client isn't subscribed
logger.debug("[%s] Dropping RDATA %r %r", self.id(), stream_name, token)
def send_sync(self, data):
self.send_command(SyncCommand(data))
def on_connection_closed(self):
BaseReplicationStreamProtocol.on_connection_closed(self)
logger.info("[%s] Replication connection closed", self.id())
self.streamer.lost_connection(self)
class ClientReplicationStreamProtocol(BaseReplicationStreamProtocol):
VALID_INBOUND_COMMANDS = VALID_SERVER_COMMANDS
VALID_OUTBOUND_COMMANDS = VALID_CLIENT_COMMANDS
def __init__(self, client_name, server_name, clock, handler):
BaseReplicationStreamProtocol.__init__(self, clock)
self.client_name = client_name
self.server_name = server_name
self.handler = handler
# Map of stream to batched updates. See RdataCommand for info on how
# batching works.
self.pending_batches = {}
def connectionMade(self):
self.send_command(NameCommand(self.client_name))
BaseReplicationStreamProtocol.connectionMade(self)
# Once we've connected subscribe to the necessary streams
for stream_name, token in self.handler.get_streams_to_replicate().iteritems():
self.replicate(stream_name, token)
# Tell the server if we have any users currently syncing (should only
# happen on synchrotrons)
currently_syncing = self.handler.get_currently_syncing_users()
for user_id in currently_syncing:
self.send_command(UserSyncCommand(user_id, True))
# We've now finished connecting to so inform the client handler
self.handler.update_connection(self)
def on_SERVER(self, cmd):
if cmd.data != self.server_name:
logger.error("[%s] Connected to wrong remote: %r", self.id(), cmd.data)
self.transport.abortConnection()
def on_RDATA(self, cmd):
try:
row = STREAMS_MAP[cmd.stream_name].ROW_TYPE(*cmd.row)
except Exception:
logger.exception(
"[%s] Failed to parse RDATA: %r %r",
self.id(), cmd.stream_name, cmd.row
)
raise
if cmd.token is None:
# I.e. this is part of a batch of updates for this stream. Batch
# until we get an update for the stream with a non None token
self.pending_batches.setdefault(cmd.stream_name, []).append(row)
else:
# Check if this is the last of a batch of updates
rows = self.pending_batches.pop(cmd.stream_name, [])
rows.append(row)
self.handler.on_rdata(cmd.stream_name, cmd.token, rows)
def on_POSITION(self, cmd):
self.handler.on_position(cmd.stream_name, cmd.token)
def on_SYNC(self, cmd):
self.handler.on_sync(cmd.data)
def replicate(self, stream_name, token):
"""Send the subscription request to the server
"""
if stream_name not in STREAMS_MAP:
raise Exception("Invalid stream name %r" % (stream_name,))
logger.info(
"[%s] Subscribing to replication stream: %r from %r",
self.id(), stream_name, token
)
self.send_command(ReplicateCommand(stream_name, token))
def on_connection_closed(self):
BaseReplicationStreamProtocol.on_connection_closed(self)
self.handler.update_connection(None)
# The following simply registers metrics for the replication connections
metrics.register_callback(
"pending_commands",
lambda: {
(p.name, p.conn_id): len(p.pending_commands)
for p in connected_connections
},
labels=["name", "conn_id"],
)
def transport_buffer_size(protocol):
if protocol.transport:
size = len(protocol.transport.dataBuffer) + protocol.transport._tempDataLen
return size
return 0
metrics.register_callback(
"transport_send_buffer",
lambda: {
(p.name, p.conn_id): transport_buffer_size(p)
for p in connected_connections
},
labels=["name", "conn_id"],
)
def transport_kernel_read_buffer_size(protocol, read=True):
SIOCINQ = 0x541B
SIOCOUTQ = 0x5411
if protocol.transport:
fileno = protocol.transport.getHandle().fileno()
if read:
op = SIOCINQ
else:
op = SIOCOUTQ
size = struct.unpack("I", fcntl.ioctl(fileno, op, '\0\0\0\0'))[0]
return size
return 0
metrics.register_callback(
"transport_kernel_send_buffer",
lambda: {
(p.name, p.conn_id): transport_kernel_read_buffer_size(p, False)
for p in connected_connections
},
labels=["name", "conn_id"],
)
metrics.register_callback(
"transport_kernel_read_buffer",
lambda: {
(p.name, p.conn_id): transport_kernel_read_buffer_size(p, True)
for p in connected_connections
},
labels=["name", "conn_id"],
)