mirror of
https://github.com/element-hq/synapse.git
synced 2024-12-21 03:42:55 +03:00
81e0f57800
There is a bug with the `StreamChangeCache` where it would incorrectly return that all entities had changed if asked for entities changed *since* the earliest stream position. Note that for streams we use the inequalities: `$min_stream_id < stream_id <= $max_stream_id`, i.e. when we ask the stream change cache for all things that have changed since `$stream_id` we don't care for events that happened *at* `$stream_id`. Specifically: `_earliest_known_stream_pos` is the position at which we know that we'll have entries for all changes since that point, so we can use the cache for any stream IDs that equal `_earliest_known_stream_pos`. `_earliest_known_stream_pos` is set in three places: - On startup we set it either to: - the current maximum stream ID, with no prefilled values; or - the minimum of the latest N values we pulled from the DB - When we evict items from the bottom, we set it to the stream ID of the evicted items. This was changed in https://github.com/matrix-org/synapse/pull/14435, but I think we were overly conservative there. --------- Co-authored-by: Andrew Morgan <1342360+anoadragon453@users.noreply.github.com>
257 lines
10 KiB
Python
257 lines
10 KiB
Python
from parameterized import parameterized
|
|
|
|
from synapse.util.caches.stream_change_cache import StreamChangeCache
|
|
|
|
from tests import unittest
|
|
|
|
|
|
class StreamChangeCacheTests(unittest.HomeserverTestCase):
    """
    Tests for StreamChangeCache.

    Throughout these tests "changed since position N" is exclusive of N: a
    change recorded *at* N is not reported when querying with N.
    """

    def test_prefilled_cache(self) -> None:
        """
        Providing a prefilled cache to StreamChangeCache will result in a cache
        with the prefilled-cache entered in.
        """
        cache = StreamChangeCache("#test", 1, prefilled_cache={"user@foo.com": 2})
        self.assertTrue(cache.has_entity_changed("user@foo.com", 1))

    def test_has_entity_changed(self) -> None:
        """
        StreamChangeCache.entity_has_changed will mark entities as changed, and
        has_entity_changed will observe the changed entities.
        """
        cache = StreamChangeCache("#test", 3)

        cache.entity_has_changed("user@foo.com", 6)
        cache.entity_has_changed("bar@baz.net", 7)

        # also test multiple things changing on the same stream ID
        cache.entity_has_changed("user2@foo.com", 8)
        cache.entity_has_changed("bar2@baz.net", 8)

        # If it's been changed after that stream position, return True
        self.assertTrue(cache.has_entity_changed("user@foo.com", 4))
        self.assertTrue(cache.has_entity_changed("bar@baz.net", 4))
        self.assertTrue(cache.has_entity_changed("bar2@baz.net", 4))
        self.assertTrue(cache.has_entity_changed("user2@foo.com", 4))

        # If it's been changed at that stream position, return False
        self.assertFalse(cache.has_entity_changed("user@foo.com", 6))
        self.assertFalse(cache.has_entity_changed("user2@foo.com", 8))

        # If there's no changes after that stream position, return False
        self.assertFalse(cache.has_entity_changed("user@foo.com", 7))
        self.assertFalse(cache.has_entity_changed("user2@foo.com", 9))

        # If the entity does not exist, return False.
        self.assertFalse(cache.has_entity_changed("not@here.website", 9))

        # If we request before the stream cache's earliest known position,
        # return True, whether it's a known entity or not.
        self.assertTrue(cache.has_entity_changed("user@foo.com", 0))
        self.assertTrue(cache.has_entity_changed("not@here.website", 0))
        self.assertTrue(cache.has_entity_changed("user@foo.com", 2))
        self.assertTrue(cache.has_entity_changed("not@here.website", 2))

    def test_entity_has_changed_pops_off_start(self) -> None:
        """
        StreamChangeCache.entity_has_changed will respect the max size and
        purge the oldest items upon reaching that max size.
        """
        cache = StreamChangeCache("#test", 1, max_size=2)

        cache.entity_has_changed("user@foo.com", 2)
        cache.entity_has_changed("bar@baz.net", 3)
        cache.entity_has_changed("user@elsewhere.org", 4)

        # The cache is at the max size, 2
        self.assertEqual(len(cache._cache), 2)
        # The cache's earliest known position is 2, i.e. the stream position
        # of the entry that was evicted.
        self.assertEqual(cache._earliest_known_stream_pos, 2)

        # The oldest item has been popped off
        self.assertNotIn("user@foo.com", cache._entity_to_key)

        self.assertEqual(
            cache.get_all_entities_changed(2).entities,
            ["bar@baz.net", "user@elsewhere.org"],
        )
        # Position 1 is before the earliest known position, so it is a miss;
        # querying at exactly the earliest known position is still a hit.
        self.assertFalse(cache.get_all_entities_changed(1).hit)
        self.assertTrue(cache.get_all_entities_changed(2).hit)

        # If we update an existing entity, it keeps the two existing entities
        cache.entity_has_changed("bar@baz.net", 5)
        self.assertEqual(
            {"bar@baz.net", "user@elsewhere.org"}, set(cache._entity_to_key)
        )
        self.assertEqual(
            cache.get_all_entities_changed(3).entities,
            ["user@elsewhere.org", "bar@baz.net"],
        )
        self.assertFalse(cache.get_all_entities_changed(1).hit)
        self.assertTrue(cache.get_all_entities_changed(2).hit)

    def test_get_all_entities_changed(self) -> None:
        """
        StreamChangeCache.get_all_entities_changed will return all changed
        entities since the given position. If the position is before the start
        of the known stream, the result is a cache miss (its `hit` attribute is
        False) instead.
        """
        cache = StreamChangeCache("#test", 1)

        cache.entity_has_changed("user@foo.com", 2)
        cache.entity_has_changed("bar@baz.net", 3)
        cache.entity_has_changed("anotheruser@foo.com", 3)
        cache.entity_has_changed("user@elsewhere.org", 4)

        r = cache.get_all_entities_changed(2)

        # "bar@baz.net" and "anotheruser@foo.com" changed at the same stream
        # position, so either relative order of the two is valid.
        ok1 = ["bar@baz.net", "anotheruser@foo.com", "user@elsewhere.org"]
        ok2 = ["anotheruser@foo.com", "bar@baz.net", "user@elsewhere.org"]
        self.assertIn(r.entities, (ok1, ok2))

        self.assertEqual(
            cache.get_all_entities_changed(3).entities, ["user@elsewhere.org"]
        )
        self.assertFalse(cache.get_all_entities_changed(0).hit)
        self.assertTrue(cache.get_all_entities_changed(1).hit)

        # ... later, things get more updates
        cache.entity_has_changed("user@foo.com", 5)
        cache.entity_has_changed("bar@baz.net", 5)
        cache.entity_has_changed("anotheruser@foo.com", 6)

        # Again, the two entities that share stream position 5 may come back
        # in either order.
        ok1 = [
            "user@elsewhere.org",
            "user@foo.com",
            "bar@baz.net",
            "anotheruser@foo.com",
        ]
        ok2 = [
            "user@elsewhere.org",
            "bar@baz.net",
            "user@foo.com",
            "anotheruser@foo.com",
        ]
        r = cache.get_all_entities_changed(3)
        self.assertIn(r.entities, (ok1, ok2))

    def test_has_any_entity_changed(self) -> None:
        """
        StreamChangeCache.has_any_entity_changed will return True if any
        entities have been changed since the provided stream position, and
        False if they have not. If the provided stream position is before the
        cache's starting position it will return True, whether or not the
        cache has any entries.
        """
        cache = StreamChangeCache("#test", 1)

        # With no entities, it returns True for positions before the cache's
        # starting position, and False for the starting position itself and
        # anything after it.
        self.assertTrue(cache.has_any_entity_changed(0))
        self.assertFalse(cache.has_any_entity_changed(1))
        self.assertFalse(cache.has_any_entity_changed(2))

        # We add an entity
        cache.entity_has_changed("user@foo.com", 2)

        # With an entity, it returns True for the past, the stream start
        # position, and False for the stream position the entity was changed
        # on and ones after it.
        self.assertTrue(cache.has_any_entity_changed(0))
        self.assertTrue(cache.has_any_entity_changed(1))
        self.assertFalse(cache.has_any_entity_changed(2))
        self.assertFalse(cache.has_any_entity_changed(3))

    @parameterized.expand([(0,), (1000000000,)])
    def test_get_entities_changed(self, perf_factor: int) -> None:
        """
        StreamChangeCache.get_entities_changed will return the entities in the
        given list that have changed since the provided stream ID. If the
        stream position is earlier than the earliest known position, it will
        return all of the entities queried for.

        The test runs once per `perf_factor` value, which is forwarded as
        `_perf_factor`; the expected results are the same for both values.
        NOTE(review): presumably the two extremes select different internal
        lookup strategies - confirm against StreamChangeCache.
        """
        cache = StreamChangeCache("#test", 1)

        cache.entity_has_changed("user@foo.com", 2)
        cache.entity_has_changed("bar@baz.net", 3)
        cache.entity_has_changed("user@elsewhere.org", 4)

        # Query all the entries, but mid-way through the stream. We should only
        # get the ones after that point.
        self.assertEqual(
            cache.get_entities_changed(
                ["user@foo.com", "bar@baz.net", "user@elsewhere.org"],
                stream_pos=2,
                _perf_factor=perf_factor,
            ),
            {"bar@baz.net", "user@elsewhere.org"},
        )

        # Query all the entries mid-way through the stream, but include one
        # that doesn't exist in it. We shouldn't get back the one that doesn't
        # exist.
        self.assertEqual(
            cache.get_entities_changed(
                [
                    "user@foo.com",
                    "bar@baz.net",
                    "user@elsewhere.org",
                    "not@here.website",
                ],
                stream_pos=2,
                _perf_factor=perf_factor,
            ),
            {"bar@baz.net", "user@elsewhere.org"},
        )

        # Query all the entries, but before the first known point. We will get
        # all the entries we queried for, including ones that don't exist.
        self.assertEqual(
            cache.get_entities_changed(
                [
                    "user@foo.com",
                    "bar@baz.net",
                    "user@elsewhere.org",
                    "not@here.website",
                ],
                stream_pos=0,
                _perf_factor=perf_factor,
            ),
            {"user@foo.com", "bar@baz.net", "user@elsewhere.org", "not@here.website"},
        )

        # Query a subset of the entries mid-way through the stream. We should
        # only get back the subset.
        self.assertEqual(
            cache.get_entities_changed(
                ["bar@baz.net"],
                stream_pos=2,
                _perf_factor=perf_factor,
            ),
            {"bar@baz.net"},
        )

    def test_max_pos(self) -> None:
        """
        StreamChangeCache.get_max_pos_of_last_change will return the most
        recent point where the entity could have changed. If the entity is not
        known, None is returned instead.
        """
        cache = StreamChangeCache("#test", 1)

        cache.entity_has_changed("user@foo.com", 2)
        cache.entity_has_changed("bar@baz.net", 3)
        cache.entity_has_changed("user@elsewhere.org", 4)

        # Known entities will return the point where they were changed.
        self.assertEqual(cache.get_max_pos_of_last_change("user@foo.com"), 2)
        self.assertEqual(cache.get_max_pos_of_last_change("bar@baz.net"), 3)
        self.assertEqual(cache.get_max_pos_of_last_change("user@elsewhere.org"), 4)

        # Unknown entities will return None
        self.assertIsNone(cache.get_max_pos_of_last_change("not@here.website"))
|