mirror of
https://github.com/element-hq/synapse.git
synced 2024-11-29 07:28:55 +03:00
Reduce cache size by not storing deferreds
Currently the cache descriptors store deferreds rather than raw values, this is a simple way of triggering only one database hit and sharing the result if two callers attempt to get the same value. However, there are a few caches that simply store a mapping from string to string (or int). These caches can have a large number of entries, under the assumption that each entry is small. However, the size of a deferred (specifically the size of ObservableDeferred) is signigicantly larger than that of the raw value, 2kb vs 32b. This PR therefore changes the cache descriptors to store the raw values rather than the deferreds. As a side effect cached storage function now either return a deferred or the actual value, as the cached list decriptor already does. This is fine as we always end up just yield'ing on the returned value eventually, which handles that case correctly.
This commit is contained in:
parent
7af825bae4
commit
119cb9bbcf
2 changed files with 28 additions and 22 deletions
|
@ -47,10 +47,13 @@ class ReceiptsStore(SQLBaseStore):
|
|||
# Returns an ObservableDeferred
|
||||
res = self.get_users_with_read_receipts_in_room.cache.get((room_id,), None)
|
||||
|
||||
if res and res.called and user_id in res.result:
|
||||
# We'd only be adding to the set, so no point invalidating if the
|
||||
# user is already there
|
||||
return
|
||||
if res:
|
||||
if isinstance(res, defer.Deferred) and res.called:
|
||||
res = res.result
|
||||
if user_id in res:
|
||||
# We'd only be adding to the set, so no point invalidating if the
|
||||
# user is already there
|
||||
return
|
||||
|
||||
self.get_users_with_read_receipts_in_room.invalidate((room_id,))
|
||||
|
||||
|
|
|
@ -19,7 +19,7 @@ from synapse.util import unwrapFirstError, logcontext
|
|||
from synapse.util.caches.lrucache import LruCache
|
||||
from synapse.util.caches.treecache import TreeCache, iterate_tree_cache_entry
|
||||
|
||||
from . import DEBUG_CACHES, register_cache
|
||||
from . import register_cache
|
||||
|
||||
from twisted.internet import defer
|
||||
from collections import namedtuple
|
||||
|
@ -76,7 +76,7 @@ class Cache(object):
|
|||
|
||||
self.cache = LruCache(
|
||||
max_size=max_entries, keylen=keylen, cache_type=cache_type,
|
||||
size_callback=(lambda d: len(d.result)) if iterable else None,
|
||||
size_callback=(lambda d: len(d)) if iterable else None,
|
||||
)
|
||||
|
||||
self.name = name
|
||||
|
@ -96,6 +96,17 @@ class Cache(object):
|
|||
)
|
||||
|
||||
def get(self, key, default=_CacheSentinel, callback=None):
|
||||
"""Looks the key up in the caches.
|
||||
|
||||
Args:
|
||||
key(tuple)
|
||||
default: What is returned if key is not in the caches. If not
|
||||
specified then function throws KeyError instead
|
||||
callback(fn): Gets called when the entry in the cache is invalidated
|
||||
|
||||
Returns:
|
||||
Either a Deferred or the raw result
|
||||
"""
|
||||
callbacks = [callback] if callback else []
|
||||
val = self._pending_deferred_cache.get(key, _CacheSentinel)
|
||||
if val is not _CacheSentinel:
|
||||
|
@ -137,7 +148,7 @@ class Cache(object):
|
|||
if self.sequence == entry.sequence:
|
||||
existing_entry = self._pending_deferred_cache.pop(key, None)
|
||||
if existing_entry is entry:
|
||||
self.cache.set(key, entry.deferred, entry.callbacks)
|
||||
self.cache.set(key, result, entry.callbacks)
|
||||
else:
|
||||
entry.invalidate()
|
||||
else:
|
||||
|
@ -335,20 +346,10 @@ class CacheDescriptor(_CacheDescriptorBase):
|
|||
try:
|
||||
cached_result_d = cache.get(cache_key, callback=invalidate_callback)
|
||||
|
||||
observer = cached_result_d.observe()
|
||||
if DEBUG_CACHES:
|
||||
@defer.inlineCallbacks
|
||||
def check_result(cached_result):
|
||||
actual_result = yield self.function_to_call(obj, *args, **kwargs)
|
||||
if actual_result != cached_result:
|
||||
logger.error(
|
||||
"Stale cache entry %s%r: cached: %r, actual %r",
|
||||
self.orig.__name__, cache_key,
|
||||
cached_result, actual_result,
|
||||
)
|
||||
raise ValueError("Stale cache entry")
|
||||
defer.returnValue(cached_result)
|
||||
observer.addCallback(check_result)
|
||||
if isinstance(cached_result_d, ObservableDeferred):
|
||||
observer = cached_result_d.observe()
|
||||
else:
|
||||
observer = cached_result_d
|
||||
|
||||
except KeyError:
|
||||
ret = defer.maybeDeferred(
|
||||
|
@ -447,7 +448,9 @@ class CacheListDescriptor(_CacheDescriptorBase):
|
|||
|
||||
try:
|
||||
res = cache.get(tuple(key), callback=invalidate_callback)
|
||||
if not res.has_succeeded():
|
||||
if not isinstance(res, ObservableDeferred):
|
||||
results[arg] = res
|
||||
elif not res.has_succeeded():
|
||||
res = res.observe()
|
||||
res.addCallback(lambda r, arg: (arg, r), arg)
|
||||
cached_defers[arg] = res
|
||||
|
|
Loading…
Reference in a new issue