Add redis support

This commit is contained in:
Erik Johnston 2020-03-23 11:32:02 +00:00
parent 11fb08ffa9
commit 55dfcd2f09
7 changed files with 234 additions and 4 deletions

View file

@ -75,3 +75,6 @@ ignore_missing_imports = True
[mypy-jwt.*] [mypy-jwt.*]
ignore_missing_imports = True ignore_missing_imports = True
[mypy-txredisapi]
ignore_missing_imports = True

View file

@ -599,10 +599,19 @@ class GenericWorkerServer(HomeServer):
else: else:
logger.warning("Unrecognized listener type: %s", listener["type"]) logger.warning("Unrecognized listener type: %s", listener["type"])
factory = ReplicationClientFactory(self, self.config.worker_name) if self.config.redis.redis_enabled:
host = self.config.worker_replication_host from synapse.replication.tcp.redis import RedisFactory
port = self.config.worker_replication_port
self.get_reactor().connectTCP(host, port, factory) logger.info("Connecting to redis.")
factory = RedisFactory(self)
self.get_reactor().connectTCP(
self.config.redis.redis_host, self.config.redis.redis_port, factory
)
else:
factory = ReplicationClientFactory(self, self.config.worker_name)
host = self.config.worker_replication_host
port = self.config.worker_replication_port
self.get_reactor().connectTCP(host, port, factory)
def remove_pusher(self, app_id, push_key, user_id): def remove_pusher(self, app_id, push_key, user_id):
self.get_tcp_replication().send_remove_pusher(app_id, push_key, user_id) self.get_tcp_replication().send_remove_pusher(app_id, push_key, user_id)

View file

@ -263,6 +263,15 @@ class SynapseHomeServer(HomeServer):
def start_listening(self, listeners): def start_listening(self, listeners):
config = self.get_config() config = self.get_config()
if config.redis_enabled:
from synapse.replication.tcp.redis import RedisFactory
logger.info("Connecting to redis.")
factory = RedisFactory(self)
self.get_reactor().connectTCP(
self.config.redis.redis_host, self.config.redis.redis_port, factory
)
for listener in listeners: for listener in listeners:
if listener["type"] == "http": if listener["type"] == "http":
self._listening_services.extend(self._listener_http(config, listener)) self._listening_services.extend(self._listener_http(config, listener))
@ -282,6 +291,7 @@ class SynapseHomeServer(HomeServer):
) )
for s in services: for s in services:
reactor.addSystemEventTrigger("before", "shutdown", s.stopListening) reactor.addSystemEventTrigger("before", "shutdown", s.stopListening)
elif listener["type"] == "metrics": elif listener["type"] == "metrics":
if not self.get_config().enable_metrics: if not self.get_config().enable_metrics:
logger.warning( logger.warning(

View file

@ -31,6 +31,7 @@ from .password import PasswordConfig
from .password_auth_providers import PasswordAuthProviderConfig from .password_auth_providers import PasswordAuthProviderConfig
from .push import PushConfig from .push import PushConfig
from .ratelimiting import RatelimitConfig from .ratelimiting import RatelimitConfig
from .redis import RedisConfig
from .registration import RegistrationConfig from .registration import RegistrationConfig
from .repository import ContentRepositoryConfig from .repository import ContentRepositoryConfig
from .room_directory import RoomDirectoryConfig from .room_directory import RoomDirectoryConfig
@ -82,4 +83,5 @@ class HomeServerConfig(RootConfig):
RoomDirectoryConfig, RoomDirectoryConfig,
ThirdPartyRulesConfig, ThirdPartyRulesConfig,
TracerConfig, TracerConfig,
RedisConfig,
] ]

47
synapse/config/redis.py Normal file
View file

@ -0,0 +1,47 @@
# -*- coding: utf-8 -*-
# Copyright 2020 The Matrix.org Foundation C.I.C.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from synapse.config._base import Config, ConfigError
try:
import txredisapi
except ImportError:
txredisapi = None
MISSING_REDIS = """Missing 'txredisapi' library. This is required for redis support.
Install by running:
pip install txredisapi
"""
class RedisConfig(Config):
section = "redis"
def read_config(self, config, **kwargs):
redis_config = config.get("redis", {})
self.redis_enabled = redis_config.get("enabled", False)
if not self.redis_enabled:
return
if txredisapi is None:
raise ConfigError(MISSING_REDIS)
self.redis_host = redis_config.get("host", "localhost")
self.redis_port = redis_config.get("port", 6379)
self.redis_dbid = redis_config.get("dbid")
self.redis_password = redis_config.get("password")

