Remove groups replication code. (#12900)

The replication logic for groups is no longer used, so the message
passing infrastructure can be removed.
This commit is contained in:
Patrick Cloke 2022-05-31 13:04:08 -04:00 committed by GitHub
parent 2fc787c341
commit cf05258f76
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
7 changed files with 1 additions and 90 deletions

View file

@ -0,0 +1 @@
Remove support for the non-standard groups/communities feature from Synapse.

View file

@ -37,7 +37,6 @@ from synapse.replication.slave.storage.deviceinbox import SlavedDeviceInboxStore
from synapse.replication.slave.storage.devices import SlavedDeviceStore from synapse.replication.slave.storage.devices import SlavedDeviceStore
from synapse.replication.slave.storage.events import SlavedEventStore from synapse.replication.slave.storage.events import SlavedEventStore
from synapse.replication.slave.storage.filtering import SlavedFilteringStore from synapse.replication.slave.storage.filtering import SlavedFilteringStore
from synapse.replication.slave.storage.groups import SlavedGroupServerStore
from synapse.replication.slave.storage.push_rule import SlavedPushRuleStore from synapse.replication.slave.storage.push_rule import SlavedPushRuleStore
from synapse.replication.slave.storage.receipts import SlavedReceiptsStore from synapse.replication.slave.storage.receipts import SlavedReceiptsStore
from synapse.replication.slave.storage.registration import SlavedRegistrationStore from synapse.replication.slave.storage.registration import SlavedRegistrationStore
@ -55,7 +54,6 @@ class AdminCmdSlavedStore(
SlavedApplicationServiceStore, SlavedApplicationServiceStore,
SlavedRegistrationStore, SlavedRegistrationStore,
SlavedFilteringStore, SlavedFilteringStore,
SlavedGroupServerStore,
SlavedDeviceInboxStore, SlavedDeviceInboxStore,
SlavedDeviceStore, SlavedDeviceStore,
SlavedPushRuleStore, SlavedPushRuleStore,

View file

@ -58,7 +58,6 @@ from synapse.replication.slave.storage.devices import SlavedDeviceStore
from synapse.replication.slave.storage.directory import DirectoryStore from synapse.replication.slave.storage.directory import DirectoryStore
from synapse.replication.slave.storage.events import SlavedEventStore from synapse.replication.slave.storage.events import SlavedEventStore
from synapse.replication.slave.storage.filtering import SlavedFilteringStore from synapse.replication.slave.storage.filtering import SlavedFilteringStore
from synapse.replication.slave.storage.groups import SlavedGroupServerStore
from synapse.replication.slave.storage.keys import SlavedKeyStore from synapse.replication.slave.storage.keys import SlavedKeyStore
from synapse.replication.slave.storage.profile import SlavedProfileStore from synapse.replication.slave.storage.profile import SlavedProfileStore
from synapse.replication.slave.storage.push_rule import SlavedPushRuleStore from synapse.replication.slave.storage.push_rule import SlavedPushRuleStore
@ -233,7 +232,6 @@ class GenericWorkerSlavedStore(
SlavedDeviceStore, SlavedDeviceStore,
SlavedReceiptsStore, SlavedReceiptsStore,
SlavedPushRuleStore, SlavedPushRuleStore,
SlavedGroupServerStore,
SlavedAccountDataStore, SlavedAccountDataStore,
SlavedPusherStore, SlavedPusherStore,
CensorEventsStore, CensorEventsStore,

View file

@ -1,58 +0,0 @@
# Copyright 2016 OpenMarket Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from typing import TYPE_CHECKING, Any, Iterable
from synapse.replication.slave.storage._base import BaseSlavedStore
from synapse.replication.slave.storage._slaved_id_tracker import SlavedIdTracker
from synapse.replication.tcp.streams import GroupServerStream
from synapse.storage.database import DatabasePool, LoggingDatabaseConnection
from synapse.storage.databases.main.group_server import GroupServerWorkerStore
from synapse.util.caches.stream_change_cache import StreamChangeCache
if TYPE_CHECKING:
from synapse.server import HomeServer
class SlavedGroupServerStore(GroupServerWorkerStore, BaseSlavedStore):
def __init__(
self,
database: DatabasePool,
db_conn: LoggingDatabaseConnection,
hs: "HomeServer",
):
super().__init__(database, db_conn, hs)
self.hs = hs
self._group_updates_id_gen = SlavedIdTracker(
db_conn, "local_group_updates", "stream_id"
)
self._group_updates_stream_cache = StreamChangeCache(
"_group_updates_stream_cache",
self._group_updates_id_gen.get_current_token(),
)
def get_group_stream_token(self) -> int:
return self._group_updates_id_gen.get_current_token()
def process_replication_rows(
self, stream_name: str, instance_name: str, token: int, rows: Iterable[Any]
) -> None:
if stream_name == GroupServerStream.NAME:
self._group_updates_id_gen.advance(instance_name, token)
for row in rows:
self._group_updates_stream_cache.entity_has_changed(row.user_id, token)
return super().process_replication_rows(stream_name, instance_name, token, rows)

View file

@ -30,7 +30,6 @@ from synapse.replication.tcp.protocol import ClientReplicationStreamProtocol
from synapse.replication.tcp.streams import ( from synapse.replication.tcp.streams import (
AccountDataStream, AccountDataStream,
DeviceListsStream, DeviceListsStream,
GroupServerStream,
PushersStream, PushersStream,
PushRulesStream, PushRulesStream,
ReceiptsStream, ReceiptsStream,
@ -185,10 +184,6 @@ class ReplicationDataHandler:
self.notifier.on_new_event( self.notifier.on_new_event(
StreamKeyType.DEVICE_LIST, token, rooms=all_room_ids StreamKeyType.DEVICE_LIST, token, rooms=all_room_ids
) )
elif stream_name == GroupServerStream.NAME:
self.notifier.on_new_event(
"groups_key", token, users=[row.user_id for row in rows]
)
elif stream_name == PushersStream.NAME: elif stream_name == PushersStream.NAME:
for row in rows: for row in rows:
if row.deleted: if row.deleted:

View file

@ -29,7 +29,6 @@ from synapse.replication.tcp.streams._base import (
BackfillStream, BackfillStream,
CachesStream, CachesStream,
DeviceListsStream, DeviceListsStream,
GroupServerStream,
PresenceFederationStream, PresenceFederationStream,
PresenceStream, PresenceStream,
PushersStream, PushersStream,
@ -61,7 +60,6 @@ STREAMS_MAP = {
FederationStream, FederationStream,
TagAccountDataStream, TagAccountDataStream,
AccountDataStream, AccountDataStream,
GroupServerStream,
UserSignatureStream, UserSignatureStream,
) )
} }
@ -81,6 +79,5 @@ __all__ = [
"ToDeviceStream", "ToDeviceStream",
"TagAccountDataStream", "TagAccountDataStream",
"AccountDataStream", "AccountDataStream",
"GroupServerStream",
"UserSignatureStream", "UserSignatureStream",
] ]

View file

@ -585,26 +585,6 @@ class AccountDataStream(Stream):
return updates, to_token, limited return updates, to_token, limited
class GroupServerStream(Stream):
@attr.s(slots=True, frozen=True, auto_attribs=True)
class GroupsStreamRow:
group_id: str
user_id: str
type: str
content: JsonDict
NAME = "groups"
ROW_TYPE = GroupsStreamRow
def __init__(self, hs: "HomeServer"):
store = hs.get_datastores().main
super().__init__(
hs.get_instance_name(),
current_token_without_instance(store.get_group_stream_token),
store.get_all_groups_changes,
)
class UserSignatureStream(Stream): class UserSignatureStream(Stream):
"""A user has signed their own device with their user-signing key""" """A user has signed their own device with their user-signing key"""