mirror of
https://github.com/element-hq/synapse.git
synced 2024-11-23 18:15:53 +03:00
Split DeviceHandler into master and worker
This commit is contained in:
parent
a84b8d56c2
commit
157e5a8f27
2 changed files with 179 additions and 170 deletions
|
@ -37,13 +37,185 @@ from ._base import BaseHandler
|
||||||
logger = logging.getLogger(__name__)
|
logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
|
||||||
class DeviceHandler(BaseHandler):
|
class DeviceWorkerHandler(BaseHandler):
|
||||||
def __init__(self, hs):
|
def __init__(self, hs):
|
||||||
super(DeviceHandler, self).__init__(hs)
|
super(DeviceWorkerHandler, self).__init__(hs)
|
||||||
|
|
||||||
self.hs = hs
|
self.hs = hs
|
||||||
self.state = hs.get_state_handler()
|
self.state = hs.get_state_handler()
|
||||||
self._auth_handler = hs.get_auth_handler()
|
self._auth_handler = hs.get_auth_handler()
|
||||||
|
|
||||||
|
@defer.inlineCallbacks
|
||||||
|
def get_devices_by_user(self, user_id):
|
||||||
|
"""
|
||||||
|
Retrieve the given user's devices
|
||||||
|
|
||||||
|
Args:
|
||||||
|
user_id (str):
|
||||||
|
Returns:
|
||||||
|
defer.Deferred: list[dict[str, X]]: info on each device
|
||||||
|
"""
|
||||||
|
|
||||||
|
device_map = yield self.store.get_devices_by_user(user_id)
|
||||||
|
|
||||||
|
ips = yield self.store.get_last_client_ip_by_device(
|
||||||
|
user_id, device_id=None
|
||||||
|
)
|
||||||
|
|
||||||
|
devices = list(device_map.values())
|
||||||
|
for device in devices:
|
||||||
|
_update_device_from_client_ips(device, ips)
|
||||||
|
|
||||||
|
defer.returnValue(devices)
|
||||||
|
|
||||||
|
@defer.inlineCallbacks
|
||||||
|
def get_device(self, user_id, device_id):
|
||||||
|
""" Retrieve the given device
|
||||||
|
|
||||||
|
Args:
|
||||||
|
user_id (str):
|
||||||
|
device_id (str):
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
defer.Deferred: dict[str, X]: info on the device
|
||||||
|
Raises:
|
||||||
|
errors.NotFoundError: if the device was not found
|
||||||
|
"""
|
||||||
|
try:
|
||||||
|
device = yield self.store.get_device(user_id, device_id)
|
||||||
|
except errors.StoreError:
|
||||||
|
raise errors.NotFoundError
|
||||||
|
ips = yield self.store.get_last_client_ip_by_device(
|
||||||
|
user_id, device_id,
|
||||||
|
)
|
||||||
|
_update_device_from_client_ips(device, ips)
|
||||||
|
defer.returnValue(device)
|
||||||
|
|
||||||
|
@measure_func("device.get_user_ids_changed")
|
||||||
|
@defer.inlineCallbacks
|
||||||
|
def get_user_ids_changed(self, user_id, from_token):
|
||||||
|
"""Get list of users that have had the devices updated, or have newly
|
||||||
|
joined a room, that `user_id` may be interested in.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
user_id (str)
|
||||||
|
from_token (StreamToken)
|
||||||
|
"""
|
||||||
|
now_token = yield self.hs.get_event_sources().get_current_token()
|
||||||
|
|
||||||
|
room_ids = yield self.store.get_rooms_for_user(user_id)
|
||||||
|
|
||||||
|
# First we check if any devices have changed
|
||||||
|
changed = yield self.store.get_user_whose_devices_changed(
|
||||||
|
from_token.device_list_key
|
||||||
|
)
|
||||||
|
|
||||||
|
# Then work out if any users have since joined
|
||||||
|
rooms_changed = self.store.get_rooms_that_changed(room_ids, from_token.room_key)
|
||||||
|
|
||||||
|
member_events = yield self.store.get_membership_changes_for_user(
|
||||||
|
user_id, from_token.room_key, now_token.room_key,
|
||||||
|
)
|
||||||
|
rooms_changed.update(event.room_id for event in member_events)
|
||||||
|
|
||||||
|
stream_ordering = RoomStreamToken.parse_stream_token(
|
||||||
|
from_token.room_key
|
||||||
|
).stream
|
||||||
|
|
||||||
|
possibly_changed = set(changed)
|
||||||
|
possibly_left = set()
|
||||||
|
for room_id in rooms_changed:
|
||||||
|
current_state_ids = yield self.store.get_current_state_ids(room_id)
|
||||||
|
|
||||||
|
# The user may have left the room
|
||||||
|
# TODO: Check if they actually did or if we were just invited.
|
||||||
|
if room_id not in room_ids:
|
||||||
|
for key, event_id in iteritems(current_state_ids):
|
||||||
|
etype, state_key = key
|
||||||
|
if etype != EventTypes.Member:
|
||||||
|
continue
|
||||||
|
possibly_left.add(state_key)
|
||||||
|
continue
|
||||||
|
|
||||||
|
# Fetch the current state at the time.
|
||||||
|
try:
|
||||||
|
event_ids = yield self.store.get_forward_extremeties_for_room(
|
||||||
|
room_id, stream_ordering=stream_ordering
|
||||||
|
)
|
||||||
|
except errors.StoreError:
|
||||||
|
# we have purged the stream_ordering index since the stream
|
||||||
|
# ordering: treat it the same as a new room
|
||||||
|
event_ids = []
|
||||||
|
|
||||||
|
# special-case for an empty prev state: include all members
|
||||||
|
# in the changed list
|
||||||
|
if not event_ids:
|
||||||
|
for key, event_id in iteritems(current_state_ids):
|
||||||
|
etype, state_key = key
|
||||||
|
if etype != EventTypes.Member:
|
||||||
|
continue
|
||||||
|
possibly_changed.add(state_key)
|
||||||
|
continue
|
||||||
|
|
||||||
|
current_member_id = current_state_ids.get((EventTypes.Member, user_id))
|
||||||
|
if not current_member_id:
|
||||||
|
continue
|
||||||
|
|
||||||
|
# mapping from event_id -> state_dict
|
||||||
|
prev_state_ids = yield self.store.get_state_ids_for_events(event_ids)
|
||||||
|
|
||||||
|
# Check if we've joined the room? If so we just blindly add all the users to
|
||||||
|
# the "possibly changed" users.
|
||||||
|
for state_dict in itervalues(prev_state_ids):
|
||||||
|
member_event = state_dict.get((EventTypes.Member, user_id), None)
|
||||||
|
if not member_event or member_event != current_member_id:
|
||||||
|
for key, event_id in iteritems(current_state_ids):
|
||||||
|
etype, state_key = key
|
||||||
|
if etype != EventTypes.Member:
|
||||||
|
continue
|
||||||
|
possibly_changed.add(state_key)
|
||||||
|
break
|
||||||
|
|
||||||
|
# If there has been any change in membership, include them in the
|
||||||
|
# possibly changed list. We'll check if they are joined below,
|
||||||
|
# and we're not toooo worried about spuriously adding users.
|
||||||
|
for key, event_id in iteritems(current_state_ids):
|
||||||
|
etype, state_key = key
|
||||||
|
if etype != EventTypes.Member:
|
||||||
|
continue
|
||||||
|
|
||||||
|
# check if this member has changed since any of the extremities
|
||||||
|
# at the stream_ordering, and add them to the list if so.
|
||||||
|
for state_dict in itervalues(prev_state_ids):
|
||||||
|
prev_event_id = state_dict.get(key, None)
|
||||||
|
if not prev_event_id or prev_event_id != event_id:
|
||||||
|
if state_key != user_id:
|
||||||
|
possibly_changed.add(state_key)
|
||||||
|
break
|
||||||
|
|
||||||
|
if possibly_changed or possibly_left:
|
||||||
|
users_who_share_room = yield self.store.get_users_who_share_room_with_user(
|
||||||
|
user_id
|
||||||
|
)
|
||||||
|
|
||||||
|
# Take the intersection of the users whose devices may have changed
|
||||||
|
# and those that actually still share a room with the user
|
||||||
|
possibly_joined = possibly_changed & users_who_share_room
|
||||||
|
possibly_left = (possibly_changed | possibly_left) - users_who_share_room
|
||||||
|
else:
|
||||||
|
possibly_joined = []
|
||||||
|
possibly_left = []
|
||||||
|
|
||||||
|
defer.returnValue({
|
||||||
|
"changed": list(possibly_joined),
|
||||||
|
"left": list(possibly_left),
|
||||||
|
})
|
||||||
|
|
||||||
|
|
||||||
|
class DeviceHandler(DeviceWorkerHandler):
|
||||||
|
def __init__(self, hs):
|
||||||
|
super(DeviceHandler, self).__init__(hs)
|
||||||
|
|
||||||
self.federation_sender = hs.get_federation_sender()
|
self.federation_sender = hs.get_federation_sender()
|
||||||
|
|
||||||
self._edu_updater = DeviceListEduUpdater(hs, self)
|
self._edu_updater = DeviceListEduUpdater(hs, self)
|
||||||
|
@ -103,52 +275,6 @@ class DeviceHandler(BaseHandler):
|
||||||
|
|
||||||
raise errors.StoreError(500, "Couldn't generate a device ID.")
|
raise errors.StoreError(500, "Couldn't generate a device ID.")
|
||||||
|
|
||||||
@defer.inlineCallbacks
|
|
||||||
def get_devices_by_user(self, user_id):
|
|
||||||
"""
|
|
||||||
Retrieve the given user's devices
|
|
||||||
|
|
||||||
Args:
|
|
||||||
user_id (str):
|
|
||||||
Returns:
|
|
||||||
defer.Deferred: list[dict[str, X]]: info on each device
|
|
||||||
"""
|
|
||||||
|
|
||||||
device_map = yield self.store.get_devices_by_user(user_id)
|
|
||||||
|
|
||||||
ips = yield self.store.get_last_client_ip_by_device(
|
|
||||||
user_id, device_id=None
|
|
||||||
)
|
|
||||||
|
|
||||||
devices = list(device_map.values())
|
|
||||||
for device in devices:
|
|
||||||
_update_device_from_client_ips(device, ips)
|
|
||||||
|
|
||||||
defer.returnValue(devices)
|
|
||||||
|
|
||||||
@defer.inlineCallbacks
|
|
||||||
def get_device(self, user_id, device_id):
|
|
||||||
""" Retrieve the given device
|
|
||||||
|
|
||||||
Args:
|
|
||||||
user_id (str):
|
|
||||||
device_id (str):
|
|
||||||
|
|
||||||
Returns:
|
|
||||||
defer.Deferred: dict[str, X]: info on the device
|
|
||||||
Raises:
|
|
||||||
errors.NotFoundError: if the device was not found
|
|
||||||
"""
|
|
||||||
try:
|
|
||||||
device = yield self.store.get_device(user_id, device_id)
|
|
||||||
except errors.StoreError:
|
|
||||||
raise errors.NotFoundError
|
|
||||||
ips = yield self.store.get_last_client_ip_by_device(
|
|
||||||
user_id, device_id,
|
|
||||||
)
|
|
||||||
_update_device_from_client_ips(device, ips)
|
|
||||||
defer.returnValue(device)
|
|
||||||
|
|
||||||
@defer.inlineCallbacks
|
@defer.inlineCallbacks
|
||||||
def delete_device(self, user_id, device_id):
|
def delete_device(self, user_id, device_id):
|
||||||
""" Delete the given device
|
""" Delete the given device
|
||||||
|
@ -287,126 +413,6 @@ class DeviceHandler(BaseHandler):
|
||||||
for host in hosts:
|
for host in hosts:
|
||||||
self.federation_sender.send_device_messages(host)
|
self.federation_sender.send_device_messages(host)
|
||||||
|
|
||||||
@measure_func("device.get_user_ids_changed")
|
|
||||||
@defer.inlineCallbacks
|
|
||||||
def get_user_ids_changed(self, user_id, from_token):
|
|
||||||
"""Get list of users that have had the devices updated, or have newly
|
|
||||||
joined a room, that `user_id` may be interested in.
|
|
||||||
|
|
||||||
Args:
|
|
||||||
user_id (str)
|
|
||||||
from_token (StreamToken)
|
|
||||||
"""
|
|
||||||
now_token = yield self.hs.get_event_sources().get_current_token()
|
|
||||||
|
|
||||||
room_ids = yield self.store.get_rooms_for_user(user_id)
|
|
||||||
|
|
||||||
# First we check if any devices have changed
|
|
||||||
changed = yield self.store.get_user_whose_devices_changed(
|
|
||||||
from_token.device_list_key
|
|
||||||
)
|
|
||||||
|
|
||||||
# Then work out if any users have since joined
|
|
||||||
rooms_changed = self.store.get_rooms_that_changed(room_ids, from_token.room_key)
|
|
||||||
|
|
||||||
member_events = yield self.store.get_membership_changes_for_user(
|
|
||||||
user_id, from_token.room_key, now_token.room_key
|
|
||||||
)
|
|
||||||
rooms_changed.update(event.room_id for event in member_events)
|
|
||||||
|
|
||||||
stream_ordering = RoomStreamToken.parse_stream_token(
|
|
||||||
from_token.room_key
|
|
||||||
).stream
|
|
||||||
|
|
||||||
possibly_changed = set(changed)
|
|
||||||
possibly_left = set()
|
|
||||||
for room_id in rooms_changed:
|
|
||||||
current_state_ids = yield self.store.get_current_state_ids(room_id)
|
|
||||||
|
|
||||||
# The user may have left the room
|
|
||||||
# TODO: Check if they actually did or if we were just invited.
|
|
||||||
if room_id not in room_ids:
|
|
||||||
for key, event_id in iteritems(current_state_ids):
|
|
||||||
etype, state_key = key
|
|
||||||
if etype != EventTypes.Member:
|
|
||||||
continue
|
|
||||||
possibly_left.add(state_key)
|
|
||||||
continue
|
|
||||||
|
|
||||||
# Fetch the current state at the time.
|
|
||||||
try:
|
|
||||||
event_ids = yield self.store.get_forward_extremeties_for_room(
|
|
||||||
room_id, stream_ordering=stream_ordering
|
|
||||||
)
|
|
||||||
except errors.StoreError:
|
|
||||||
# we have purged the stream_ordering index since the stream
|
|
||||||
# ordering: treat it the same as a new room
|
|
||||||
event_ids = []
|
|
||||||
|
|
||||||
# special-case for an empty prev state: include all members
|
|
||||||
# in the changed list
|
|
||||||
if not event_ids:
|
|
||||||
for key, event_id in iteritems(current_state_ids):
|
|
||||||
etype, state_key = key
|
|
||||||
if etype != EventTypes.Member:
|
|
||||||
continue
|
|
||||||
possibly_changed.add(state_key)
|
|
||||||
continue
|
|
||||||
|
|
||||||
current_member_id = current_state_ids.get((EventTypes.Member, user_id))
|
|
||||||
if not current_member_id:
|
|
||||||
continue
|
|
||||||
|
|
||||||
# mapping from event_id -> state_dict
|
|
||||||
prev_state_ids = yield self.store.get_state_ids_for_events(event_ids)
|
|
||||||
|
|
||||||
# Check if we've joined the room? If so we just blindly add all the users to
|
|
||||||
# the "possibly changed" users.
|
|
||||||
for state_dict in itervalues(prev_state_ids):
|
|
||||||
member_event = state_dict.get((EventTypes.Member, user_id), None)
|
|
||||||
if not member_event or member_event != current_member_id:
|
|
||||||
for key, event_id in iteritems(current_state_ids):
|
|
||||||
etype, state_key = key
|
|
||||||
if etype != EventTypes.Member:
|
|
||||||
continue
|
|
||||||
possibly_changed.add(state_key)
|
|
||||||
break
|
|
||||||
|
|
||||||
# If there has been any change in membership, include them in the
|
|
||||||
# possibly changed list. We'll check if they are joined below,
|
|
||||||
# and we're not toooo worried about spuriously adding users.
|
|
||||||
for key, event_id in iteritems(current_state_ids):
|
|
||||||
etype, state_key = key
|
|
||||||
if etype != EventTypes.Member:
|
|
||||||
continue
|
|
||||||
|
|
||||||
# check if this member has changed since any of the extremities
|
|
||||||
# at the stream_ordering, and add them to the list if so.
|
|
||||||
for state_dict in itervalues(prev_state_ids):
|
|
||||||
prev_event_id = state_dict.get(key, None)
|
|
||||||
if not prev_event_id or prev_event_id != event_id:
|
|
||||||
if state_key != user_id:
|
|
||||||
possibly_changed.add(state_key)
|
|
||||||
break
|
|
||||||
|
|
||||||
if possibly_changed or possibly_left:
|
|
||||||
users_who_share_room = yield self.store.get_users_who_share_room_with_user(
|
|
||||||
user_id
|
|
||||||
)
|
|
||||||
|
|
||||||
# Take the intersection of the users whose devices may have changed
|
|
||||||
# and those that actually still share a room with the user
|
|
||||||
possibly_joined = possibly_changed & users_who_share_room
|
|
||||||
possibly_left = (possibly_changed | possibly_left) - users_who_share_room
|
|
||||||
else:
|
|
||||||
possibly_joined = []
|
|
||||||
possibly_left = []
|
|
||||||
|
|
||||||
defer.returnValue({
|
|
||||||
"changed": list(possibly_joined),
|
|
||||||
"left": list(possibly_left),
|
|
||||||
})
|
|
||||||
|
|
||||||
@defer.inlineCallbacks
|
@defer.inlineCallbacks
|
||||||
def on_federation_query_user_devices(self, user_id):
|
def on_federation_query_user_devices(self, user_id):
|
||||||
stream_id, devices = yield self.store.get_devices_with_keys_by_user(user_id)
|
stream_id, devices = yield self.store.get_devices_with_keys_by_user(user_id)
|
||||||
|
|
|
@ -51,7 +51,7 @@ from synapse.handlers.acme import AcmeHandler
|
||||||
from synapse.handlers.appservice import ApplicationServicesHandler
|
from synapse.handlers.appservice import ApplicationServicesHandler
|
||||||
from synapse.handlers.auth import AuthHandler, MacaroonGenerator
|
from synapse.handlers.auth import AuthHandler, MacaroonGenerator
|
||||||
from synapse.handlers.deactivate_account import DeactivateAccountHandler
|
from synapse.handlers.deactivate_account import DeactivateAccountHandler
|
||||||
from synapse.handlers.device import DeviceHandler
|
from synapse.handlers.device import DeviceHandler, DeviceWorkerHandler
|
||||||
from synapse.handlers.devicemessage import DeviceMessageHandler
|
from synapse.handlers.devicemessage import DeviceMessageHandler
|
||||||
from synapse.handlers.e2e_keys import E2eKeysHandler
|
from synapse.handlers.e2e_keys import E2eKeysHandler
|
||||||
from synapse.handlers.e2e_room_keys import E2eRoomKeysHandler
|
from synapse.handlers.e2e_room_keys import E2eRoomKeysHandler
|
||||||
|
@ -307,7 +307,10 @@ class HomeServer(object):
|
||||||
return MacaroonGenerator(self)
|
return MacaroonGenerator(self)
|
||||||
|
|
||||||
def build_device_handler(self):
|
def build_device_handler(self):
|
||||||
return DeviceHandler(self)
|
if self.config.worker_app:
|
||||||
|
return DeviceWorkerHandler(self)
|
||||||
|
else:
|
||||||
|
return DeviceHandler(self)
|
||||||
|
|
||||||
def build_device_message_handler(self):
|
def build_device_message_handler(self):
|
||||||
return DeviceMessageHandler(self)
|
return DeviceMessageHandler(self)
|
||||||
|
|
Loading…
Reference in a new issue