View file

@ -98,6 +98,7 @@ CONDITIONAL_REQUIREMENTS = {
"sentry": ["sentry-sdk>=0.7.2"], "sentry": ["sentry-sdk>=0.7.2"],
"opentracing": ["jaeger-client>=4.0.0", "opentracing>=2.2.0"], "opentracing": ["jaeger-client>=4.0.0", "opentracing>=2.2.0"],
"jwt": ["pyjwt>=1.6.4"], "jwt": ["pyjwt>=1.6.4"],
"redis": ["txredisapi>=1.4.7"],
} }
ALL_OPTIONAL_REQUIREMENTS = set() # type: Set[str] ALL_OPTIONAL_REQUIREMENTS = set() # type: Set[str]

View file

@ -0,0 +1,158 @@
# -*- coding: utf-8 -*-
# Copyright 2020 The Matrix.org Foundation C.I.C.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
import txredisapi
from synapse.logging.context import PreserveLoggingContext
from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.replication.tcp.commands import (
COMMAND_MAP,
Command,
RdataCommand,
ReplicateCommand,
)
from synapse.util.stringutils import random_string
logger = logging.getLogger(__name__)
class RedisSubscriber(txredisapi.SubscriberProtocol):
"""Connection to redis subscribed to replication stream.
"""
def connectionMade(self):
logger.info("Connected to redis instance")
self.subscribe(self.stream_name)
self.send_command(ReplicateCommand())
self.handler.new_connection(self)
def messageReceived(self, pattern: str, channel: str, message: str):
"""Received a message from redis.
"""
if message.strip() == "":
# Ignore blank lines
return
line = message
cmd_name, rest_of_line = line.split(" ", 1)
cmd_cls = COMMAND_MAP[cmd_name]
try:
cmd = cmd_cls.from_line(rest_of_line)
except Exception as e:
logger.exception(
"[%s] failed to parse line %r: %r", self.id(), cmd_name, rest_of_line
)
self.send_error(
"failed to parse line for %r: %r (%r):" % (cmd_name, e, rest_of_line)
)
return
# Now lets try and call on_<CMD_NAME> function
run_as_background_process(
"replication-" + cmd.get_logcontext_id(), self.handle_command, cmd
)
async def handle_command(self, cmd: Command):
"""Handle a command we have received over the replication stream.
By default delegates to on_<COMMAND>, which should return an awaitable.
Args:
cmd: received command
"""
handled = False
# First call any command handlers on this instance. These are for redis
# specific handling.
cmd_func = getattr(self, "on_%s" % (cmd.NAME,), None)
if cmd_func:
await cmd_func(cmd)
handled = True
# Then call out to the handler.
cmd_func = getattr(self.handler, "on_%s" % (cmd.NAME,), None)
if cmd_func:
await cmd_func(cmd)
handled = True
if not handled:
logger.warning("Unhandled command: %r", cmd)
def connectionLost(self, reason):
logger.info("Lost connection to redis instance")
self.handler.lost_connection(self)
def send_command(self, cmd):
"""Send a command if connection has been established.
Args:
cmd (Command)
"""
string = "%s %s" % (cmd.NAME, cmd.to_line())
if "\n" in string:
raise Exception("Unexpected newline in command: %r", string)
encoded_string = string.encode("utf-8")
async def _send():
with PreserveLoggingContext():
await self.redis_connection.publish(self.stream_name, encoded_string)
run_as_background_process("send-cmd", _send)
def stream_update(self, stream_name, token, data):
"""Called when a new update is available to stream to clients.
We need to check if the client is interested in the stream or not
"""
self.send_command(RdataCommand(stream_name, token, data))
class RedisFactory(txredisapi.SubscriberFactory):
maxDelay = 5
continueTrying = True
protocol = RedisSubscriber
def __init__(self, hs):
super(RedisFactory, self).__init__()
self.password = hs.config.redis.redis_password
self.handler = hs.get_tcp_replication()
self.stream_name = hs.hostname
self.redis_connection = txredisapi.lazyConnection(
host=hs.config.redis_host,
port=hs.config.redis_port,
dbid=hs.config.redis_dbid,
password=hs.config.redis.redis_password,
reconnect=True,
)
self.conn_id = random_string(5)
def buildProtocol(self, addr):
p = super(RedisFactory, self).buildProtocol(addr)
p.handler = self.handler
p.redis_connection = self.redis_connection
p.conn_id = self.conn_id
p.stream_name = self.stream_name
return p