Prototype of using m.typing to clean up MSC3401 call memberships

Only apply logic where io.element.type=org.matrix.msc3401.call.member on the typing PUT
This commit is contained in:
Hugh Nimmo-Smith 2024-07-16 17:58:18 +01:00
parent 75b788f49f
commit 8dea89837a
2 changed files with 110 additions and 15 deletions

View file

@ -40,7 +40,9 @@ from synapse.types import (
StrCollection,
StreamKeyType,
UserID,
create_requester,
)
from synapse.types.state import StateFilter
from synapse.util.caches.stream_change_cache import StreamChangeCache
from synapse.util.metrics import Measure
from synapse.util.retryutils import filter_destinations_by_retry_limiter
@ -58,6 +60,8 @@ logger = logging.getLogger(__name__)
class RoomMember:
room_id: str
user_id: str
type: str
device_id: Optional[str] = None
# How often we expect remote servers to resend us presence.
@ -108,6 +112,8 @@ class FollowerTypingHandler:
self.clock.looping_call(self._handle_timeouts, 5000)
self.clock.looping_call(self._prune_old_typing, FORGET_TIMEOUT)
self.state = hs.get_state_handler()
self.event_creation_handler = hs.get_event_creation_handler()
def _reset(self) -> None:
"""Reset the typing handler's data caches."""
@ -130,9 +136,9 @@ class FollowerTypingHandler:
members = set(self.wheel_timer.fetch(now))
for member in members:
self._handle_timeout_for_member(now, member)
await self._handle_timeout_for_member(now, member)
def _handle_timeout_for_member(self, now: int, member: RoomMember) -> None:
async def _handle_timeout_for_member(self, now: int, member: RoomMember) -> None:
if not self.is_typing(member):
# Nothing to do if they're no longer typing
return
@ -283,8 +289,8 @@ class TypingWriterHandler(FollowerTypingHandler):
"TypingStreamChangeCache", self._latest_room_serial
)
def _handle_timeout_for_member(self, now: int, member: RoomMember) -> None:
super()._handle_timeout_for_member(now, member)
async def _handle_timeout_for_member(self, now: int, member: RoomMember) -> None:
await super()._handle_timeout_for_member(now, member)
if not self.is_typing(member):
# Nothing to do if they're no longer typing
@ -293,11 +299,16 @@ class TypingWriterHandler(FollowerTypingHandler):
until = self._member_typing_until.get(member, None)
if not until or until <= now:
logger.info("Timing out typing for: %s", member.user_id)
self._stopped_typing(member)
await self._stopped_typing(member)
return
async def started_typing(
self, target_user: UserID, requester: Requester, room_id: str, timeout: int
self,
target_user: UserID,
requester: Requester,
room_id: str,
timeout: int,
type: str,
) -> None:
target_user_id = target_user.to_string()
@ -316,7 +327,12 @@ class TypingWriterHandler(FollowerTypingHandler):
logger.debug("%s has started typing in %s", target_user_id, room_id)
member = RoomMember(room_id=room_id, user_id=target_user_id)
member = RoomMember(
room_id=room_id,
user_id=target_user_id,
device_id=requester.device_id,
type=type,
)
was_present = member.user_id in self._room_typing.get(room_id, set())
@ -332,7 +348,7 @@ class TypingWriterHandler(FollowerTypingHandler):
self._push_update(member=member, typing=True)
async def stopped_typing(
self, target_user: UserID, requester: Requester, room_id: str
self, target_user: UserID, requester: Requester, room_id: str, type: str
) -> None:
target_user_id = target_user.to_string()
@ -351,17 +367,90 @@ class TypingWriterHandler(FollowerTypingHandler):
logger.debug("%s has stopped typing in %s", target_user_id, room_id)
member = RoomMember(room_id=room_id, user_id=target_user_id)
member = RoomMember(
room_id=room_id,
user_id=target_user_id,
device_id=requester.device_id,
type=type,
)
self._stopped_typing(member)
await self._stopped_typing(member)
def user_left_room(self, user: UserID, room_id: str) -> None:
async def user_left_room(self, user: UserID, room_id: str) -> None:
user_id = user.to_string()
if self.is_mine_id(user_id):
member = RoomMember(room_id=room_id, user_id=user_id)
self._stopped_typing(member)
# We use a device_id of "*" to indicate that all devices will have left the room
member = RoomMember(room_id=room_id, user_id=user_id, device_id="*")
await self._stopped_typing(member)
async def _stopped_typing(self, member: RoomMember) -> None:
logger.debug(
"User %s has stopped typing (type=%s) in %s on device %s",
member.user_id,
member.type,
member.room_id,
member.device_id,
)
if (
member.type == "org.matrix.msc3401.call.member"
and member.device_id is not None
):
# Check room state to see if any MSC3401 member events needing removal
event_filter = [
("org.matrix.msc3401.call.member", member.user_id),
]
state_filter = StateFilter.from_types(event_filter)
state_ids = await self._storage_controllers.state.get_current_state_ids(
member.room_id,
state_filter,
)
state_events = await self.store.get_events(state_ids.values())
for event in state_events.values():
if "memberships" in event.content:
# n.b. Simplified type
memberships: List[Dict[str, str]] = event.content["memberships"]
new_memberships = []
modified = False
if member.device_id == "*":
# user has left room so remove all memberships if the were any
modified = len(memberships) > 0
else:
# find a membership for this device_id
for membership in memberships:
if membership["device_id"] == member.device_id:
modified = True
else:
new_memberships.append(membership)
if modified:
# send state event with new memberships
logger.info(
"Found matching MSC3401 call member event to cleanup. New memberships is %s",
new_memberships,
)
requester = create_requester(
member.user_id, authenticated_entity=member.user_id
)
await self.event_creation_handler.create_and_send_nonmember_event(
requester,
{
"type": "org.matrix.msc3401.call.member",
"state_key": member.user_id,
"room_id": member.room_id,
"sender": member.user_id,
"content": {
"memberships": new_memberships,
},
},
ratelimit=False,
)
def _stopped_typing(self, member: RoomMember) -> None:
if member.user_id not in self._room_typing.get(member.room_id, set()):
# No point
return

View file

@ -1256,6 +1256,8 @@ class RoomTypingRestServlet(RestServlet):
# Limit timeout to stop people from setting silly typing timeouts.
timeout = min(content.get("timeout", 30000), 120000)
type = content.get("io.element.type", "m.typing")
# Defer getting the typing handler since it will raise on WORKER_PATTERNS.
typing_handler = self.hs.get_typing_writer_handler()
@ -1266,10 +1268,14 @@ class RoomTypingRestServlet(RestServlet):
requester=requester,
room_id=room_id,
timeout=timeout,
type=type,
)
else:
await typing_handler.stopped_typing(
target_user=target_user, requester=requester, room_id=room_id
target_user=target_user,
requester=requester,
room_id=room_id,
type=type,
)
except ShadowBanError:
# Pretend this worked without error.