mirror of
https://github.com/element-hq/synapse.git
synced 2024-11-29 15:39:00 +03:00
Make workers report to master for user ip updates
This commit is contained in:
parent
a0a561ae85
commit
78cefd78d6
9 changed files with 111 additions and 8 deletions
|
@ -24,6 +24,7 @@ from synapse.http.server import JsonResource
|
||||||
from synapse.metrics.resource import MetricsResource, METRICS_PREFIX
|
from synapse.metrics.resource import MetricsResource, METRICS_PREFIX
|
||||||
from synapse.replication.slave.storage._base import BaseSlavedStore
|
from synapse.replication.slave.storage._base import BaseSlavedStore
|
||||||
from synapse.replication.slave.storage.appservice import SlavedApplicationServiceStore
|
from synapse.replication.slave.storage.appservice import SlavedApplicationServiceStore
|
||||||
|
from synapse.replication.slave.storage.client_ips import SlavedClientIpStore
|
||||||
from synapse.replication.slave.storage.events import SlavedEventStore
|
from synapse.replication.slave.storage.events import SlavedEventStore
|
||||||
from synapse.replication.slave.storage.keys import SlavedKeyStore
|
from synapse.replication.slave.storage.keys import SlavedKeyStore
|
||||||
from synapse.replication.slave.storage.room import RoomStore
|
from synapse.replication.slave.storage.room import RoomStore
|
||||||
|
@ -33,7 +34,6 @@ from synapse.replication.slave.storage.transactions import TransactionStore
|
||||||
from synapse.replication.tcp.client import ReplicationClientHandler
|
from synapse.replication.tcp.client import ReplicationClientHandler
|
||||||
from synapse.rest.client.v1.room import PublicRoomListRestServlet
|
from synapse.rest.client.v1.room import PublicRoomListRestServlet
|
||||||
from synapse.server import HomeServer
|
from synapse.server import HomeServer
|
||||||
from synapse.storage.client_ips import ClientIpStore
|
|
||||||
from synapse.storage.engines import create_engine
|
from synapse.storage.engines import create_engine
|
||||||
from synapse.util.httpresourcetree import create_resource_tree
|
from synapse.util.httpresourcetree import create_resource_tree
|
||||||
from synapse.util.logcontext import LoggingContext, PreserveLoggingContext
|
from synapse.util.logcontext import LoggingContext, PreserveLoggingContext
|
||||||
|
@ -65,8 +65,8 @@ class ClientReaderSlavedStore(
|
||||||
SlavedApplicationServiceStore,
|
SlavedApplicationServiceStore,
|
||||||
SlavedRegistrationStore,
|
SlavedRegistrationStore,
|
||||||
TransactionStore,
|
TransactionStore,
|
||||||
|
SlavedClientIpStore,
|
||||||
BaseSlavedStore,
|
BaseSlavedStore,
|
||||||
ClientIpStore, # After BaseSlavedStore because the constructor is different
|
|
||||||
):
|
):
|
||||||
pass
|
pass
|
||||||
|
|
||||||
|
|
|
@ -23,13 +23,13 @@ from synapse.http.site import SynapseSite
|
||||||
from synapse.metrics.resource import MetricsResource, METRICS_PREFIX
|
from synapse.metrics.resource import MetricsResource, METRICS_PREFIX
|
||||||
from synapse.replication.slave.storage._base import BaseSlavedStore
|
from synapse.replication.slave.storage._base import BaseSlavedStore
|
||||||
from synapse.replication.slave.storage.appservice import SlavedApplicationServiceStore
|
from synapse.replication.slave.storage.appservice import SlavedApplicationServiceStore
|
||||||
|
from synapse.replication.slave.storage.client_ips import SlavedClientIpStore
|
||||||
from synapse.replication.slave.storage.registration import SlavedRegistrationStore
|
from synapse.replication.slave.storage.registration import SlavedRegistrationStore
|
||||||
from synapse.replication.slave.storage.transactions import TransactionStore
|
from synapse.replication.slave.storage.transactions import TransactionStore
|
||||||
from synapse.replication.tcp.client import ReplicationClientHandler
|
from synapse.replication.tcp.client import ReplicationClientHandler
|
||||||
from synapse.rest.media.v0.content_repository import ContentRepoResource
|
from synapse.rest.media.v0.content_repository import ContentRepoResource
|
||||||
from synapse.rest.media.v1.media_repository import MediaRepositoryResource
|
from synapse.rest.media.v1.media_repository import MediaRepositoryResource
|
||||||
from synapse.server import HomeServer
|
from synapse.server import HomeServer
|
||||||
from synapse.storage.client_ips import ClientIpStore
|
|
||||||
from synapse.storage.engines import create_engine
|
from synapse.storage.engines import create_engine
|
||||||
from synapse.storage.media_repository import MediaRepositoryStore
|
from synapse.storage.media_repository import MediaRepositoryStore
|
||||||
from synapse.util.httpresourcetree import create_resource_tree
|
from synapse.util.httpresourcetree import create_resource_tree
|
||||||
|
@ -60,10 +60,10 @@ logger = logging.getLogger("synapse.app.media_repository")
|
||||||
class MediaRepositorySlavedStore(
|
class MediaRepositorySlavedStore(
|
||||||
SlavedApplicationServiceStore,
|
SlavedApplicationServiceStore,
|
||||||
SlavedRegistrationStore,
|
SlavedRegistrationStore,
|
||||||
|
SlavedClientIpStore,
|
||||||
TransactionStore,
|
TransactionStore,
|
||||||
BaseSlavedStore,
|
BaseSlavedStore,
|
||||||
MediaRepositoryStore,
|
MediaRepositoryStore,
|
||||||
ClientIpStore,
|
|
||||||
):
|
):
|
||||||
pass
|
pass
|
||||||
|
|
||||||
|
|
|
@ -29,6 +29,7 @@ from synapse.rest.client.v1 import events
|
||||||
from synapse.rest.client.v1.room import RoomInitialSyncRestServlet
|
from synapse.rest.client.v1.room import RoomInitialSyncRestServlet
|
||||||
from synapse.rest.client.v1.initial_sync import InitialSyncRestServlet
|
from synapse.rest.client.v1.initial_sync import InitialSyncRestServlet
|
||||||
from synapse.replication.slave.storage._base import BaseSlavedStore
|
from synapse.replication.slave.storage._base import BaseSlavedStore
|
||||||
|
from synapse.replication.slave.storage.client_ips import SlavedClientIpStore
|
||||||
from synapse.replication.slave.storage.events import SlavedEventStore
|
from synapse.replication.slave.storage.events import SlavedEventStore
|
||||||
from synapse.replication.slave.storage.receipts import SlavedReceiptsStore
|
from synapse.replication.slave.storage.receipts import SlavedReceiptsStore
|
||||||
from synapse.replication.slave.storage.account_data import SlavedAccountDataStore
|
from synapse.replication.slave.storage.account_data import SlavedAccountDataStore
|
||||||
|
@ -42,7 +43,6 @@ from synapse.replication.slave.storage.devices import SlavedDeviceStore
|
||||||
from synapse.replication.slave.storage.room import RoomStore
|
from synapse.replication.slave.storage.room import RoomStore
|
||||||
from synapse.replication.tcp.client import ReplicationClientHandler
|
from synapse.replication.tcp.client import ReplicationClientHandler
|
||||||
from synapse.server import HomeServer
|
from synapse.server import HomeServer
|
||||||
from synapse.storage.client_ips import ClientIpStore
|
|
||||||
from synapse.storage.engines import create_engine
|
from synapse.storage.engines import create_engine
|
||||||
from synapse.storage.presence import UserPresenceState
|
from synapse.storage.presence import UserPresenceState
|
||||||
from synapse.storage.roommember import RoomMemberStore
|
from synapse.storage.roommember import RoomMemberStore
|
||||||
|
@ -77,9 +77,9 @@ class SynchrotronSlavedStore(
|
||||||
SlavedPresenceStore,
|
SlavedPresenceStore,
|
||||||
SlavedDeviceInboxStore,
|
SlavedDeviceInboxStore,
|
||||||
SlavedDeviceStore,
|
SlavedDeviceStore,
|
||||||
|
SlavedClientIpStore,
|
||||||
RoomStore,
|
RoomStore,
|
||||||
BaseSlavedStore,
|
BaseSlavedStore,
|
||||||
ClientIpStore, # After BaseSlavedStore because the constructor is different
|
|
||||||
):
|
):
|
||||||
who_forgot_in_room = (
|
who_forgot_in_room = (
|
||||||
RoomMemberStore.__dict__["who_forgot_in_room"]
|
RoomMemberStore.__dict__["who_forgot_in_room"]
|
||||||
|
|
|
@ -26,12 +26,12 @@ from synapse.http.server import JsonResource
|
||||||
from synapse.metrics.resource import MetricsResource, METRICS_PREFIX
|
from synapse.metrics.resource import MetricsResource, METRICS_PREFIX
|
||||||
from synapse.replication.slave.storage._base import BaseSlavedStore
|
from synapse.replication.slave.storage._base import BaseSlavedStore
|
||||||
from synapse.replication.slave.storage.appservice import SlavedApplicationServiceStore
|
from synapse.replication.slave.storage.appservice import SlavedApplicationServiceStore
|
||||||
|
from synapse.replication.slave.storage.client_ips import SlavedClientIpStore
|
||||||
from synapse.replication.slave.storage.events import SlavedEventStore
|
from synapse.replication.slave.storage.events import SlavedEventStore
|
||||||
from synapse.replication.slave.storage.registration import SlavedRegistrationStore
|
from synapse.replication.slave.storage.registration import SlavedRegistrationStore
|
||||||
from synapse.replication.tcp.client import ReplicationClientHandler
|
from synapse.replication.tcp.client import ReplicationClientHandler
|
||||||
from synapse.rest.client.v2_alpha import user_directory
|
from synapse.rest.client.v2_alpha import user_directory
|
||||||
from synapse.storage.engines import create_engine
|
from synapse.storage.engines import create_engine
|
||||||
from synapse.storage.client_ips import ClientIpStore
|
|
||||||
from synapse.storage.user_directory import UserDirectoryStore
|
from synapse.storage.user_directory import UserDirectoryStore
|
||||||
from synapse.util.httpresourcetree import create_resource_tree
|
from synapse.util.httpresourcetree import create_resource_tree
|
||||||
from synapse.util.logcontext import LoggingContext, PreserveLoggingContext, preserve_fn
|
from synapse.util.logcontext import LoggingContext, PreserveLoggingContext, preserve_fn
|
||||||
|
@ -58,9 +58,9 @@ class UserDirectorySlaveStore(
|
||||||
SlavedEventStore,
|
SlavedEventStore,
|
||||||
SlavedApplicationServiceStore,
|
SlavedApplicationServiceStore,
|
||||||
SlavedRegistrationStore,
|
SlavedRegistrationStore,
|
||||||
|
SlavedClientIpStore,
|
||||||
UserDirectoryStore,
|
UserDirectoryStore,
|
||||||
BaseSlavedStore,
|
BaseSlavedStore,
|
||||||
ClientIpStore, # After BaseSlavedStore because the constructor is different
|
|
||||||
):
|
):
|
||||||
def __init__(self, db_conn, hs):
|
def __init__(self, db_conn, hs):
|
||||||
super(UserDirectorySlaveStore, self).__init__(db_conn, hs)
|
super(UserDirectorySlaveStore, self).__init__(db_conn, hs)
|
||||||
|
|
48
synapse/replication/slave/storage/client_ips.py
Normal file
48
synapse/replication/slave/storage/client_ips.py
Normal file
|
@ -0,0 +1,48 @@
|
||||||
|
# -*- coding: utf-8 -*-
|
||||||
|
# Copyright 2017 Vector Creations Ltd
|
||||||
|
#
|
||||||
|
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
# you may not use this file except in compliance with the License.
|
||||||
|
# You may obtain a copy of the License at
|
||||||
|
#
|
||||||
|
# http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
#
|
||||||
|
# Unless required by applicable law or agreed to in writing, software
|
||||||
|
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
# See the License for the specific language governing permissions and
|
||||||
|
# limitations under the License.
|
||||||
|
|
||||||
|
from ._base import BaseSlavedStore
|
||||||
|
from synapse.storage.client_ips import LAST_SEEN_GRANULARITY
|
||||||
|
from synapse.util.caches import CACHE_SIZE_FACTOR
|
||||||
|
from synapse.util.caches.descriptors import Cache
|
||||||
|
|
||||||
|
|
||||||
|
class SlavedClientIpStore(BaseSlavedStore):
|
||||||
|
def __init__(self, db_conn, hs):
|
||||||
|
super(SlavedClientIpStore, self).__init__(db_conn, hs)
|
||||||
|
|
||||||
|
self.client_ip_last_seen = Cache(
|
||||||
|
name="client_ip_last_seen",
|
||||||
|
keylen=4,
|
||||||
|
max_entries=50000 * CACHE_SIZE_FACTOR,
|
||||||
|
)
|
||||||
|
|
||||||
|
def insert_client_ip(self, user, access_token, ip, user_agent, device_id):
|
||||||
|
now = int(self._clock.time_msec())
|
||||||
|
user_id = user.to_string()
|
||||||
|
key = (user_id, access_token, ip)
|
||||||
|
|
||||||
|
try:
|
||||||
|
last_seen = self.client_ip_last_seen.get(key)
|
||||||
|
except KeyError:
|
||||||
|
last_seen = None
|
||||||
|
|
||||||
|
# Rate-limited inserts
|
||||||
|
if last_seen is not None and (now - last_seen) < LAST_SEEN_GRANULARITY:
|
||||||
|
return
|
||||||
|
|
||||||
|
self.hs.get_tcp_replication().send_user_ip(
|
||||||
|
user_id, access_token, ip, user_agent, device_id, now
|
||||||
|
)
|
|
@ -20,6 +20,7 @@ from twisted.internet.protocol import ReconnectingClientFactory
|
||||||
|
|
||||||
from .commands import (
|
from .commands import (
|
||||||
FederationAckCommand, UserSyncCommand, RemovePusherCommand, InvalidateCacheCommand,
|
FederationAckCommand, UserSyncCommand, RemovePusherCommand, InvalidateCacheCommand,
|
||||||
|
UserIpCommand,
|
||||||
)
|
)
|
||||||
from .protocol import ClientReplicationStreamProtocol
|
from .protocol import ClientReplicationStreamProtocol
|
||||||
|
|
||||||
|
@ -178,6 +179,12 @@ class ReplicationClientHandler(object):
|
||||||
cmd = InvalidateCacheCommand(cache_func.__name__, keys)
|
cmd = InvalidateCacheCommand(cache_func.__name__, keys)
|
||||||
self.send_command(cmd)
|
self.send_command(cmd)
|
||||||
|
|
||||||
|
def send_user_ip(self, user_id, access_token, ip, user_agent, device_id, last_seen):
|
||||||
|
"""Tell the master that the user made a request.
|
||||||
|
"""
|
||||||
|
cmd = UserIpCommand(user_id, access_token, ip, user_agent, device_id, last_seen)
|
||||||
|
self.send_command(cmd)
|
||||||
|
|
||||||
def await_sync(self, data):
|
def await_sync(self, data):
|
||||||
"""Returns a deferred that is resolved when we receive a SYNC command
|
"""Returns a deferred that is resolved when we receive a SYNC command
|
||||||
with given data.
|
with given data.
|
||||||
|
|
|
@ -304,6 +304,36 @@ class InvalidateCacheCommand(Command):
|
||||||
return " ".join((self.cache_func, json.dumps(self.keys)))
|
return " ".join((self.cache_func, json.dumps(self.keys)))
|
||||||
|
|
||||||
|
|
||||||
|
class UserIpCommand(Command):
|
||||||
|
"""Sent periodically when a worker sees activity from a client.
|
||||||
|
|
||||||
|
Format::
|
||||||
|
|
||||||
|
USER_IP <user_id>, <access_token>, <ip>, <device_id>, <last_seen>, <user_agent>
|
||||||
|
"""
|
||||||
|
NAME = "USER_IP"
|
||||||
|
|
||||||
|
def __init__(self, user_id, access_token, ip, user_agent, device_id, last_seen):
|
||||||
|
self.user_id = user_id
|
||||||
|
self.access_token = access_token
|
||||||
|
self.ip = ip
|
||||||
|
self.user_agent = user_agent
|
||||||
|
self.device_id = device_id
|
||||||
|
self.last_seen = last_seen
|
||||||
|
|
||||||
|
@classmethod
|
||||||
|
def from_line(cls, line):
|
||||||
|
user_id, access_token, ip, device_id, last_seen, user_agent = line.split(" ", 5)
|
||||||
|
|
||||||
|
return cls(user_id, access_token, ip, user_agent, device_id, int(last_seen))
|
||||||
|
|
||||||
|
def to_line(self):
|
||||||
|
return " ".join((
|
||||||
|
self.user_id, self.access_token, self.ip, self.device_id,
|
||||||
|
str(self.last_seen), self.user_agent,
|
||||||
|
))
|
||||||
|
|
||||||
|
|
||||||
# Map of command name to command type.
|
# Map of command name to command type.
|
||||||
COMMAND_MAP = {
|
COMMAND_MAP = {
|
||||||
cmd.NAME: cmd
|
cmd.NAME: cmd
|
||||||
|
@ -320,6 +350,7 @@ COMMAND_MAP = {
|
||||||
SyncCommand,
|
SyncCommand,
|
||||||
RemovePusherCommand,
|
RemovePusherCommand,
|
||||||
InvalidateCacheCommand,
|
InvalidateCacheCommand,
|
||||||
|
UserIpCommand,
|
||||||
)
|
)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -342,5 +373,6 @@ VALID_CLIENT_COMMANDS = (
|
||||||
FederationAckCommand.NAME,
|
FederationAckCommand.NAME,
|
||||||
RemovePusherCommand.NAME,
|
RemovePusherCommand.NAME,
|
||||||
InvalidateCacheCommand.NAME,
|
InvalidateCacheCommand.NAME,
|
||||||
|
UserIpCommand.NAME,
|
||||||
ErrorCommand.NAME,
|
ErrorCommand.NAME,
|
||||||
)
|
)
|
||||||
|
|
|
@ -406,6 +406,12 @@ class ServerReplicationStreamProtocol(BaseReplicationStreamProtocol):
|
||||||
def on_INVALIDATE_CACHE(self, cmd):
|
def on_INVALIDATE_CACHE(self, cmd):
|
||||||
self.streamer.on_invalidate_cache(cmd.cache_func, cmd.keys)
|
self.streamer.on_invalidate_cache(cmd.cache_func, cmd.keys)
|
||||||
|
|
||||||
|
def on_USER_IP(self, cmd):
|
||||||
|
self.streamer.on_user_ip(
|
||||||
|
cmd.user_id, cmd.access_token, cmd.ip, cmd.user_agent, cmd.device_id,
|
||||||
|
cmd.last_seen,
|
||||||
|
)
|
||||||
|
|
||||||
@defer.inlineCallbacks
|
@defer.inlineCallbacks
|
||||||
def subscribe_to_stream(self, stream_name, token):
|
def subscribe_to_stream(self, stream_name, token):
|
||||||
"""Subscribe the remote to a streams.
|
"""Subscribe the remote to a streams.
|
||||||
|
|
|
@ -35,6 +35,7 @@ user_sync_counter = metrics.register_counter("user_sync")
|
||||||
federation_ack_counter = metrics.register_counter("federation_ack")
|
federation_ack_counter = metrics.register_counter("federation_ack")
|
||||||
remove_pusher_counter = metrics.register_counter("remove_pusher")
|
remove_pusher_counter = metrics.register_counter("remove_pusher")
|
||||||
invalidate_cache_counter = metrics.register_counter("invalidate_cache")
|
invalidate_cache_counter = metrics.register_counter("invalidate_cache")
|
||||||
|
user_ip_cache_counter = metrics.register_counter("user_ip_cache")
|
||||||
|
|
||||||
logger = logging.getLogger(__name__)
|
logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
@ -238,6 +239,15 @@ class ReplicationStreamer(object):
|
||||||
invalidate_cache_counter.inc()
|
invalidate_cache_counter.inc()
|
||||||
getattr(self.store, cache_func).invalidate(tuple(keys))
|
getattr(self.store, cache_func).invalidate(tuple(keys))
|
||||||
|
|
||||||
|
@measure_func("repl.on_user_ip")
|
||||||
|
def on_user_ip(self, user_id, access_token, ip, user_agent, device_id, last_seen):
|
||||||
|
"""The client saw a user request
|
||||||
|
"""
|
||||||
|
user_ip_cache_counter.inc()
|
||||||
|
self.store.insert_client_ip(
|
||||||
|
user_id, access_token, ip, user_agent, device_id, last_seen,
|
||||||
|
)
|
||||||
|
|
||||||
def send_sync_to_all_connections(self, data):
|
def send_sync_to_all_connections(self, data):
|
||||||
"""Sends a SYNC command to all clients.
|
"""Sends a SYNC command to all clients.
|
||||||
|
|
||||||
|
|
Loading…
Reference in a new issue