diff --git a/changelog.d/6866.feature b/changelog.d/6866.feature new file mode 100644 index 0000000000..256feab6ff --- /dev/null +++ b/changelog.d/6866.feature @@ -0,0 +1 @@ +Add ability to run some group APIs on workers. diff --git a/docs/workers.md b/docs/workers.md index 09a9d8a7b8..82442d6a0a 100644 --- a/docs/workers.md +++ b/docs/workers.md @@ -177,8 +177,13 @@ endpoints matching the following regular expressions: ^/_matrix/federation/v1/event_auth/ ^/_matrix/federation/v1/exchange_third_party_invite/ ^/_matrix/federation/v1/send/ + ^/_matrix/federation/v1/get_groups_publicised$ ^/_matrix/key/v2/query +Additionally, the following REST endpoints can be handled for GET requests: + + ^/_matrix/federation/v1/groups/ + The above endpoints should all be routed to the federation_reader worker by the reverse-proxy configuration. @@ -254,10 +259,13 @@ following regular expressions: ^/_matrix/client/(api/v1|r0|unstable)/keys/changes$ ^/_matrix/client/versions$ ^/_matrix/client/(api/v1|r0|unstable)/voip/turnServer$ + ^/_matrix/client/(api/v1|r0|unstable)/joined_groups$ + ^/_matrix/client/(api/v1|r0|unstable)/get_groups_publicised$ Additionally, the following REST endpoints can be handled for GET requests: ^/_matrix/client/(api/v1|r0|unstable)/pushrules/.*$ + ^/_matrix/client/(api/v1|r0|unstable)/groups/.*$ Additionally, the following REST endpoints can be handled, but all requests must be routed to the same instance: diff --git a/synapse/app/client_reader.py b/synapse/app/client_reader.py index ca96da6a4a..7fa91a3b11 100644 --- a/synapse/app/client_reader.py +++ b/synapse/app/client_reader.py @@ -57,6 +57,7 @@ from synapse.rest.client.v1.room import ( RoomStateRestServlet, ) from synapse.rest.client.v1.voip import VoipRestServlet +from synapse.rest.client.v2_alpha import groups from synapse.rest.client.v2_alpha.account import ThreepidRestServlet from synapse.rest.client.v2_alpha.keys import KeyChangesServlet, KeyQueryServlet from synapse.rest.client.v2_alpha.register import RegisterRestServlet @@ -124,6 +125,8 @@ class ClientReaderServer(HomeServer): PushRuleRestServlet(self).register(resource) VersionsRestServlet(self).register(resource) + groups.register_servlets(self, resource) + resources.update({"/_matrix/client": resource}) root_resource = create_resource_tree(resources, NoResource()) diff --git a/synapse/app/federation_reader.py b/synapse/app/federation_reader.py index 1f1cea1416..5e17ef1396 100644 --- a/synapse/app/federation_reader.py +++ b/synapse/app/federation_reader.py @@ -35,6 +35,7 @@ from synapse.replication.slave.storage.account_data import SlavedAccountDataStor from synapse.replication.slave.storage.appservice import SlavedApplicationServiceStore from synapse.replication.slave.storage.directory import DirectoryStore from synapse.replication.slave.storage.events import SlavedEventStore +from synapse.replication.slave.storage.groups import SlavedGroupServerStore from synapse.replication.slave.storage.keys import SlavedKeyStore from synapse.replication.slave.storage.profile import SlavedProfileStore from synapse.replication.slave.storage.push_rule import SlavedPushRuleStore @@ -66,6 +67,7 @@ class FederationReaderSlavedStore( SlavedEventStore, SlavedKeyStore, SlavedRegistrationStore, + SlavedGroupServerStore, RoomStore, DirectoryStore, SlavedTransactionStore, diff --git a/synapse/groups/groups_server.py b/synapse/groups/groups_server.py index 0ec9be3cb5..c106abae21 100644 --- a/synapse/groups/groups_server.py +++ b/synapse/groups/groups_server.py @@ -36,7 +36,7 @@ logger = logging.getLogger(__name__) # TODO: Flairs -class GroupsServerHandler(object): +class GroupsServerWorkerHandler(object): def __init__(self, hs): self.hs = hs self.store = hs.get_datastore() @@ -51,9 +51,6 @@ class GroupsServerHandler(object): self.transport_client = hs.get_federation_transport_client() self.profile_handler = hs.get_profile_handler() - # Ensure attestations get renewed - hs.get_groups_attestation_renewer() - @defer.inlineCallbacks def check_group_is_ours( self, group_id, requester_user_id, and_exists=False, and_is_admin=None @@ -167,68 +164,6 @@ class GroupsServerHandler(object): "user": membership_info, } - @defer.inlineCallbacks - def update_group_summary_room( - self, group_id, requester_user_id, room_id, category_id, content - ): - """Add/update a room to the group summary - """ - yield self.check_group_is_ours( - group_id, requester_user_id, and_exists=True, and_is_admin=requester_user_id - ) - - RoomID.from_string(room_id) # Ensure valid room id - - order = content.get("order", None) - - is_public = _parse_visibility_from_contents(content) - - yield self.store.add_room_to_summary( - group_id=group_id, - room_id=room_id, - category_id=category_id, - order=order, - is_public=is_public, - ) - - return {} - - @defer.inlineCallbacks - def delete_group_summary_room( - self, group_id, requester_user_id, room_id, category_id - ): - """Remove a room from the summary - """ - yield self.check_group_is_ours( - group_id, requester_user_id, and_exists=True, and_is_admin=requester_user_id - ) - - yield self.store.remove_room_from_summary( - group_id=group_id, room_id=room_id, category_id=category_id - ) - - return {} - - @defer.inlineCallbacks - def set_group_join_policy(self, group_id, requester_user_id, content): - """Sets the group join policy. - - Currently supported policies are: - - "invite": an invite must be received and accepted in order to join. - - "open": anyone can join. - """ - yield self.check_group_is_ours( - group_id, requester_user_id, and_exists=True, and_is_admin=requester_user_id - ) - - join_policy = _parse_join_policy_from_contents(content) - if join_policy is None: - raise SynapseError(400, "No value specified for 'm.join_policy'") - - yield self.store.set_group_join_policy(group_id, join_policy=join_policy) - - return {} - @defer.inlineCallbacks def get_group_categories(self, group_id, requester_user_id): """Get all categories in a group (as seen by user) @@ -248,42 +183,10 @@ class GroupsServerHandler(object): group_id=group_id, category_id=category_id ) + logger.info("group %s", res) + return res - @defer.inlineCallbacks - def update_group_category(self, group_id, requester_user_id, category_id, content): - """Add/Update a group category - """ - yield self.check_group_is_ours( - group_id, requester_user_id, and_exists=True, and_is_admin=requester_user_id - ) - - is_public = _parse_visibility_from_contents(content) - profile = content.get("profile") - - yield self.store.upsert_group_category( - group_id=group_id, - category_id=category_id, - is_public=is_public, - profile=profile, - ) - - return {} - - @defer.inlineCallbacks - def delete_group_category(self, group_id, requester_user_id, category_id): - """Delete a group category - """ - yield self.check_group_is_ours( - group_id, requester_user_id, and_exists=True, and_is_admin=requester_user_id - ) - - yield self.store.remove_group_category( - group_id=group_id, category_id=category_id - ) - - return {} - @defer.inlineCallbacks def get_group_roles(self, group_id, requester_user_id): """Get all roles in a group (as seen by user) @@ -302,74 +205,6 @@ class GroupsServerHandler(object): res = yield self.store.get_group_role(group_id=group_id, role_id=role_id) return res - @defer.inlineCallbacks - def update_group_role(self, group_id, requester_user_id, role_id, content): - """Add/update a role in a group - """ - yield self.check_group_is_ours( - group_id, requester_user_id, and_exists=True, and_is_admin=requester_user_id - ) - - is_public = _parse_visibility_from_contents(content) - - profile = content.get("profile") - - yield self.store.upsert_group_role( - group_id=group_id, role_id=role_id, is_public=is_public, profile=profile - ) - - return {} - - @defer.inlineCallbacks - def delete_group_role(self, group_id, requester_user_id, role_id): - """Remove role from group - """ - yield self.check_group_is_ours( - group_id, requester_user_id, and_exists=True, and_is_admin=requester_user_id - ) - - yield self.store.remove_group_role(group_id=group_id, role_id=role_id) - - return {} - - @defer.inlineCallbacks - def update_group_summary_user( - self, group_id, requester_user_id, user_id, role_id, content - ): - """Add/update a users entry in the group summary - """ - yield self.check_group_is_ours( - group_id, requester_user_id, and_exists=True, and_is_admin=requester_user_id - ) - - order = content.get("order", None) - - is_public = _parse_visibility_from_contents(content) - - yield self.store.add_user_to_summary( - group_id=group_id, - user_id=user_id, - role_id=role_id, - order=order, - is_public=is_public, - ) - - return {} - - @defer.inlineCallbacks - def delete_group_summary_user(self, group_id, requester_user_id, user_id, role_id): - """Remove a user from the group summary - """ - yield self.check_group_is_ours( - group_id, requester_user_id, and_exists=True, and_is_admin=requester_user_id - ) - - yield self.store.remove_user_from_summary( - group_id=group_id, user_id=user_id, role_id=role_id - ) - - return {} - @defer.inlineCallbacks def get_group_profile(self, group_id, requester_user_id): """Get the group profile as seen by requester_user_id @@ -394,24 +229,6 @@ class GroupsServerHandler(object): else: raise SynapseError(404, "Unknown group") - @defer.inlineCallbacks - def update_group_profile(self, group_id, requester_user_id, content): - """Update the group profile - """ - yield self.check_group_is_ours( - group_id, requester_user_id, and_exists=True, and_is_admin=requester_user_id - ) - - profile = {} - for keyname in ("name", "avatar_url", "short_description", "long_description"): - if keyname in content: - value = content[keyname] - if not isinstance(value, string_types): - raise SynapseError(400, "%r value is not a string" % (keyname,)) - profile[keyname] = value - - yield self.store.update_group_profile(group_id, profile) - @defer.inlineCallbacks def get_users_in_group(self, group_id, requester_user_id): """Get the users in group as seen by requester_user_id. @@ -530,6 +347,196 @@ class GroupsServerHandler(object): return {"chunk": chunk, "total_room_count_estimate": len(room_results)} + +class GroupsServerHandler(GroupsServerWorkerHandler): + def __init__(self, hs): + super(GroupsServerHandler, self).__init__(hs) + + # Ensure attestations get renewed + hs.get_groups_attestation_renewer() + + @defer.inlineCallbacks + def update_group_summary_room( + self, group_id, requester_user_id, room_id, category_id, content + ): + """Add/update a room to the group summary + """ + yield self.check_group_is_ours( + group_id, requester_user_id, and_exists=True, and_is_admin=requester_user_id + ) + + RoomID.from_string(room_id) # Ensure valid room id + + order = content.get("order", None) + + is_public = _parse_visibility_from_contents(content) + + yield self.store.add_room_to_summary( + group_id=group_id, + room_id=room_id, + category_id=category_id, + order=order, + is_public=is_public, + ) + + return {} + + @defer.inlineCallbacks + def delete_group_summary_room( + self, group_id, requester_user_id, room_id, category_id + ): + """Remove a room from the summary + """ + yield self.check_group_is_ours( + group_id, requester_user_id, and_exists=True, and_is_admin=requester_user_id + ) + + yield self.store.remove_room_from_summary( + group_id=group_id, room_id=room_id, category_id=category_id + ) + + return {} + + @defer.inlineCallbacks + def set_group_join_policy(self, group_id, requester_user_id, content): + """Sets the group join policy. + + Currently supported policies are: + - "invite": an invite must be received and accepted in order to join. + - "open": anyone can join. + """ + yield self.check_group_is_ours( + group_id, requester_user_id, and_exists=True, and_is_admin=requester_user_id + ) + + join_policy = _parse_join_policy_from_contents(content) + if join_policy is None: + raise SynapseError(400, "No value specified for 'm.join_policy'") + + yield self.store.set_group_join_policy(group_id, join_policy=join_policy) + + return {} + + @defer.inlineCallbacks + def update_group_category(self, group_id, requester_user_id, category_id, content): + """Add/Update a group category + """ + yield self.check_group_is_ours( + group_id, requester_user_id, and_exists=True, and_is_admin=requester_user_id + ) + + is_public = _parse_visibility_from_contents(content) + profile = content.get("profile") + + yield self.store.upsert_group_category( + group_id=group_id, + category_id=category_id, + is_public=is_public, + profile=profile, + ) + + return {} + + @defer.inlineCallbacks + def delete_group_category(self, group_id, requester_user_id, category_id): + """Delete a group category + """ + yield self.check_group_is_ours( + group_id, requester_user_id, and_exists=True, and_is_admin=requester_user_id + ) + + yield self.store.remove_group_category( + group_id=group_id, category_id=category_id + ) + + return {} + + @defer.inlineCallbacks + def update_group_role(self, group_id, requester_user_id, role_id, content): + """Add/update a role in a group + """ + yield self.check_group_is_ours( + group_id, requester_user_id, and_exists=True, and_is_admin=requester_user_id + ) + + is_public = _parse_visibility_from_contents(content) + + profile = content.get("profile") + + yield self.store.upsert_group_role( + group_id=group_id, role_id=role_id, is_public=is_public, profile=profile + ) + + return {} + + @defer.inlineCallbacks + def delete_group_role(self, group_id, requester_user_id, role_id): + """Remove role from group + """ + yield self.check_group_is_ours( + group_id, requester_user_id, and_exists=True, and_is_admin=requester_user_id + ) + + yield self.store.remove_group_role(group_id=group_id, role_id=role_id) + + return {} + + @defer.inlineCallbacks + def update_group_summary_user( + self, group_id, requester_user_id, user_id, role_id, content + ): + """Add/update a users entry in the group summary + """ + yield self.check_group_is_ours( + group_id, requester_user_id, and_exists=True, and_is_admin=requester_user_id + ) + + order = content.get("order", None) + + is_public = _parse_visibility_from_contents(content) + + yield self.store.add_user_to_summary( + group_id=group_id, + user_id=user_id, + role_id=role_id, + order=order, + is_public=is_public, + ) + + return {} + + @defer.inlineCallbacks + def delete_group_summary_user(self, group_id, requester_user_id, user_id, role_id): + """Remove a user from the group summary + """ + yield self.check_group_is_ours( + group_id, requester_user_id, and_exists=True, and_is_admin=requester_user_id + ) + + yield self.store.remove_user_from_summary( + group_id=group_id, user_id=user_id, role_id=role_id + ) + + return {} + + @defer.inlineCallbacks + def update_group_profile(self, group_id, requester_user_id, content): + """Update the group profile + """ + yield self.check_group_is_ours( + group_id, requester_user_id, and_exists=True, and_is_admin=requester_user_id + ) + + profile = {} + for keyname in ("name", "avatar_url", "short_description", "long_description"): + if keyname in content: + value = content[keyname] + if not isinstance(value, string_types): + raise SynapseError(400, "%r value is not a string" % (keyname,)) + profile[keyname] = value + + yield self.store.update_group_profile(group_id, profile) + @defer.inlineCallbacks def add_room_to_group(self, group_id, requester_user_id, room_id, content): """Add room to group diff --git a/synapse/handlers/groups_local.py b/synapse/handlers/groups_local.py index 319565510f..ad22415782 100644 --- a/synapse/handlers/groups_local.py +++ b/synapse/handlers/groups_local.py @@ -63,7 +63,7 @@ def _create_rerouter(func_name): return f -class GroupsLocalHandler(object): +class GroupsLocalWorkerHandler(object): def __init__(self, hs): self.hs = hs self.store = hs.get_datastore() @@ -81,40 +81,17 @@ class GroupsLocalHandler(object): self.profile_handler = hs.get_profile_handler() - # Ensure attestations get renewed - hs.get_groups_attestation_renewer() - # The following functions merely route the query to the local groups server # or federation depending on if the group is local or remote get_group_profile = _create_rerouter("get_group_profile") - update_group_profile = _create_rerouter("update_group_profile") get_rooms_in_group = _create_rerouter("get_rooms_in_group") - get_invited_users_in_group = _create_rerouter("get_invited_users_in_group") - - add_room_to_group = _create_rerouter("add_room_to_group") - update_room_in_group = _create_rerouter("update_room_in_group") - remove_room_from_group = _create_rerouter("remove_room_from_group") - - update_group_summary_room = _create_rerouter("update_group_summary_room") - delete_group_summary_room = _create_rerouter("delete_group_summary_room") - - update_group_category = _create_rerouter("update_group_category") - delete_group_category = _create_rerouter("delete_group_category") get_group_category = _create_rerouter("get_group_category") get_group_categories = _create_rerouter("get_group_categories") - - update_group_summary_user = _create_rerouter("update_group_summary_user") - delete_group_summary_user = _create_rerouter("delete_group_summary_user") - - update_group_role = _create_rerouter("update_group_role") - delete_group_role = _create_rerouter("delete_group_role") get_group_role = _create_rerouter("get_group_role") get_group_roles = _create_rerouter("get_group_roles") - set_group_join_policy = _create_rerouter("set_group_join_policy") - @defer.inlineCallbacks def get_group_summary(self, group_id, requester_user_id): """Get the group summary for a group. @@ -169,6 +146,144 @@ class GroupsLocalHandler(object): return res + @defer.inlineCallbacks + def get_users_in_group(self, group_id, requester_user_id): + """Get users in a group + """ + if self.is_mine_id(group_id): + res = yield self.groups_server_handler.get_users_in_group( + group_id, requester_user_id + ) + return res + + group_server_name = get_domain_from_id(group_id) + + try: + res = yield self.transport_client.get_users_in_group( + get_domain_from_id(group_id), group_id, requester_user_id + ) + except HttpResponseException as e: + raise e.to_synapse_error() + except RequestSendFailed: + raise SynapseError(502, "Failed to contact group server") + + chunk = res["chunk"] + valid_entries = [] + for entry in chunk: + g_user_id = entry["user_id"] + attestation = entry.pop("attestation", {}) + try: + if get_domain_from_id(g_user_id) != group_server_name: + yield self.attestations.verify_attestation( + attestation, + group_id=group_id, + user_id=g_user_id, + server_name=get_domain_from_id(g_user_id), + ) + valid_entries.append(entry) + except Exception as e: + logger.info("Failed to verify user is in group: %s", e) + + res["chunk"] = valid_entries + + return res + + @defer.inlineCallbacks + def get_joined_groups(self, user_id): + group_ids = yield self.store.get_joined_groups(user_id) + return {"groups": group_ids} + + @defer.inlineCallbacks + def get_publicised_groups_for_user(self, user_id): + if self.hs.is_mine_id(user_id): + result = yield self.store.get_publicised_groups_for_user(user_id) + + # Check AS associated groups for this user - this depends on the + # RegExps in the AS registration file (under `users`) + for app_service in self.store.get_app_services(): + result.extend(app_service.get_groups_for_user(user_id)) + + return {"groups": result} + else: + try: + bulk_result = yield self.transport_client.bulk_get_publicised_groups( + get_domain_from_id(user_id), [user_id] + ) + except HttpResponseException as e: + raise e.to_synapse_error() + except RequestSendFailed: + raise SynapseError(502, "Failed to contact group server") + + result = bulk_result.get("users", {}).get(user_id) + # TODO: Verify attestations + return {"groups": result} + + @defer.inlineCallbacks + def bulk_get_publicised_groups(self, user_ids, proxy=True): + destinations = {} + local_users = set() + + for user_id in user_ids: + if self.hs.is_mine_id(user_id): + local_users.add(user_id) + else: + destinations.setdefault(get_domain_from_id(user_id), set()).add(user_id) + + if not proxy and destinations: + raise SynapseError(400, "Some user_ids are not local") + + results = {} + failed_results = [] + for destination, dest_user_ids in iteritems(destinations): + try: + r = yield self.transport_client.bulk_get_publicised_groups( + destination, list(dest_user_ids) + ) + results.update(r["users"]) + except Exception: + failed_results.extend(dest_user_ids) + + for uid in local_users: + results[uid] = yield self.store.get_publicised_groups_for_user(uid) + + # Check AS associated groups for this user - this depends on the + # RegExps in the AS registration file (under `users`) + for app_service in self.store.get_app_services(): + results[uid].extend(app_service.get_groups_for_user(uid)) + + return {"users": results} + + +class GroupsLocalHandler(GroupsLocalWorkerHandler): + def __init__(self, hs): + super(GroupsLocalHandler, self).__init__(hs) + + # Ensure attestations get renewed + hs.get_groups_attestation_renewer() + + # The following functions merely route the query to the local groups server + # or federation depending on if the group is local or remote + + update_group_profile = _create_rerouter("update_group_profile") + + add_room_to_group = _create_rerouter("add_room_to_group") + update_room_in_group = _create_rerouter("update_room_in_group") + remove_room_from_group = _create_rerouter("remove_room_from_group") + + update_group_summary_room = _create_rerouter("update_group_summary_room") + delete_group_summary_room = _create_rerouter("delete_group_summary_room") + + update_group_category = _create_rerouter("update_group_category") + delete_group_category = _create_rerouter("delete_group_category") + + update_group_summary_user = _create_rerouter("update_group_summary_user") + delete_group_summary_user = _create_rerouter("delete_group_summary_user") + + update_group_role = _create_rerouter("update_group_role") + delete_group_role = _create_rerouter("delete_group_role") + + set_group_join_policy = _create_rerouter("set_group_join_policy") + @defer.inlineCallbacks def create_group(self, group_id, user_id, content): """Create a group @@ -219,48 +334,6 @@ class GroupsLocalHandler(object): return res - @defer.inlineCallbacks - def get_users_in_group(self, group_id, requester_user_id): - """Get users in a group - """ - if self.is_mine_id(group_id): - res = yield self.groups_server_handler.get_users_in_group( - group_id, requester_user_id - ) - return res - - group_server_name = get_domain_from_id(group_id) - - try: - res = yield self.transport_client.get_users_in_group( - get_domain_from_id(group_id), group_id, requester_user_id - ) - except HttpResponseException as e: - raise e.to_synapse_error() - except RequestSendFailed: - raise SynapseError(502, "Failed to contact group server") - - chunk = res["chunk"] - valid_entries = [] - for entry in chunk: - g_user_id = entry["user_id"] - attestation = entry.pop("attestation", {}) - try: - if get_domain_from_id(g_user_id) != group_server_name: - yield self.attestations.verify_attestation( - attestation, - group_id=group_id, - user_id=g_user_id, - server_name=get_domain_from_id(g_user_id), - ) - valid_entries.append(entry) - except Exception as e: - logger.info("Failed to verify user is in group: %s", e) - - res["chunk"] = valid_entries - - return res - @defer.inlineCallbacks def join_group(self, group_id, user_id, content): """Request to join a group @@ -452,68 +525,3 @@ class GroupsLocalHandler(object): group_id, user_id, membership="leave" ) self.notifier.on_new_event("groups_key", token, users=[user_id]) - - @defer.inlineCallbacks - def get_joined_groups(self, user_id): - group_ids = yield self.store.get_joined_groups(user_id) - return {"groups": group_ids} - - @defer.inlineCallbacks - def get_publicised_groups_for_user(self, user_id): - if self.hs.is_mine_id(user_id): - result = yield self.store.get_publicised_groups_for_user(user_id) - - # Check AS associated groups for this user - this depends on the - # RegExps in the AS registration file (under `users`) - for app_service in self.store.get_app_services(): - result.extend(app_service.get_groups_for_user(user_id)) - - return {"groups": result} - else: - try: - bulk_result = yield self.transport_client.bulk_get_publicised_groups( - get_domain_from_id(user_id), [user_id] - ) - except HttpResponseException as e: - raise e.to_synapse_error() - except RequestSendFailed: - raise SynapseError(502, "Failed to contact group server") - - result = bulk_result.get("users", {}).get(user_id) - # TODO: Verify attestations - return {"groups": result} - - @defer.inlineCallbacks - def bulk_get_publicised_groups(self, user_ids, proxy=True): - destinations = {} - local_users = set() - - for user_id in user_ids: - if self.hs.is_mine_id(user_id): - local_users.add(user_id) - else: - destinations.setdefault(get_domain_from_id(user_id), set()).add(user_id) - - if not proxy and destinations: - raise SynapseError(400, "Some user_ids are not local") - - results = {} - failed_results = [] - for destination, dest_user_ids in iteritems(destinations): - try: - r = yield self.transport_client.bulk_get_publicised_groups( - destination, list(dest_user_ids) - ) - results.update(r["users"]) - except Exception: - failed_results.extend(dest_user_ids) - - for uid in local_users: - results[uid] = yield self.store.get_publicised_groups_for_user(uid) - - # Check AS associated groups for this user - this depends on the - # RegExps in the AS registration file (under `users`) - for app_service in self.store.get_app_services(): - results[uid].extend(app_service.get_groups_for_user(uid)) - - return {"users": results} diff --git a/synapse/replication/slave/storage/groups.py b/synapse/replication/slave/storage/groups.py index 69a4ae42f9..2d4fd08cf5 100644 --- a/synapse/replication/slave/storage/groups.py +++ b/synapse/replication/slave/storage/groups.py @@ -13,15 +13,14 @@ # See the License for the specific language governing permissions and # limitations under the License. -from synapse.storage import DataStore +from synapse.replication.slave.storage._base import BaseSlavedStore +from synapse.replication.slave.storage._slaved_id_tracker import SlavedIdTracker +from synapse.storage.data_stores.main.group_server import GroupServerWorkerStore from synapse.storage.database import Database from synapse.util.caches.stream_change_cache import StreamChangeCache -from ._base import BaseSlavedStore, __func__ -from ._slaved_id_tracker import SlavedIdTracker - -class SlavedGroupServerStore(BaseSlavedStore): +class SlavedGroupServerStore(GroupServerWorkerStore, BaseSlavedStore): def __init__(self, database: Database, db_conn, hs): super(SlavedGroupServerStore, self).__init__(database, db_conn, hs) @@ -35,9 +34,8 @@ class SlavedGroupServerStore(BaseSlavedStore): self._group_updates_id_gen.get_current_token(), ) - get_groups_changes_for_user = __func__(DataStore.get_groups_changes_for_user) - get_group_stream_token = __func__(DataStore.get_group_stream_token) - get_all_groups_for_user = __func__(DataStore.get_all_groups_for_user) + def get_group_stream_token(self): + return self._group_updates_id_gen.get_current_token() def stream_positions(self): result = super(SlavedGroupServerStore, self).stream_positions() diff --git a/synapse/server.py b/synapse/server.py index 7926867b77..fd2f69e928 100644 --- a/synapse/server.py +++ b/synapse/server.py @@ -50,7 +50,7 @@ from synapse.federation.send_queue import FederationRemoteSendQueue from synapse.federation.sender import FederationSender from synapse.federation.transport.client import TransportLayerClient from synapse.groups.attestations import GroupAttestationSigning, GroupAttestionRenewer -from synapse.groups.groups_server import GroupsServerHandler +from synapse.groups.groups_server import GroupsServerHandler, GroupsServerWorkerHandler from synapse.handlers import Handlers from synapse.handlers.account_validity import AccountValidityHandler from synapse.handlers.acme import AcmeHandler @@ -62,7 +62,7 @@ from synapse.handlers.devicemessage import DeviceMessageHandler from synapse.handlers.e2e_keys import E2eKeysHandler from synapse.handlers.e2e_room_keys import E2eRoomKeysHandler from synapse.handlers.events import EventHandler, EventStreamHandler -from synapse.handlers.groups_local import GroupsLocalHandler +from synapse.handlers.groups_local import GroupsLocalHandler, GroupsLocalWorkerHandler from synapse.handlers.initial_sync import InitialSyncHandler from synapse.handlers.message import EventCreationHandler, MessageHandler from synapse.handlers.pagination import PaginationHandler @@ -460,10 +460,16 @@ class HomeServer(object): return UserDirectoryHandler(self) def build_groups_local_handler(self): - return GroupsLocalHandler(self) + if self.config.worker_app: + return GroupsLocalWorkerHandler(self) + else: + return GroupsLocalHandler(self) def build_groups_server_handler(self): - return GroupsServerHandler(self) + if self.config.worker_app: + return GroupsServerWorkerHandler(self) + else: + return GroupsServerHandler(self) def build_groups_attestation_signing(self): return GroupAttestationSigning(self) diff --git a/synapse/storage/data_stores/main/group_server.py b/synapse/storage/data_stores/main/group_server.py index 6acd45e9f3..0963e6c250 100644 --- a/synapse/storage/data_stores/main/group_server.py +++ b/synapse/storage/data_stores/main/group_server.py @@ -27,21 +27,7 @@ _DEFAULT_CATEGORY_ID = "" _DEFAULT_ROLE_ID = "" -class GroupServerStore(SQLBaseStore): - def set_group_join_policy(self, group_id, join_policy): - """Set the join policy of a group. - - join_policy can be one of: - * "invite" - * "open" - """ - return self.db.simple_update_one( - table="groups", - keyvalues={"group_id": group_id}, - updatevalues={"join_policy": join_policy}, - desc="set_group_join_policy", - ) - +class GroupServerWorkerStore(SQLBaseStore): def get_group(self, group_id): return self.db.simple_select_one( table="groups", @@ -157,6 +143,366 @@ class GroupServerStore(SQLBaseStore): "get_rooms_for_summary", _get_rooms_for_summary_txn ) + @defer.inlineCallbacks + def get_group_categories(self, group_id): + rows = yield self.db.simple_select_list( + table="group_room_categories", + keyvalues={"group_id": group_id}, + retcols=("category_id", "is_public", "profile"), + desc="get_group_categories", + ) + + return { + row["category_id"]: { + "is_public": row["is_public"], + "profile": json.loads(row["profile"]), + } + for row in rows + } + + @defer.inlineCallbacks + def get_group_category(self, group_id, category_id): + category = yield self.db.simple_select_one( + table="group_room_categories", + keyvalues={"group_id": group_id, "category_id": category_id}, + retcols=("is_public", "profile"), + desc="get_group_category", + ) + + category["profile"] = json.loads(category["profile"]) + + return category + + @defer.inlineCallbacks + def get_group_roles(self, group_id): + rows = yield self.db.simple_select_list( + table="group_roles", + keyvalues={"group_id": group_id}, + retcols=("role_id", "is_public", "profile"), + desc="get_group_roles", + ) + + return { + row["role_id"]: { + "is_public": row["is_public"], + "profile": json.loads(row["profile"]), + } + for row in rows + } + + @defer.inlineCallbacks + def get_group_role(self, group_id, role_id): + role = yield self.db.simple_select_one( + table="group_roles", + keyvalues={"group_id": group_id, "role_id": role_id}, + retcols=("is_public", "profile"), + desc="get_group_role", + ) + + role["profile"] = json.loads(role["profile"]) + + return role + + def get_local_groups_for_room(self, room_id): + """Get all of the local group that contain a given room + Args: + room_id (str): The ID of a room + Returns: + Deferred[list[str]]: A twisted.Deferred containing a list of group ids + containing this room + """ + return self.db.simple_select_onecol( + table="group_rooms", + keyvalues={"room_id": room_id}, + retcol="group_id", + desc="get_local_groups_for_room", + ) + + def get_users_for_summary_by_role(self, group_id, include_private=False): + """Get the users and roles that should be included in a summary request + + Returns ([users], [roles]) + """ + + def _get_users_for_summary_txn(txn): + keyvalues = {"group_id": group_id} + if not include_private: + keyvalues["is_public"] = True + + sql = """ + SELECT user_id, is_public, role_id, user_order + FROM group_summary_users + WHERE group_id = ? + """ + + if not include_private: + sql += " AND is_public = ?" + txn.execute(sql, (group_id, True)) + else: + txn.execute(sql, (group_id,)) + + users = [ + { + "user_id": row[0], + "is_public": row[1], + "role_id": row[2] if row[2] != _DEFAULT_ROLE_ID else None, + "order": row[3], + } + for row in txn + ] + + sql = """ + SELECT role_id, is_public, profile, role_order + FROM group_summary_roles + INNER JOIN group_roles USING (group_id, role_id) + WHERE group_id = ? + """ + + if not include_private: + sql += " AND is_public = ?" + txn.execute(sql, (group_id, True)) + else: + txn.execute(sql, (group_id,)) + + roles = { + row[0]: { + "is_public": row[1], + "profile": json.loads(row[2]), + "order": row[3], + } + for row in txn + } + + return users, roles + + return self.db.runInteraction( + "get_users_for_summary_by_role", _get_users_for_summary_txn + ) + + def is_user_in_group(self, user_id, group_id): + return self.db.simple_select_one_onecol( + table="group_users", + keyvalues={"group_id": group_id, "user_id": user_id}, + retcol="user_id", + allow_none=True, + desc="is_user_in_group", + ).addCallback(lambda r: bool(r)) + + def is_user_admin_in_group(self, group_id, user_id): + return self.db.simple_select_one_onecol( + table="group_users", + keyvalues={"group_id": group_id, "user_id": user_id}, + retcol="is_admin", + allow_none=True, + desc="is_user_admin_in_group", + ) + + def is_user_invited_to_local_group(self, group_id, user_id): + """Has the group server invited a user? + """ + return self.db.simple_select_one_onecol( + table="group_invites", + keyvalues={"group_id": group_id, "user_id": user_id}, + retcol="user_id", + desc="is_user_invited_to_local_group", + allow_none=True, + ) + + def get_users_membership_info_in_group(self, group_id, user_id): + """Get a dict describing the membership of a user in a group. + + Example if joined: + + { + "membership": "join", + "is_public": True, + "is_privileged": False, + } + + Returns an empty dict if the user is not join/invite/etc + """ + + def _get_users_membership_in_group_txn(txn): + row = self.db.simple_select_one_txn( + txn, + table="group_users", + keyvalues={"group_id": group_id, "user_id": user_id}, + retcols=("is_admin", "is_public"), + allow_none=True, + ) + + if row: + return { + "membership": "join", + "is_public": row["is_public"], + "is_privileged": row["is_admin"], + } + + row = self.db.simple_select_one_onecol_txn( + txn, + table="group_invites", + keyvalues={"group_id": group_id, "user_id": user_id}, + retcol="user_id", + allow_none=True, + ) + + if row: + return {"membership": "invite"} + + return {} + + return self.db.runInteraction( + "get_users_membership_info_in_group", _get_users_membership_in_group_txn + ) + + def get_publicised_groups_for_user(self, user_id): + """Get all groups a user is publicising + """ + return self.db.simple_select_onecol( + table="local_group_membership", + keyvalues={"user_id": user_id, "membership": "join", "is_publicised": True}, + retcol="group_id", + desc="get_publicised_groups_for_user", + ) + + def get_attestations_need_renewals(self, valid_until_ms): + """Get all attestations that need to be renewed until givent time + """ + + def _get_attestations_need_renewals_txn(txn): + sql = """ + SELECT group_id, user_id FROM group_attestations_renewals + WHERE valid_until_ms <= ? + """ + txn.execute(sql, (valid_until_ms,)) + return self.db.cursor_to_dict(txn) + + return self.db.runInteraction( + "get_attestations_need_renewals", _get_attestations_need_renewals_txn + ) + + @defer.inlineCallbacks + def get_remote_attestation(self, group_id, user_id): + """Get the attestation that proves the remote agrees that the user is + in the group. + """ + row = yield self.db.simple_select_one( + table="group_attestations_remote", + keyvalues={"group_id": group_id, "user_id": user_id}, + retcols=("valid_until_ms", "attestation_json"), + desc="get_remote_attestation", + allow_none=True, + ) + + now = int(self._clock.time_msec()) + if row and now < row["valid_until_ms"]: + return json.loads(row["attestation_json"]) + + return None + + def get_joined_groups(self, user_id): + return self.db.simple_select_onecol( + table="local_group_membership", + keyvalues={"user_id": user_id, "membership": "join"}, + retcol="group_id", + desc="get_joined_groups", + ) + + def get_all_groups_for_user(self, user_id, now_token): + def _get_all_groups_for_user_txn(txn): + sql = """ + SELECT group_id, type, membership, u.content + FROM local_group_updates AS u + INNER JOIN local_group_membership USING (group_id, user_id) + WHERE user_id = ? AND membership != 'leave' + AND stream_id <= ? + """ + txn.execute(sql, (user_id, now_token)) + return [ + { + "group_id": row[0], + "type": row[1], + "membership": row[2], + "content": json.loads(row[3]), + } + for row in txn + ] + + return self.db.runInteraction( + "get_all_groups_for_user", _get_all_groups_for_user_txn + ) + + def get_groups_changes_for_user(self, user_id, from_token, to_token): + from_token = int(from_token) + has_changed = self._group_updates_stream_cache.has_entity_changed( + user_id, from_token + ) + if not has_changed: + return defer.succeed([]) + + def _get_groups_changes_for_user_txn(txn): + sql = """ + SELECT group_id, membership, type, u.content + FROM local_group_updates AS u + INNER JOIN local_group_membership USING (group_id, user_id) + WHERE user_id = ? AND ? < stream_id AND stream_id <= ? + """ + txn.execute(sql, (user_id, from_token, to_token)) + return [ + { + "group_id": group_id, + "membership": membership, + "type": gtype, + "content": json.loads(content_json), + } + for group_id, membership, gtype, content_json in txn + ] + + return self.db.runInteraction( + "get_groups_changes_for_user", _get_groups_changes_for_user_txn + ) + + def get_all_groups_changes(self, from_token, to_token, limit): + from_token = int(from_token) + has_changed = self._group_updates_stream_cache.has_any_entity_changed( + from_token + ) + if not has_changed: + return defer.succeed([]) + + def _get_all_groups_changes_txn(txn): + sql = """ + SELECT stream_id, group_id, user_id, type, content + FROM local_group_updates + WHERE ? < stream_id AND stream_id <= ? + LIMIT ? + """ + txn.execute(sql, (from_token, to_token, limit)) + return [ + (stream_id, group_id, user_id, gtype, json.loads(content_json)) + for stream_id, group_id, user_id, gtype, content_json in txn + ] + + return self.db.runInteraction( + "get_all_groups_changes", _get_all_groups_changes_txn + ) + + +class GroupServerStore(GroupServerWorkerStore): + def set_group_join_policy(self, group_id, join_policy): + """Set the join policy of a group. + + join_policy can be one of: + * "invite" + * "open" + """ + return self.db.simple_update_one( + table="groups", + keyvalues={"group_id": group_id}, + updatevalues={"join_policy": join_policy}, + desc="set_group_join_policy", + ) + def add_room_to_summary(self, group_id, room_id, category_id, order, is_public): return self.db.runInteraction( "add_room_to_summary", @@ -299,36 +645,6 @@ class GroupServerStore(SQLBaseStore): desc="remove_room_from_summary", ) - @defer.inlineCallbacks - def get_group_categories(self, group_id): - rows = yield self.db.simple_select_list( - table="group_room_categories", - keyvalues={"group_id": group_id}, - retcols=("category_id", "is_public", "profile"), - desc="get_group_categories", - ) - - return { - row["category_id"]: { - "is_public": row["is_public"], - "profile": json.loads(row["profile"]), - } - for row in rows - } - - @defer.inlineCallbacks - def get_group_category(self, group_id, category_id): - category = yield self.db.simple_select_one( - table="group_room_categories", - keyvalues={"group_id": group_id, "category_id": category_id}, - retcols=("is_public", "profile"), - desc="get_group_category", - ) - - category["profile"] = json.loads(category["profile"]) - - return category - def upsert_group_category(self, group_id, category_id, profile, is_public): """Add/update room category for group """ @@ -360,36 +676,6 @@ class GroupServerStore(SQLBaseStore): desc="remove_group_category", ) - @defer.inlineCallbacks - def get_group_roles(self, group_id): - rows = yield self.db.simple_select_list( - table="group_roles", - keyvalues={"group_id": group_id}, - retcols=("role_id", "is_public", "profile"), - desc="get_group_roles", - ) - - return { - row["role_id"]: { - "is_public": row["is_public"], - "profile": json.loads(row["profile"]), - } - for row in rows - } - - @defer.inlineCallbacks - def get_group_role(self, group_id, role_id): - role = yield self.db.simple_select_one( - table="group_roles", - keyvalues={"group_id": group_id, "role_id": role_id}, - retcols=("is_public", "profile"), - desc="get_group_role", - ) - - role["profile"] = json.loads(role["profile"]) - - return role - def upsert_group_role(self, group_id, role_id, profile, is_public): """Add/remove user role """ @@ -555,100 +841,6 @@ class GroupServerStore(SQLBaseStore): desc="remove_user_from_summary", ) - def get_local_groups_for_room(self, room_id): - """Get all of the local group that contain a given room - Args: - room_id (str): The ID of a room - Returns: - Deferred[list[str]]: A twisted.Deferred containing a list of group ids - containing this room - """ - return self.db.simple_select_onecol( - table="group_rooms", - keyvalues={"room_id": room_id}, - retcol="group_id", - desc="get_local_groups_for_room", - ) - - def get_users_for_summary_by_role(self, group_id, include_private=False): - """Get the users and roles that should be included in a summary request - - Returns ([users], [roles]) - """ - - def _get_users_for_summary_txn(txn): - keyvalues = {"group_id": group_id} - if not include_private: - keyvalues["is_public"] = True - - sql = """ - SELECT user_id, is_public, role_id, user_order - FROM group_summary_users - WHERE group_id = ? - """ - - if not include_private: - sql += " AND is_public = ?" - txn.execute(sql, (group_id, True)) - else: - txn.execute(sql, (group_id,)) - - users = [ - { - "user_id": row[0], - "is_public": row[1], - "role_id": row[2] if row[2] != _DEFAULT_ROLE_ID else None, - "order": row[3], - } - for row in txn - ] - - sql = """ - SELECT role_id, is_public, profile, role_order - FROM group_summary_roles - INNER JOIN group_roles USING (group_id, role_id) - WHERE group_id = ? - """ - - if not include_private: - sql += " AND is_public = ?" - txn.execute(sql, (group_id, True)) - else: - txn.execute(sql, (group_id,)) - - roles = { - row[0]: { - "is_public": row[1], - "profile": json.loads(row[2]), - "order": row[3], - } - for row in txn - } - - return users, roles - - return self.db.runInteraction( - "get_users_for_summary_by_role", _get_users_for_summary_txn - ) - - def is_user_in_group(self, user_id, group_id): - return self.db.simple_select_one_onecol( - table="group_users", - keyvalues={"group_id": group_id, "user_id": user_id}, - retcol="user_id", - allow_none=True, - desc="is_user_in_group", - ).addCallback(lambda r: bool(r)) - - def is_user_admin_in_group(self, group_id, user_id): - return self.db.simple_select_one_onecol( - table="group_users", - keyvalues={"group_id": group_id, "user_id": user_id}, - retcol="is_admin", - allow_none=True, - desc="is_user_admin_in_group", - ) - def add_group_invite(self, group_id, user_id): """Record that the group server has invited a user """ @@ -658,64 +850,6 @@ class GroupServerStore(SQLBaseStore): desc="add_group_invite", ) - def is_user_invited_to_local_group(self, group_id, user_id): - """Has the group server invited a user? - """ - return self.db.simple_select_one_onecol( - table="group_invites", - keyvalues={"group_id": group_id, "user_id": user_id}, - retcol="user_id", - desc="is_user_invited_to_local_group", - allow_none=True, - ) - - def get_users_membership_info_in_group(self, group_id, user_id): - """Get a dict describing the membership of a user in a group. - - Example if joined: - - { - "membership": "join", - "is_public": True, - "is_privileged": False, - } - - Returns an empty dict if the user is not join/invite/etc - """ - - def _get_users_membership_in_group_txn(txn): - row = self.db.simple_select_one_txn( - txn, - table="group_users", - keyvalues={"group_id": group_id, "user_id": user_id}, - retcols=("is_admin", "is_public"), - allow_none=True, - ) - - if row: - return { - "membership": "join", - "is_public": row["is_public"], - "is_privileged": row["is_admin"], - } - - row = self.db.simple_select_one_onecol_txn( - txn, - table="group_invites", - keyvalues={"group_id": group_id, "user_id": user_id}, - retcol="user_id", - allow_none=True, - ) - - if row: - return {"membership": "invite"} - - return {} - - return self.db.runInteraction( - "get_users_membership_info_in_group", _get_users_membership_in_group_txn - ) - def add_user_to_group( self, group_id, @@ -846,16 +980,6 @@ class GroupServerStore(SQLBaseStore): "remove_room_from_group", _remove_room_from_group_txn ) - def get_publicised_groups_for_user(self, user_id): - """Get all groups a user is publicising - """ - return self.db.simple_select_onecol( - table="local_group_membership", - keyvalues={"user_id": user_id, "membership": "join", "is_publicised": True}, - retcol="group_id", - desc="get_publicised_groups_for_user", - ) - def update_group_publicity(self, group_id, user_id, publicise): """Update whether the user is publicising their membership of the group """ @@ -1000,22 +1124,6 @@ class GroupServerStore(SQLBaseStore): desc="update_group_profile", ) - def get_attestations_need_renewals(self, valid_until_ms): - """Get all attestations that need to be renewed until givent time - """ - - def _get_attestations_need_renewals_txn(txn): - sql = """ - SELECT group_id, user_id FROM group_attestations_renewals - WHERE valid_until_ms <= ? - """ - txn.execute(sql, (valid_until_ms,)) - return self.db.cursor_to_dict(txn) - - return self.db.runInteraction( - "get_attestations_need_renewals", _get_attestations_need_renewals_txn - ) - def update_attestation_renewal(self, group_id, user_id, attestation): """Update an attestation that we have renewed """ @@ -1054,112 +1162,6 @@ class GroupServerStore(SQLBaseStore): desc="remove_attestation_renewal", ) - @defer.inlineCallbacks - def get_remote_attestation(self, group_id, user_id): - """Get the attestation that proves the remote agrees that the user is - in the group. - """ - row = yield self.db.simple_select_one( - table="group_attestations_remote", - keyvalues={"group_id": group_id, "user_id": user_id}, - retcols=("valid_until_ms", "attestation_json"), - desc="get_remote_attestation", - allow_none=True, - ) - - now = int(self._clock.time_msec()) - if row and now < row["valid_until_ms"]: - return json.loads(row["attestation_json"]) - - return None - - def get_joined_groups(self, user_id): - return self.db.simple_select_onecol( - table="local_group_membership", - keyvalues={"user_id": user_id, "membership": "join"}, - retcol="group_id", - desc="get_joined_groups", - ) - - def get_all_groups_for_user(self, user_id, now_token): - def _get_all_groups_for_user_txn(txn): - sql = """ - SELECT group_id, type, membership, u.content - FROM local_group_updates AS u - INNER JOIN local_group_membership USING (group_id, user_id) - WHERE user_id = ? AND membership != 'leave' - AND stream_id <= ? - """ - txn.execute(sql, (user_id, now_token)) - return [ - { - "group_id": row[0], - "type": row[1], - "membership": row[2], - "content": json.loads(row[3]), - } - for row in txn - ] - - return self.db.runInteraction( - "get_all_groups_for_user", _get_all_groups_for_user_txn - ) - - def get_groups_changes_for_user(self, user_id, from_token, to_token): - from_token = int(from_token) - has_changed = self._group_updates_stream_cache.has_entity_changed( - user_id, from_token - ) - if not has_changed: - return defer.succeed([]) - - def _get_groups_changes_for_user_txn(txn): - sql = """ - SELECT group_id, membership, type, u.content - FROM local_group_updates AS u - INNER JOIN local_group_membership USING (group_id, user_id) - WHERE user_id = ? AND ? < stream_id AND stream_id <= ? - """ - txn.execute(sql, (user_id, from_token, to_token)) - return [ - { - "group_id": group_id, - "membership": membership, - "type": gtype, - "content": json.loads(content_json), - } - for group_id, membership, gtype, content_json in txn - ] - - return self.db.runInteraction( - "get_groups_changes_for_user", _get_groups_changes_for_user_txn - ) - - def get_all_groups_changes(self, from_token, to_token, limit): - from_token = int(from_token) - has_changed = self._group_updates_stream_cache.has_any_entity_changed( - from_token - ) - if not has_changed: - return defer.succeed([]) - - def _get_all_groups_changes_txn(txn): - sql = """ - SELECT stream_id, group_id, user_id, type, content - FROM local_group_updates - WHERE ? < stream_id AND stream_id <= ? - LIMIT ? - """ - txn.execute(sql, (from_token, to_token, limit)) - return [ - (stream_id, group_id, user_id, gtype, json.loads(content_json)) - for stream_id, group_id, user_id, gtype, content_json in txn - ] - - return self.db.runInteraction( - "get_all_groups_changes", _get_all_groups_changes_txn - ) - def get_group_stream_token(self): return self._group_updates_id_gen.get_current_token()