Mirror of https://github.com/element-hq/synapse.git, synced 2024-11-22 01:25:44 +03:00
Prevent multiple upgrades on the same room at once (#5051)
Closes #4583. Does slightly less than #5045, which prevented a room from being upgraded multiple times, one after another; this PR still allows sequential upgrades, but prevents two from running at the same time. This is mostly to mitigate the fact that servers are slow and a room upgrade can take a moment to complete, and we don't want people sending a second upgrade request just because they thought the first one didn't go through.
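The mechanism in miniature: upgrade requests go through a response cache keyed on (old_room_id, user_id), so the first request starts the actual upgrade and any duplicate that arrives while it is still in flight simply receives the same pending result. Below is a small, self-contained sketch of that idea; it uses asyncio rather than Synapse's Twisted-based ResponseCache, and every name in it is illustrative, not Synapse API.

import asyncio


class MiniResponseCache:
    """Toy stand-in for Synapse's ResponseCache (illustrative only)."""

    def __init__(self):
        # (room_id, user_id) -> Task for the in-flight upgrade
        self.pending_result_cache = {}

    async def wrap(self, key, callback, *args):
        task = self.pending_result_cache.get(key)
        if task is None:
            # First request with this key starts the real work ...
            task = asyncio.ensure_future(callback(*args))
            task.add_done_callback(
                lambda _: self.pending_result_cache.pop(key, None)
            )
            self.pending_result_cache[key] = task
        # ... and every duplicate request just awaits the same task.
        return await task


async def slow_upgrade(old_room_id, new_version):
    await asyncio.sleep(0.1)  # stand-in for a slow server-side upgrade
    return "!new-room-for-%s" % old_room_id


async def main():
    cache = MiniResponseCache()
    key = ("!old:example.org", "@alice:example.org")
    # A "double-click": two identical requests arrive concurrently,
    # but slow_upgrade only runs once and both callers get its result.
    r1, r2 = await asyncio.gather(
        cache.wrap(key, slow_upgrade, "!old:example.org", "4"),
        cache.wrap(key, slow_upgrade, "!old:example.org", "4"),
    )
    assert r1 == r2


asyncio.run(main())

The real ResponseCache additionally keeps the finished result around for timeout_ms (five minutes in this PR), so a retry landing just after the upgrade completes still gets the cached new room ID rather than kicking off a second upgrade.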
commit ef8c62758c
parent 6fa36c22fa
3 changed files with 92 additions and 53 deletions
changelog.d/5051.bugfix (new file)

@@ -0,0 +1 @@
+Prevent >1 room upgrades happening simultaneously on the same room.
synapse/handlers/room.py

@@ -32,6 +32,7 @@ from synapse.storage.state import StateFilter
 from synapse.types import RoomAlias, RoomID, RoomStreamToken, StreamToken, UserID
 from synapse.util import stringutils
 from synapse.util.async_helpers import Linearizer
+from synapse.util.caches.response_cache import ResponseCache
 from synapse.visibility import filter_events_for_client
 
 from ._base import BaseHandler

@@ -40,6 +41,8 @@ logger = logging.getLogger(__name__)
 
 id_server_scheme = "https://"
 
+FIVE_MINUTES_IN_MS = 5 * 60 * 1000
+
 
 class RoomCreationHandler(BaseHandler):
 

@@ -75,6 +78,12 @@
         # linearizer to stop two upgrades happening at once
         self._upgrade_linearizer = Linearizer("room_upgrade_linearizer")
 
+        # If a user tries to update the same room multiple times in quick
+        # succession, only process the first attempt and return its result to
+        # subsequent requests
+        self._upgrade_response_cache = ResponseCache(
+            hs, "room_upgrade", timeout_ms=FIVE_MINUTES_IN_MS
+        )
         self._server_notices_mxid = hs.config.server_notices_mxid
 
         self.third_party_event_rules = hs.get_third_party_event_rules()

@@ -95,67 +104,96 @@
 
         user_id = requester.user.to_string()
 
-        with (yield self._upgrade_linearizer.queue(old_room_id)):
-            # start by allocating a new room id
-            r = yield self.store.get_room(old_room_id)
-            if r is None:
-                raise NotFoundError("Unknown room id %s" % (old_room_id,))
-            new_room_id = yield self._generate_room_id(
-                creator_id=user_id, is_public=r["is_public"]
-            )
-
-            logger.info("Creating new room %s to replace %s", new_room_id, old_room_id)
-
-            # we create and auth the tombstone event before properly creating the new
-            # room, to check our user has perms in the old room.
-            tombstone_event, tombstone_context = (
-                yield self.event_creation_handler.create_event(
-                    requester,
-                    {
-                        "type": EventTypes.Tombstone,
-                        "state_key": "",
-                        "room_id": old_room_id,
-                        "sender": user_id,
-                        "content": {
-                            "body": "This room has been replaced",
-                            "replacement_room": new_room_id,
-                        },
-                    },
-                    token_id=requester.access_token_id,
-                )
-            )
-            old_room_version = yield self.store.get_room_version(old_room_id)
-            yield self.auth.check_from_context(
-                old_room_version, tombstone_event, tombstone_context
-            )
+        # Check if this room is already being upgraded by another person
+        for key in self._upgrade_response_cache.pending_result_cache:
+            if key[0] == old_room_id and key[1] != user_id:
+                # Two different people are trying to upgrade the same room.
+                # Send the second an error.
+                #
+                # Note that this of course only gets caught if both users are
+                # on the same homeserver.
+                raise SynapseError(
+                    400, "An upgrade for this room is currently in progress"
+                )
 
-            yield self.clone_existing_room(
-                requester,
-                old_room_id=old_room_id,
-                new_room_id=new_room_id,
-                new_room_version=new_version,
-                tombstone_event_id=tombstone_event.event_id,
-            )
+        # Upgrade the room
+        #
+        # If this user has sent multiple upgrade requests for the same room
+        # and one of them is not complete yet, cache the response and
+        # return it to all subsequent requests
+        ret = yield self._upgrade_response_cache.wrap(
+            (old_room_id, user_id),
+            self._upgrade_room,
+            requester,
+            old_room_id,
+            new_version,  # args for _upgrade_room
+        )
+        defer.returnValue(ret)
 
-            # now send the tombstone
-            yield self.event_creation_handler.send_nonmember_event(
-                requester, tombstone_event, tombstone_context
-            )
+    @defer.inlineCallbacks
+    def _upgrade_room(self, requester, old_room_id, new_version):
+        user_id = requester.user.to_string()
 
-            old_room_state = yield tombstone_context.get_current_state_ids(self.store)
+        # start by allocating a new room id
+        r = yield self.store.get_room(old_room_id)
+        if r is None:
+            raise NotFoundError("Unknown room id %s" % (old_room_id,))
+        new_room_id = yield self._generate_room_id(
+            creator_id=user_id, is_public=r["is_public"]
+        )
 
-            # update any aliases
-            yield self._move_aliases_to_new_room(
-                requester, old_room_id, new_room_id, old_room_state
-            )
+        logger.info("Creating new room %s to replace %s", new_room_id, old_room_id)
+
+        # we create and auth the tombstone event before properly creating the new
+        # room, to check our user has perms in the old room.
+        tombstone_event, tombstone_context = (
+            yield self.event_creation_handler.create_event(
+                requester,
+                {
+                    "type": EventTypes.Tombstone,
+                    "state_key": "",
+                    "room_id": old_room_id,
+                    "sender": user_id,
+                    "content": {
+                        "body": "This room has been replaced",
+                        "replacement_room": new_room_id,
+                    },
+                },
+                token_id=requester.access_token_id,
+            )
+        )
+        old_room_version = yield self.store.get_room_version(old_room_id)
+        yield self.auth.check_from_context(
+            old_room_version, tombstone_event, tombstone_context
+        )
 
-            # and finally, shut down the PLs in the old room, and update them in the new
-            # room.
-            yield self._update_upgraded_room_pls(
-                requester, old_room_id, new_room_id, old_room_state
-            )
+        yield self.clone_existing_room(
+            requester,
+            old_room_id=old_room_id,
+            new_room_id=new_room_id,
+            new_room_version=new_version,
+            tombstone_event_id=tombstone_event.event_id,
+        )
 
-            defer.returnValue(new_room_id)
+        # now send the tombstone
+        yield self.event_creation_handler.send_nonmember_event(
+            requester, tombstone_event, tombstone_context
+        )
+
+        old_room_state = yield tombstone_context.get_current_state_ids(self.store)
+
+        # update any aliases
+        yield self._move_aliases_to_new_room(
+            requester, old_room_id, new_room_id, old_room_state
+        )
+
+        # and finally, shut down the PLs in the old room, and update them in the new
+        # room.
+        yield self._update_upgraded_room_pls(
+            requester, old_room_id, new_room_id, old_room_state
+        )
+
+        defer.returnValue(new_room_id)
 
     @defer.inlineCallbacks
     def _update_upgraded_room_pls(
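Two separate guards are at work in the hunk above. Requests from the same user are deduplicated by the response cache, since the cache key is (old_room_id, user_id); a concurrent request from a different user is instead rejected up front by scanning the pending keys. A tiny runnable sketch of that second check, with invented names (Synapse raises SynapseError(400, ...) where this uses RuntimeError):

def check_upgrade_allowed(pending_keys, old_room_id, user_id):
    # mirrors the loop over pending_result_cache in the diff above
    for key in pending_keys:
        if key[0] == old_room_id and key[1] != user_id:
            raise RuntimeError("An upgrade for this room is currently in progress")


pending = {("!abc:example.org", "@alice:example.org")}
check_upgrade_allowed(pending, "!abc:example.org", "@alice:example.org")  # same user: allowed
try:
    check_upgrade_allowed(pending, "!abc:example.org", "@bob:example.org")
except RuntimeError as err:
    print(err)  # a different user gets the 400-style error

As the comment in the diff notes, this only catches the clash when both users are on the same homeserver; a remote user's upgrade attempt is not visible in this cache.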
synapse/util/caches/response_cache.py

@@ -137,7 +137,7 @@ class ResponseCache(object):
             *args: positional parameters to pass to the callback, if it is used
 
-            **kwargs: named paramters to pass to the callback, if it is used
+            **kwargs: named parameters to pass to the callback, if it is used
 
         Returns:
             twisted.internet.defer.Deferred: yieldable result
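The corrected docstring describes the calling convention the new handler code relies on: everything after the callback is forwarded to it when there is a cache miss. A toy, runnable illustration of that shape (this is not Synapse's ResponseCache, just the documented pass-through):

def wrap(key, callback, *args, **kwargs):
    # the real ResponseCache consults its cache first; elided in this sketch
    return callback(*args, **kwargs)


def upgrade_room(requester, old_room_id, new_version):
    return "upgrade %s to version %s on behalf of %s" % (
        old_room_id, new_version, requester,
    )


# mirrors the call shape in the room.py hunk above: key, callback, then *args
print(wrap(("!old:hs", "@alice:hs"), upgrade_room, "@alice:hs", "!old:hs", "4"))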