mirror of
https://github.com/element-hq/synapse.git
synced 2024-11-29 15:39:00 +03:00
3e4af36bc8
Since the object it returns is a ReplicationCommandHandler. This is clean-up from adding support to Redis where the command handler was added as an additional layer of abstraction from the TCP protocol.
485 lines
18 KiB
Python
485 lines
18 KiB
Python
# Copyright 2019 New Vector Ltd
|
|
#
|
|
# Licensed under the Apache License, Version 2.0 (the "License");
|
|
# you may not use this file except in compliance with the License.
|
|
# You may obtain a copy of the License at
|
|
#
|
|
# http://www.apache.org/licenses/LICENSE-2.0
|
|
#
|
|
# Unless required by applicable law or agreed to in writing, software
|
|
# distributed under the License is distributed on an "AS IS" BASIS,
|
|
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
# See the License for the specific language governing permissions and
|
|
# limitations under the License.
|
|
|
|
from typing import List, Optional
|
|
|
|
from synapse.api.constants import EventTypes, Membership
|
|
from synapse.events import EventBase
|
|
from synapse.replication.tcp.commands import RdataCommand
|
|
from synapse.replication.tcp.streams._base import _STREAM_UPDATE_TARGET_ROW_COUNT
|
|
from synapse.replication.tcp.streams.events import (
|
|
EventsStreamCurrentStateRow,
|
|
EventsStreamEventRow,
|
|
EventsStreamRow,
|
|
)
|
|
from synapse.rest import admin
|
|
from synapse.rest.client import login, room
|
|
|
|
from tests.replication._base import BaseStreamTestCase
|
|
from tests.test_utils.event_injection import inject_event, inject_member_event
|
|
|
|
|
|
class EventsStreamTestCase(BaseStreamTestCase):
    """Tests for replication of the ``events`` stream to a worker.

    The tests stack up changes on the master (either while the worker is
    disconnected, or by injecting events without poking replication), then
    reconnect/replicate and check the ``events`` rows recorded by the worker's
    test handler.
    """

    servlets = [
        admin.register_servlets,
        login.register_servlets,
        room.register_servlets,
    ]

    def prepare(self, reactor, clock, hs):
        super().prepare(reactor, clock, hs)
        self.user_id = self.register_user("u1", "pass")
        self.user_tok = self.login("u1", "pass")

        self.reconnect()

        self.room_id = self.helper.create_room_as(tok=self.user_tok)
        # Start each test with an empty record: drop any rows received while
        # setting up the user and the room.
        self.test_handler.received_rdata_rows.clear()

    def test_update_function_event_row_limit(self):
        """Test replication with many non-state events

        Checks that all events are correctly replicated when there are lots of
        event rows to be replicated.
        """
        # disconnect, so that we can stack up some changes
        self.disconnect()

        # generate lots of non-state events (one more than
        # _STREAM_UPDATE_TARGET_ROW_COUNT). We inject them using inject_event
        # so that they are not sent out over replication until we call
        # self.replicate().
        events = [
            self._inject_test_event()
            for _ in range(_STREAM_UPDATE_TARGET_ROW_COUNT + 1)
        ]

        # also one state event
        state_event = self._inject_state_event()

        # check we're testing what we think we are: no rows should yet have been
        # received
        self.assertEqual([], self.test_handler.received_rdata_rows)

        # now reconnect to pull the updates
        self.reconnect()
        self.replicate()

        # we should have received all the expected rows in the right order (as
        # well as various cache invalidation updates which we ignore)
        received_rows = [
            row for row in self.test_handler.received_rdata_rows if row[0] == "events"
        ]

        for event in events:
            stream_name, token, row = received_rows.pop(0)
            self.assertEqual("events", stream_name)
            self.assertIsInstance(row, EventsStreamRow)
            self.assertEqual(row.type, "ev")
            self.assertIsInstance(row.data, EventsStreamEventRow)
            self.assertEqual(row.data.event_id, event.event_id)

        # the state event should give an "ev" row followed by a "state" row
        stream_name, token, row = received_rows.pop(0)
        self.assertEqual("events", stream_name)
        self.assertIsInstance(row, EventsStreamRow)
        self.assertEqual(row.type, "ev")
        self.assertIsInstance(row.data, EventsStreamEventRow)
        self.assertEqual(row.data.event_id, state_event.event_id)

        stream_name, token, row = received_rows.pop(0)
        self.assertEqual("events", stream_name)
        self.assertIsInstance(row, EventsStreamRow)
        self.assertEqual(row.type, "state")
        self.assertIsInstance(row.data, EventsStreamCurrentStateRow)
        self.assertEqual(row.data.event_id, state_event.event_id)

        self.assertEqual([], received_rows)

    def test_update_function_huge_state_change(self):
        """Test replication with many state events

        Ensures that all events are correctly replicated when there are lots of
        state change rows to be replicated.
        """

        # we want to generate lots of state changes at a single stream ID.
        #
        # We do this by having two branches in the DAG. On one, we have a moderator
        # which generates lots of state; on the other, we de-op the moderator,
        # thus invalidating all the state.

        OTHER_USER = "@other_user:localhost"

        # have the user join
        self.get_success(
            inject_member_event(self.hs, self.room_id, OTHER_USER, Membership.JOIN)
        )

        # Update existing power levels with mod at PL50
        pls = self.helper.get_state(
            self.room_id, EventTypes.PowerLevels, tok=self.user_tok
        )
        pls["users"][OTHER_USER] = 50
        self.helper.send_state(
            self.room_id,
            EventTypes.PowerLevels,
            pls,
            tok=self.user_tok,
        )

        # this is the point in the DAG where we make a fork
        fork_point: List[str] = self.get_success(
            self.hs.get_datastores().main.get_latest_event_ids_in_room(self.room_id)
        )

        events = [
            self._inject_state_event(sender=OTHER_USER)
            for _ in range(_STREAM_UPDATE_TARGET_ROW_COUNT)
        ]

        self.replicate()
        # all those events and state changes should have landed
        self.assertGreaterEqual(
            len(self.test_handler.received_rdata_rows), 2 * len(events)
        )

        # disconnect, so that we can stack up the changes
        self.disconnect()
        self.test_handler.received_rdata_rows.clear()

        # a state event which doesn't get rolled back, to check that the state
        # before the huge update comes through ok
        state1 = self._inject_state_event()

        # roll back all the state by de-modding the user, on a branch that
        # starts from the fork point
        pls["users"][OTHER_USER] = 0
        pl_event = self.get_success(
            inject_event(
                self.hs,
                prev_event_ids=fork_point,
                type=EventTypes.PowerLevels,
                state_key="",
                sender=self.user_id,
                room_id=self.room_id,
                content=pls,
            )
        )

        # one more bit of state that doesn't get rolled back
        state2 = self._inject_state_event()

        # check we're testing what we think we are: no rows should yet have been
        # received
        self.assertEqual([], self.test_handler.received_rdata_rows)

        # now reconnect to pull the updates
        self.reconnect()
        self.replicate()

        # we should have received all the expected rows in the right order (as
        # well as various cache invalidation updates which we ignore)
        #
        # we expect:
        #
        # - two rows for state1
        # - the PL event row, plus state rows for the PL event and each
        #       of the states that got reverted.
        # - two rows for state2

        received_rows = [
            row for row in self.test_handler.received_rdata_rows if row[0] == "events"
        ]

        # first check the first two rows, which should be state1

        stream_name, token, row = received_rows.pop(0)
        self.assertEqual("events", stream_name)
        self.assertIsInstance(row, EventsStreamRow)
        self.assertEqual(row.type, "ev")
        self.assertIsInstance(row.data, EventsStreamEventRow)
        self.assertEqual(row.data.event_id, state1.event_id)

        stream_name, token, row = received_rows.pop(0)
        self.assertIsInstance(row, EventsStreamRow)
        self.assertEqual(row.type, "state")
        self.assertIsInstance(row.data, EventsStreamCurrentStateRow)
        self.assertEqual(row.data.event_id, state1.event_id)

        # now the last two rows, which should be state2
        stream_name, token, row = received_rows.pop(-2)
        self.assertEqual("events", stream_name)
        self.assertIsInstance(row, EventsStreamRow)
        self.assertEqual(row.type, "ev")
        self.assertIsInstance(row.data, EventsStreamEventRow)
        self.assertEqual(row.data.event_id, state2.event_id)

        stream_name, token, row = received_rows.pop(-1)
        self.assertIsInstance(row, EventsStreamRow)
        self.assertEqual(row.type, "state")
        self.assertIsInstance(row.data, EventsStreamCurrentStateRow)
        self.assertEqual(row.data.event_id, state2.event_id)

        # that should leave us with the rows for the PL event: its "ev" row,
        # its own "state" row, and one "state" row per reverted event
        self.assertEqual(len(received_rows), len(events) + 2)

        stream_name, token, row = received_rows.pop(0)
        self.assertEqual("events", stream_name)
        self.assertIsInstance(row, EventsStreamRow)
        self.assertEqual(row.type, "ev")
        self.assertIsInstance(row.data, EventsStreamEventRow)
        self.assertEqual(row.data.event_id, pl_event.event_id)

        # the state rows are unsorted
        state_rows: List[EventsStreamCurrentStateRow] = []
        for stream_name, _, row in received_rows:
            self.assertEqual("events", stream_name)
            self.assertIsInstance(row, EventsStreamRow)
            self.assertEqual(row.type, "state")
            self.assertIsInstance(row.data, EventsStreamCurrentStateRow)
            state_rows.append(row.data)

        # The power-levels row has state_key "", which sorts before the
        # "state_N" keys used by the test state events.
        state_rows.sort(key=lambda r: r.state_key)

        sr = state_rows.pop(0)
        self.assertEqual(sr.type, EventTypes.PowerLevels)
        self.assertEqual(sr.event_id, pl_event.event_id)
        for sr in state_rows:
            self.assertEqual(sr.type, "test_state_event")
            # "None" indicates the state has been deleted
            self.assertIsNone(sr.event_id)

    def test_update_function_state_row_limit(self):
        """Test replication with many state events over several stream ids."""

        # we want to generate lots of state changes, but for this test, we want to
        # spread out the state changes over a few stream IDs.
        #
        # We do this by having two branches in the DAG. On one, we have four moderators,
        # each of which generates lots of state; on the other, we de-op the users,
        # thus invalidating all the state.

        NUM_USERS = 4
        # Enough state per user that, across all users, we exceed the target row
        # count.
        STATES_PER_USER = _STREAM_UPDATE_TARGET_ROW_COUNT // NUM_USERS + 1

        user_ids = ["@user%i:localhost" % (i,) for i in range(NUM_USERS)]

        # have the users join
        for u in user_ids:
            self.get_success(
                inject_member_event(self.hs, self.room_id, u, Membership.JOIN)
            )

        # Update existing power levels with mod at PL50
        pls = self.helper.get_state(
            self.room_id, EventTypes.PowerLevels, tok=self.user_tok
        )
        pls["users"].update({u: 50 for u in user_ids})
        self.helper.send_state(
            self.room_id,
            EventTypes.PowerLevels,
            pls,
            tok=self.user_tok,
        )

        # this is the point in the DAG where we make a fork
        fork_point: List[str] = self.get_success(
            self.hs.get_datastores().main.get_latest_event_ids_in_room(self.room_id)
        )

        events: List[EventBase] = []
        for user in user_ids:
            events.extend(
                self._inject_state_event(sender=user) for _ in range(STATES_PER_USER)
            )

        self.replicate()

        # all those events and state changes should have landed
        self.assertGreaterEqual(
            len(self.test_handler.received_rdata_rows), 2 * len(events)
        )

        # disconnect, so that we can stack up the changes
        self.disconnect()
        self.test_handler.received_rdata_rows.clear()

        # now roll back all that state by de-modding the users, one PL event per
        # user, chained off each other starting from the fork point
        prev_events = fork_point
        pl_events = []
        for u in user_ids:
            pls["users"][u] = 0
            e = self.get_success(
                inject_event(
                    self.hs,
                    prev_event_ids=prev_events,
                    type=EventTypes.PowerLevels,
                    state_key="",
                    sender=self.user_id,
                    room_id=self.room_id,
                    content=pls,
                )
            )
            prev_events = [e.event_id]
            pl_events.append(e)

        # check we're testing what we think we are: no rows should yet have been
        # received
        self.assertEqual([], self.test_handler.received_rdata_rows)

        # now reconnect to pull the updates
        self.reconnect()
        self.replicate()

        # we should have received all the expected rows in the right order (as
        # well as various cache invalidation updates which we ignore)
        received_rows = [
            row for row in self.test_handler.received_rdata_rows if row[0] == "events"
        ]
        self.assertGreaterEqual(len(received_rows), len(events))
        for i in range(NUM_USERS):
            # for each user, we expect the PL event row, followed by state rows for
            # the PL event and each of the states that got reverted.
            stream_name, token, row = received_rows.pop(0)
            self.assertEqual("events", stream_name)
            self.assertIsInstance(row, EventsStreamRow)
            self.assertEqual(row.type, "ev")
            self.assertIsInstance(row.data, EventsStreamEventRow)
            self.assertEqual(row.data.event_id, pl_events[i].event_id)

            # the state rows are unsorted
            state_rows: List[EventsStreamCurrentStateRow] = []
            for _ in range(STATES_PER_USER + 1):
                stream_name, token, row = received_rows.pop(0)
                self.assertEqual("events", stream_name)
                self.assertIsInstance(row, EventsStreamRow)
                self.assertEqual(row.type, "state")
                self.assertIsInstance(row.data, EventsStreamCurrentStateRow)
                state_rows.append(row.data)

            # The power-levels row has state_key "", which sorts before the
            # "state_N" keys used by the test state events.
            state_rows.sort(key=lambda r: r.state_key)

            sr = state_rows.pop(0)
            self.assertEqual(sr.type, EventTypes.PowerLevels)
            self.assertEqual(sr.event_id, pl_events[i].event_id)
            for sr in state_rows:
                self.assertEqual(sr.type, "test_state_event")
                # "None" indicates the state has been deleted
                self.assertIsNone(sr.event_id)

        self.assertEqual([], received_rows)

    def test_backwards_stream_id(self):
        """
        Test that an RDATA command whose stream token is older than the worker's
        current position for that stream is discarded.
        """
        # disconnect, so that we can stack up some changes
        self.disconnect()

        # Generate an event. We inject it using inject_event so that it is
        # not sent out over replication until we call self.replicate().
        event = self._inject_test_event()

        # check we're testing what we think we are: no rows should yet have been
        # received
        self.assertEqual([], self.test_handler.received_rdata_rows)

        # now reconnect to pull the updates
        self.reconnect()
        self.replicate()

        # We should have received the expected single row (as well as various
        # cache invalidation updates which we ignore).
        received_rows = [
            row for row in self.test_handler.received_rdata_rows if row[0] == "events"
        ]

        # There should be a single received row.
        self.assertEqual(len(received_rows), 1)

        stream_name, token, row = received_rows[0]
        self.assertEqual("events", stream_name)
        self.assertIsInstance(row, EventsStreamRow)
        self.assertEqual(row.type, "ev")
        self.assertIsInstance(row.data, EventsStreamEventRow)
        self.assertEqual(row.data.event_id, event.event_id)

        # Reset the data.
        self.test_handler.received_rdata_rows.clear()

        # Save the current token for later.
        worker_events_stream = self.worker_hs.get_replication_streams()["events"]
        prev_token = worker_events_stream.current_token("master")

        # Manually send an old RDATA command, which should get dropped. This
        # re-uses the row from above, but with an earlier stream token.
        # NOTE(review): `row` is the parsed row object the test handler recorded;
        # confirm RdataCommand can serialize it for the wire, otherwise the
        # command never reaches the worker and the checks below pass trivially.
        self.hs.get_replication_command_handler().send_command(
            RdataCommand("events", "master", 1, row)
        )

        # Pump the reactor so the command has a chance to be delivered to the
        # worker before we check what it received (presumably the test
        # transport delivers writes asynchronously -- confirm).
        self.replicate()

        # No updates have been received (because it was discarded as old).
        received_rows = [
            row for row in self.test_handler.received_rdata_rows if row[0] == "events"
        ]
        self.assertEqual(len(received_rows), 0)

        # Ensure the stream has not gone backwards.
        current_token = worker_events_stream.current_token("master")
        self.assertGreaterEqual(current_token, prev_token)

    # Counter used to build unique default bodies / state keys for injected
    # events. This class attribute is the starting value; `self.event_count += 1`
    # then creates a per-instance counter.
    event_count = 0

    def _inject_test_event(
        self, body: Optional[str] = None, sender: Optional[str] = None, **kwargs
    ) -> EventBase:
        """Inject a non-state ``test_event`` into the test room via inject_event.

        Args:
            body: message body; defaults to "event N" using the event counter.
            sender: sending user; defaults to the test user.
            **kwargs: passed straight through to inject_event.

        Returns:
            The injected event.
        """
        if sender is None:
            sender = self.user_id

        if body is None:
            body = "event %i" % (self.event_count,)
            self.event_count += 1

        return self.get_success(
            inject_event(
                self.hs,
                room_id=self.room_id,
                sender=sender,
                type="test_event",
                content={"body": body},
                **kwargs,
            )
        )

    def _inject_state_event(
        self,
        body: Optional[str] = None,
        state_key: Optional[str] = None,
        sender: Optional[str] = None,
    ) -> EventBase:
        """Inject a ``test_state_event`` state event into the test room.

        Args:
            body: content body; defaults to "state event <state_key>".
            state_key: state key; defaults to "state_N" using the event counter.
            sender: sending user; defaults to the test user.

        Returns:
            The injected event.
        """
        if sender is None:
            sender = self.user_id

        if state_key is None:
            state_key = "state_%i" % (self.event_count,)
            self.event_count += 1

        if body is None:
            body = "state event %s" % (state_key,)

        return self.get_success(
            inject_event(
                self.hs,
                room_id=self.room_id,
                sender=sender,
                type="test_state_event",
                state_key=state_key,
                content={"body": body},
            )
        )