mirror of
https://github.com/element-hq/synapse.git
synced 2024-11-28 15:08:49 +03:00
Easy refactors of the user directory (#10789)
No functional changes here. This came out as I was working to tackle #5677
This commit is contained in:
parent
c6f5fb5477
commit
318162f5de
8 changed files with 90 additions and 37 deletions
1
changelog.d/10789.misc
Normal file
1
changelog.d/10789.misc
Normal file
|
@ -0,0 +1 @@
|
||||||
|
Improve internal details of the user directory code.
|
|
@ -10,3 +10,40 @@ DB corruption) get stale or out of sync. If this happens, for now the
|
||||||
solution to fix it is to execute the SQL [here](https://github.com/matrix-org/synapse/blob/master/synapse/storage/schema/main/delta/53/user_dir_populate.sql)
|
solution to fix it is to execute the SQL [here](https://github.com/matrix-org/synapse/blob/master/synapse/storage/schema/main/delta/53/user_dir_populate.sql)
|
||||||
and then restart synapse. This should then start a background task to
|
and then restart synapse. This should then start a background task to
|
||||||
flush the current tables and regenerate the directory.
|
flush the current tables and regenerate the directory.
|
||||||
|
|
||||||
|
Data model
|
||||||
|
----------
|
||||||
|
|
||||||
|
There are five relevant tables that collectively form the "user directory".
|
||||||
|
Three of them track a master list of all the users we could search for.
|
||||||
|
The last two (collectively called the "search tables") track who can
|
||||||
|
see who.
|
||||||
|
|
||||||
|
From all of these tables we exclude three types of local user:
|
||||||
|
- support users
|
||||||
|
- appservice users
|
||||||
|
- deactivated users
|
||||||
|
|
||||||
|
* `user_directory`. This contains the user_id, display name and avatar we'll
|
||||||
|
return when you search the directory.
|
||||||
|
- Because there's only one directory entry per user, it's important that we only
|
||||||
|
ever put publicly visible names here. Otherwise we might leak a private
|
||||||
|
nickname or avatar used in a private room.
|
||||||
|
- Indexed on rooms. Indexed on users.
|
||||||
|
|
||||||
|
* `user_directory_search`. To be joined to `user_directory`. It contains an extra
|
||||||
|
column that enables full text search based on user ids and display names.
|
||||||
|
Different schemas for SQLite and Postgres with different code paths to match.
|
||||||
|
- Indexed on the full text search data. Indexed on users.
|
||||||
|
|
||||||
|
* `user_directory_stream_pos`. When the initial background update to populate
|
||||||
|
the directory is complete, we record a stream position here. This indicates
|
||||||
|
that synapse should now listen for room changes and incrementally update
|
||||||
|
the directory where necessary.
|
||||||
|
|
||||||
|
* `users_in_public_rooms`. Contains associations between users and the public rooms they're in.
|
||||||
|
Used to determine which users are in public rooms and should be publicly visible in the directory.
|
||||||
|
|
||||||
|
* `users_who_share_private_rooms`. Rows are triples `(L, M, room id)` where `L`
|
||||||
|
is a local user and `M` is a local or remote user. `L` and `M` should be
|
||||||
|
different, but this isn't enforced by a constraint.
|
||||||
|
|
|
@ -131,7 +131,7 @@ class DeactivateAccountHandler(BaseHandler):
|
||||||
await self.store.add_user_pending_deactivation(user_id)
|
await self.store.add_user_pending_deactivation(user_id)
|
||||||
|
|
||||||
# delete from user directory
|
# delete from user directory
|
||||||
await self.user_directory_handler.handle_user_deactivated(user_id)
|
await self.user_directory_handler.handle_local_user_deactivated(user_id)
|
||||||
|
|
||||||
# Mark the user as erased, if they asked for that
|
# Mark the user as erased, if they asked for that
|
||||||
if erase_data:
|
if erase_data:
|
||||||
|
|
|
@ -13,6 +13,7 @@
|
||||||
# limitations under the License.
|
# limitations under the License.
|
||||||
|
|
||||||
import logging
|
import logging
|
||||||
|
from enum import Enum, auto
|
||||||
from typing import TYPE_CHECKING, Optional
|
from typing import TYPE_CHECKING, Optional
|
||||||
|
|
||||||
if TYPE_CHECKING:
|
if TYPE_CHECKING:
|
||||||
|
@ -21,6 +22,12 @@ if TYPE_CHECKING:
|
||||||
logger = logging.getLogger(__name__)
|
logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
|
||||||
|
class MatchChange(Enum):
|
||||||
|
no_change = auto()
|
||||||
|
now_true = auto()
|
||||||
|
now_false = auto()
|
||||||
|
|
||||||
|
|
||||||
class StateDeltasHandler:
|
class StateDeltasHandler:
|
||||||
def __init__(self, hs: "HomeServer"):
|
def __init__(self, hs: "HomeServer"):
|
||||||
self.store = hs.get_datastore()
|
self.store = hs.get_datastore()
|
||||||
|
@ -31,18 +38,12 @@ class StateDeltasHandler:
|
||||||
event_id: Optional[str],
|
event_id: Optional[str],
|
||||||
key_name: str,
|
key_name: str,
|
||||||
public_value: str,
|
public_value: str,
|
||||||
) -> Optional[bool]:
|
) -> MatchChange:
|
||||||
"""Given two events check if the `key_name` field in content changed
|
"""Given two events check if the `key_name` field in content changed
|
||||||
from not matching `public_value` to doing so.
|
from not matching `public_value` to doing so.
|
||||||
|
|
||||||
For example, check if `history_visibility` (`key_name`) changed from
|
For example, check if `history_visibility` (`key_name`) changed from
|
||||||
`shared` to `world_readable` (`public_value`).
|
`shared` to `world_readable` (`public_value`).
|
||||||
|
|
||||||
Returns:
|
|
||||||
None if the field in the events either both match `public_value`
|
|
||||||
or if neither do, i.e. there has been no change.
|
|
||||||
True if it didn't match `public_value` but now does
|
|
||||||
False if it did match `public_value` but now doesn't
|
|
||||||
"""
|
"""
|
||||||
prev_event = None
|
prev_event = None
|
||||||
event = None
|
event = None
|
||||||
|
@ -54,7 +55,7 @@ class StateDeltasHandler:
|
||||||
|
|
||||||
if not event and not prev_event:
|
if not event and not prev_event:
|
||||||
logger.debug("Neither event exists: %r %r", prev_event_id, event_id)
|
logger.debug("Neither event exists: %r %r", prev_event_id, event_id)
|
||||||
return None
|
return MatchChange.no_change
|
||||||
|
|
||||||
prev_value = None
|
prev_value = None
|
||||||
value = None
|
value = None
|
||||||
|
@ -68,8 +69,8 @@ class StateDeltasHandler:
|
||||||
logger.debug("prev_value: %r -> value: %r", prev_value, value)
|
logger.debug("prev_value: %r -> value: %r", prev_value, value)
|
||||||
|
|
||||||
if value == public_value and prev_value != public_value:
|
if value == public_value and prev_value != public_value:
|
||||||
return True
|
return MatchChange.now_true
|
||||||
elif value != public_value and prev_value == public_value:
|
elif value != public_value and prev_value == public_value:
|
||||||
return False
|
return MatchChange.now_false
|
||||||
else:
|
else:
|
||||||
return None
|
return MatchChange.no_change
|
||||||
|
|
|
@ -17,7 +17,7 @@ from typing import TYPE_CHECKING, Any, Dict, List, Optional
|
||||||
|
|
||||||
import synapse.metrics
|
import synapse.metrics
|
||||||
from synapse.api.constants import EventTypes, HistoryVisibility, JoinRules, Membership
|
from synapse.api.constants import EventTypes, HistoryVisibility, JoinRules, Membership
|
||||||
from synapse.handlers.state_deltas import StateDeltasHandler
|
from synapse.handlers.state_deltas import MatchChange, StateDeltasHandler
|
||||||
from synapse.metrics.background_process_metrics import run_as_background_process
|
from synapse.metrics.background_process_metrics import run_as_background_process
|
||||||
from synapse.storage.roommember import ProfileInfo
|
from synapse.storage.roommember import ProfileInfo
|
||||||
from synapse.types import JsonDict
|
from synapse.types import JsonDict
|
||||||
|
@ -30,14 +30,26 @@ logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
|
||||||
class UserDirectoryHandler(StateDeltasHandler):
|
class UserDirectoryHandler(StateDeltasHandler):
|
||||||
"""Handles querying of and keeping updated the user_directory.
|
"""Handles queries and updates for the user_directory.
|
||||||
|
|
||||||
N.B.: ASSUMES IT IS THE ONLY THING THAT MODIFIES THE USER DIRECTORY
|
N.B.: ASSUMES IT IS THE ONLY THING THAT MODIFIES THE USER DIRECTORY
|
||||||
|
|
||||||
The user directory is filled with users who this server can see are joined to a
|
When a local user searches the user_directory, we report two kinds of users:
|
||||||
world_readable or publicly joinable room. We keep a database table up to date
|
|
||||||
by streaming changes of the current state and recalculating whether users should
|
- users this server can see are joined to a world_readable or publicly
|
||||||
be in the directory or not when necessary.
|
joinable room, and
|
||||||
|
- users belonging to a private room shared by that local user.
|
||||||
|
|
||||||
|
The two cases are tracked separately in the `users_in_public_rooms` and
|
||||||
|
`users_who_share_private_rooms` tables. Both kinds of users have their
|
||||||
|
username and avatar tracked in a `user_directory` table.
|
||||||
|
|
||||||
|
This handler has three responsibilities:
|
||||||
|
1. Forwarding requests to `/user_directory/search` to the UserDirectoryStore.
|
||||||
|
2. Providing hooks for the application to call when local users are added,
|
||||||
|
removed, or have their profile changed.
|
||||||
|
3. Listening for room state changes that indicate remote users have
|
||||||
|
joined or left a room, or that their profile has changed.
|
||||||
"""
|
"""
|
||||||
|
|
||||||
def __init__(self, hs: "HomeServer"):
|
def __init__(self, hs: "HomeServer"):
|
||||||
|
@ -130,7 +142,7 @@ class UserDirectoryHandler(StateDeltasHandler):
|
||||||
user_id, profile.display_name, profile.avatar_url
|
user_id, profile.display_name, profile.avatar_url
|
||||||
)
|
)
|
||||||
|
|
||||||
async def handle_user_deactivated(self, user_id: str) -> None:
|
async def handle_local_user_deactivated(self, user_id: str) -> None:
|
||||||
"""Called when a user ID is deactivated"""
|
"""Called when a user ID is deactivated"""
|
||||||
# FIXME(#3714): We should probably do this in the same worker as all
|
# FIXME(#3714): We should probably do this in the same worker as all
|
||||||
# the other changes.
|
# the other changes.
|
||||||
|
@ -196,7 +208,7 @@ class UserDirectoryHandler(StateDeltasHandler):
|
||||||
public_value=Membership.JOIN,
|
public_value=Membership.JOIN,
|
||||||
)
|
)
|
||||||
|
|
||||||
if change is False:
|
if change is MatchChange.now_false:
|
||||||
# Need to check if the server left the room entirely, if so
|
# Need to check if the server left the room entirely, if so
|
||||||
# we might need to remove all the users in that room
|
# we might need to remove all the users in that room
|
||||||
is_in_room = await self.store.is_host_joined(
|
is_in_room = await self.store.is_host_joined(
|
||||||
|
@ -219,14 +231,14 @@ class UserDirectoryHandler(StateDeltasHandler):
|
||||||
|
|
||||||
is_support = await self.store.is_support_user(state_key)
|
is_support = await self.store.is_support_user(state_key)
|
||||||
if not is_support:
|
if not is_support:
|
||||||
if change is None:
|
if change is MatchChange.no_change:
|
||||||
# Handle any profile changes
|
# Handle any profile changes
|
||||||
await self._handle_profile_change(
|
await self._handle_profile_change(
|
||||||
state_key, room_id, prev_event_id, event_id
|
state_key, room_id, prev_event_id, event_id
|
||||||
)
|
)
|
||||||
continue
|
continue
|
||||||
|
|
||||||
if change: # The user joined
|
if change is MatchChange.now_true: # The user joined
|
||||||
event = await self.store.get_event(event_id, allow_none=True)
|
event = await self.store.get_event(event_id, allow_none=True)
|
||||||
# It isn't expected for this event to not exist, but we
|
# It isn't expected for this event to not exist, but we
|
||||||
# don't want the entire background process to break.
|
# don't want the entire background process to break.
|
||||||
|
@ -263,14 +275,14 @@ class UserDirectoryHandler(StateDeltasHandler):
|
||||||
logger.debug("Handling change for %s: %s", typ, room_id)
|
logger.debug("Handling change for %s: %s", typ, room_id)
|
||||||
|
|
||||||
if typ == EventTypes.RoomHistoryVisibility:
|
if typ == EventTypes.RoomHistoryVisibility:
|
||||||
change = await self._get_key_change(
|
publicness = await self._get_key_change(
|
||||||
prev_event_id,
|
prev_event_id,
|
||||||
event_id,
|
event_id,
|
||||||
key_name="history_visibility",
|
key_name="history_visibility",
|
||||||
public_value=HistoryVisibility.WORLD_READABLE,
|
public_value=HistoryVisibility.WORLD_READABLE,
|
||||||
)
|
)
|
||||||
elif typ == EventTypes.JoinRules:
|
elif typ == EventTypes.JoinRules:
|
||||||
change = await self._get_key_change(
|
publicness = await self._get_key_change(
|
||||||
prev_event_id,
|
prev_event_id,
|
||||||
event_id,
|
event_id,
|
||||||
key_name="join_rule",
|
key_name="join_rule",
|
||||||
|
@ -278,9 +290,7 @@ class UserDirectoryHandler(StateDeltasHandler):
|
||||||
)
|
)
|
||||||
else:
|
else:
|
||||||
raise Exception("Invalid event type")
|
raise Exception("Invalid event type")
|
||||||
# If change is None, no change. True => become world_readable/public,
|
if publicness is MatchChange.no_change:
|
||||||
# False => was world_readable/public
|
|
||||||
if change is None:
|
|
||||||
logger.debug("No change")
|
logger.debug("No change")
|
||||||
return
|
return
|
||||||
|
|
||||||
|
@ -290,13 +300,13 @@ class UserDirectoryHandler(StateDeltasHandler):
|
||||||
room_id
|
room_id
|
||||||
)
|
)
|
||||||
|
|
||||||
logger.debug("Change: %r, is_public: %r", change, is_public)
|
logger.debug("Change: %r, publicness: %r", publicness, is_public)
|
||||||
|
|
||||||
if change and not is_public:
|
if publicness is MatchChange.now_true and not is_public:
|
||||||
# If we became world readable but room isn't currently public then
|
# If we became world readable but room isn't currently public then
|
||||||
# we ignore the change
|
# we ignore the change
|
||||||
return
|
return
|
||||||
elif not change and is_public:
|
elif publicness is MatchChange.now_false and is_public:
|
||||||
# If we stopped being world readable but are still public,
|
# If we stopped being world readable but are still public,
|
||||||
# ignore the change
|
# ignore the change
|
||||||
return
|
return
|
||||||
|
|
|
@ -196,6 +196,11 @@ class RoomMemberWorkerStore(EventsWorkerStore):
|
||||||
) -> Dict[str, ProfileInfo]:
|
) -> Dict[str, ProfileInfo]:
|
||||||
"""Get a mapping from user ID to profile information for all users in a given room.
|
"""Get a mapping from user ID to profile information for all users in a given room.
|
||||||
|
|
||||||
|
The profile information comes directly from this room's `m.room.member`
|
||||||
|
events, and so may be specific to this room rather than part of a user's
|
||||||
|
global profile. To avoid privacy leaks, the profile data should only be
|
||||||
|
revealed to users who are already in this room.
|
||||||
|
|
||||||
Args:
|
Args:
|
||||||
room_id: The ID of the room to retrieve the users of.
|
room_id: The ID of the room to retrieve the users of.
|
||||||
|
|
||||||
|
|
|
@ -196,7 +196,6 @@ class UserDirectoryBackgroundUpdateStore(StateDeltasStore):
|
||||||
)
|
)
|
||||||
|
|
||||||
users_with_profile = await self.get_users_in_room_with_profiles(room_id)
|
users_with_profile = await self.get_users_in_room_with_profiles(room_id)
|
||||||
user_ids = set(users_with_profile)
|
|
||||||
|
|
||||||
# Update each user in the user directory.
|
# Update each user in the user directory.
|
||||||
for user_id, profile in users_with_profile.items():
|
for user_id, profile in users_with_profile.items():
|
||||||
|
@ -207,7 +206,7 @@ class UserDirectoryBackgroundUpdateStore(StateDeltasStore):
|
||||||
to_insert = set()
|
to_insert = set()
|
||||||
|
|
||||||
if is_public:
|
if is_public:
|
||||||
for user_id in user_ids:
|
for user_id in users_with_profile:
|
||||||
if self.get_if_app_services_interested_in_user(user_id):
|
if self.get_if_app_services_interested_in_user(user_id):
|
||||||
continue
|
continue
|
||||||
|
|
||||||
|
@ -217,14 +216,14 @@ class UserDirectoryBackgroundUpdateStore(StateDeltasStore):
|
||||||
await self.add_users_in_public_rooms(room_id, to_insert)
|
await self.add_users_in_public_rooms(room_id, to_insert)
|
||||||
to_insert.clear()
|
to_insert.clear()
|
||||||
else:
|
else:
|
||||||
for user_id in user_ids:
|
for user_id in users_with_profile:
|
||||||
if not self.hs.is_mine_id(user_id):
|
if not self.hs.is_mine_id(user_id):
|
||||||
continue
|
continue
|
||||||
|
|
||||||
if self.get_if_app_services_interested_in_user(user_id):
|
if self.get_if_app_services_interested_in_user(user_id):
|
||||||
continue
|
continue
|
||||||
|
|
||||||
for other_user_id in user_ids:
|
for other_user_id in users_with_profile:
|
||||||
if user_id == other_user_id:
|
if user_id == other_user_id:
|
||||||
continue
|
continue
|
||||||
|
|
||||||
|
|
|
@ -94,7 +94,7 @@ class UserDirectoryTestCase(unittest.HomeserverTestCase):
|
||||||
|
|
||||||
# deactivate user
|
# deactivate user
|
||||||
self.get_success(self.store.set_user_deactivated_status(r_user_id, True))
|
self.get_success(self.store.set_user_deactivated_status(r_user_id, True))
|
||||||
self.get_success(self.handler.handle_user_deactivated(r_user_id))
|
self.get_success(self.handler.handle_local_user_deactivated(r_user_id))
|
||||||
|
|
||||||
# profile is not in directory
|
# profile is not in directory
|
||||||
profile = self.get_success(self.store.get_user_in_directory(r_user_id))
|
profile = self.get_success(self.store.get_user_in_directory(r_user_id))
|
||||||
|
@ -118,7 +118,7 @@ class UserDirectoryTestCase(unittest.HomeserverTestCase):
|
||||||
)
|
)
|
||||||
|
|
||||||
self.store.remove_from_user_dir = Mock(return_value=defer.succeed(None))
|
self.store.remove_from_user_dir = Mock(return_value=defer.succeed(None))
|
||||||
self.get_success(self.handler.handle_user_deactivated(s_user_id))
|
self.get_success(self.handler.handle_local_user_deactivated(s_user_id))
|
||||||
self.store.remove_from_user_dir.not_called()
|
self.store.remove_from_user_dir.not_called()
|
||||||
|
|
||||||
def test_handle_user_deactivated_regular_user(self):
|
def test_handle_user_deactivated_regular_user(self):
|
||||||
|
@ -127,7 +127,7 @@ class UserDirectoryTestCase(unittest.HomeserverTestCase):
|
||||||
self.store.register_user(user_id=r_user_id, password_hash=None)
|
self.store.register_user(user_id=r_user_id, password_hash=None)
|
||||||
)
|
)
|
||||||
self.store.remove_from_user_dir = Mock(return_value=defer.succeed(None))
|
self.store.remove_from_user_dir = Mock(return_value=defer.succeed(None))
|
||||||
self.get_success(self.handler.handle_user_deactivated(r_user_id))
|
self.get_success(self.handler.handle_local_user_deactivated(r_user_id))
|
||||||
self.store.remove_from_user_dir.called_once_with(r_user_id)
|
self.store.remove_from_user_dir.called_once_with(r_user_id)
|
||||||
|
|
||||||
def test_private_room(self):
|
def test_private_room(self):
|
||||||
|
|
Loading…
Reference in a new issue