Merge branch 'develop' of github.com:matrix-org/synapse into matrix-org-hotfixes

Commit 4abecb7b02
14 changed files with 152 additions and 101 deletions

synapse/federation/transaction_queue.py

@@ -319,6 +319,7 @@ class TransactionQueue(object):
                 destination,
                 self.clock,
                 self.store,
+                backoff_on_404=True,  # If we get a 404 the other side has gone
             )

             device_message_edus, device_stream_id, dev_list_id = (

synapse/handlers/device.py

@@ -220,6 +220,22 @@ class DeviceHandler(BaseHandler):
         for host in hosts:
             self.federation_sender.send_device_messages(host)

+    @defer.inlineCallbacks
+    def get_user_ids_changed(self, user_id, from_device_key):
+        rooms = yield self.store.get_rooms_for_user(user_id)
+        room_ids = set(r.room_id for r in rooms)
+
+        user_ids_changed = set()
+        changed = yield self.store.get_user_whose_devices_changed(
+            from_device_key
+        )
+        for other_user_id in changed:
+            other_rooms = yield self.store.get_rooms_for_user(other_user_id)
+            if room_ids.intersection(e.room_id for e in other_rooms):
+                user_ids_changed.add(other_user_id)
+
+        defer.returnValue(user_ids_changed)
+
     @defer.inlineCallbacks
     def _incoming_device_list_update(self, origin, edu_content):
         user_id = edu_content["user_id"]
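The new get_user_ids_changed keeps only those changed users who share at least one room with the requester. A minimal sketch of that intersection step (the user IDs, room IDs, and membership map below are made up):

my_room_ids = {"!room1:hs", "!room2:hs"}    # rooms the requesting user is in
changed = ["@alice:remote", "@bob:other"]   # everyone whose device list changed
rooms_of = {                                # hypothetical membership lookup
    "@alice:remote": {"!room2:hs"},
    "@bob:other": {"!room9:hs"},
}

user_ids_changed = {
    user_id for user_id in changed if my_room_ids & rooms_of[user_id]
}
assert user_ids_changed == {"@alice:remote"}  # @bob shares no room, so is dropped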

synapse/handlers/presence.py

@@ -574,7 +574,7 @@ class PresenceHandler(object):
             if not local_states:
                 continue

-            users = yield self.state.get_current_user_in_room(room_id)
+            users = yield self.store.get_users_in_room(room_id)
             hosts = set(get_domain_from_id(u) for u in users)

             for host in hosts:

@@ -766,7 +766,7 @@ class PresenceHandler(object):
         # don't need to send to local clients here, as that is done as part
         # of the event stream/sync.
         # TODO: Only send to servers not already in the room.
-        user_ids = yield self.state.get_current_user_in_room(room_id)
+        user_ids = yield self.store.get_users_in_room(room_id)
         if self.is_mine(user):
             state = yield self.current_state_for_user(user.to_string())

@@ -1069,7 +1069,7 @@ class PresenceEventSource(object):

             user_ids_to_check = set()
             for room_id in room_ids:
-                users = yield self.state.get_current_user_in_room(room_id)
+                users = yield self.store.get_users_in_room(room_id)
                 user_ids_to_check.update(users)

             user_ids_to_check.update(friends)
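All three presence hunks replace self.state.get_current_user_in_room(room_id), which may have to load and compute room state, with the plain self.store.get_users_in_room(room_id) lookup; the rest of the commit is concerned with keeping that cache consistent across workers. The result feeds the same host fan-out; get_domain_from_id behaves roughly like this simplified stand-in (the real helper in synapse.types also validates the ID):

def get_domain_from_id(user_id):
    # "@alice:example.com" -> "example.com" (simplified, no validation)
    return user_id.split(":", 1)[1]

users = ["@alice:example.com", "@bob:example.com", "@carol:other.org"]
hosts = set(get_domain_from_id(u) for u in users)
assert hosts == {"example.com", "other.org"}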

synapse/replication/resource.py

@@ -299,9 +299,6 @@ class ReplicationResource(Resource):
             "backward_ex_outliers", res.backward_ex_outliers,
             ("position", "event_id", "state_group"),
         )
-        writer.write_header_and_rows(
-            "state_resets", res.state_resets, ("position",),
-        )

     @defer.inlineCallbacks
     def presence(self, writer, current_token, request_streams):

synapse/replication/slave/storage/events.py

@@ -192,10 +192,6 @@ class SlavedEventStore(BaseSlavedStore):
         return result

     def process_replication(self, result):
-        state_resets = set(
-            r[0] for r in result.get("state_resets", {"rows": []})["rows"]
-        )
-
         stream = result.get("events")
         if stream:
             self._stream_id_gen.advance(int(stream["position"]))

@@ -205,7 +201,7 @@ class SlavedEventStore(BaseSlavedStore):

             for row in stream["rows"]:
                 self._process_replication_row(
-                    row, backfilled=False, state_resets=state_resets
+                    row, backfilled=False,
                 )

         stream = result.get("backfill")

@@ -213,7 +209,7 @@ class SlavedEventStore(BaseSlavedStore):
             self._backfill_id_gen.advance(-int(stream["position"]))
             for row in stream["rows"]:
                 self._process_replication_row(
-                    row, backfilled=True, state_resets=state_resets
+                    row, backfilled=True,
                 )

         stream = result.get("forward_ex_outliers")

@@ -232,20 +228,15 @@ class SlavedEventStore(BaseSlavedStore):

         return super(SlavedEventStore, self).process_replication(result)

-    def _process_replication_row(self, row, backfilled, state_resets):
-        position = row[0]
+    def _process_replication_row(self, row, backfilled):
         internal = json.loads(row[1])
         event_json = json.loads(row[2])
         event = FrozenEvent(event_json, internal_metadata_dict=internal)
         self.invalidate_caches_for_event(
-            event, backfilled, reset_state=position in state_resets
+            event, backfilled,
         )

-    def invalidate_caches_for_event(self, event, backfilled, reset_state):
-        if reset_state:
-            self.get_rooms_for_user.invalidate_all()
-            self.get_users_in_room.invalidate((event.room_id,))
-
+    def invalidate_caches_for_event(self, event, backfilled):
         self._invalidate_get_event_cache(event.event_id)

         self.get_latest_event_ids_in_room.invalidate((event.room_id,))

@@ -267,8 +258,6 @@ class SlavedEventStore(BaseSlavedStore):
         self._invalidate_get_event_cache(event.redacts)

         if event.type == EventTypes.Member:
-            self.get_rooms_for_user.invalidate((event.state_key,))
-            self.get_users_in_room.invalidate((event.room_id,))
             self._membership_stream_cache.entity_has_changed(
                 event.state_key, event.internal_metadata.stream_ordering
             )
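For context on _process_replication_row: judging from the accessors in the hunks above, each row of the "events"/"backfill" streams carries a stream position in row[0], the event's internal metadata as JSON in row[1], and the event JSON in row[2]. A hypothetical row, decoded the same way:

import json

row = (
    215,                                    # stream position (row[0])
    json.dumps({"stream_ordering": 215}),   # internal metadata JSON (row[1])
    json.dumps({                            # event JSON (row[2])
        "type": "m.room.member",
        "room_id": "!abc:example.com",
        "state_key": "@alice:example.com",
    }),
)

internal = json.loads(row[1])
event_json = json.loads(row[2])
assert event_json["type"] == "m.room.member"

With the state_resets stream gone, cache resets appear to be handled by the cache invalidation plumbing added in synapse/storage/events.py below, rather than by a blunt invalidate_all() on the slave.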

synapse/rest/client/v2_alpha/keys.py

@@ -21,6 +21,8 @@ from synapse.api.errors import SynapseError
 from synapse.http.servlet import (
     RestServlet, parse_json_object_from_request, parse_integer
 )
+from synapse.http.servlet import parse_string
+from synapse.types import StreamToken
 from ._base import client_v2_patterns

 logger = logging.getLogger(__name__)

@@ -149,6 +151,52 @@ class KeyQueryServlet(RestServlet):
         defer.returnValue((200, result))


+class KeyChangesServlet(RestServlet):
+    """Returns the list of changes of keys between two stream tokens (may return
+    spurious extra results, since we currently ignore the `to` param).
+
+    GET /keys/changes?from=...&to=...
+
+        200 OK
+        { "changed": ["@foo:example.com"] }
+    """
+    PATTERNS = client_v2_patterns(
+        "/keys/changes$",
+        releases=()
+    )
+
+    def __init__(self, hs):
+        """
+        Args:
+            hs (synapse.server.HomeServer):
+        """
+        super(KeyChangesServlet, self).__init__()
+        self.auth = hs.get_auth()
+        self.device_handler = hs.get_device_handler()
+
+    @defer.inlineCallbacks
+    def on_GET(self, request):
+        requester = yield self.auth.get_user_by_req(request, allow_guest=True)
+
+        from_token_string = parse_string(request, "from")
+
+        # We want to enforce they do pass us one, but we ignore it and return
+        # changes after the "to" as well as before.
+        parse_string(request, "to")
+
+        from_token = StreamToken.from_string(from_token_string)
+
+        user_id = requester.user.to_string()
+
+        changed = yield self.device_handler.get_user_ids_changed(
+            user_id, from_token.device_list_key,
+        )
+
+        defer.returnValue((200, {
+            "changed": changed
+        }))
+
+
 class OneTimeKeyServlet(RestServlet):
     """
     POST /keys/claim HTTP/1.1

@@ -192,4 +240,5 @@ class OneTimeKeyServlet(RestServlet):
 def register_servlets(hs, http_server):
     KeyUploadServlet(hs).register(http_server)
     KeyQueryServlet(hs).register(http_server)
+    KeyChangesServlet(hs).register(http_server)
     OneTimeKeyServlet(hs).register(http_server)
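A hypothetical client-side call to the new endpoint; since the servlet is registered with releases=(), it is presumably reachable only under the unstable prefix. The server URL, access token, and stream token values below are made up; the response shape comes from the docstring:

import requests

resp = requests.get(
    "https://hs.example.com/_matrix/client/unstable/keys/changes",
    params={"from": "s72594_4483_1934", "to": "s75689_5632_2435"},
    headers={"Authorization": "Bearer MDAxabcdef"},
)
print(resp.json())  # e.g. {"changed": ["@foo:example.com"]}

As the docstring warns, the "to" parameter is required but currently ignored, so callers should expect harmless extra user IDs in "changed".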

synapse/storage/devices.py

@@ -436,15 +436,27 @@ class DeviceStore(SQLBaseStore):
         )

     def _mark_as_sent_devices_by_remote_txn(self, txn, destination, stream_id):
+        # First we DELETE all rows such that only the latest row for each
+        # (destination, user_id) is left. We do this by selecting first and
+        # deleting.
+        sql = """
+            SELECT user_id, coalesce(max(stream_id), 0) FROM device_lists_outbound_pokes
+            WHERE destination = ? AND stream_id <= ?
+            GROUP BY user_id
+            HAVING count(*) > 1
+        """
+        txn.execute(sql, (destination, stream_id,))
+        rows = txn.fetchall()
+
         sql = """
             DELETE FROM device_lists_outbound_pokes
-            WHERE destination = ? AND stream_id < (
-                SELECT coalesce(max(stream_id), 0) FROM device_lists_outbound_pokes
-                WHERE destination = ? AND stream_id <= ?
-            )
+            WHERE destination = ? AND user_id = ? AND stream_id < ?
         """
-        txn.execute(sql, (destination, destination, stream_id,))
+        txn.executemany(
+            sql, ((destination, row[0], row[1],) for row in rows)
+        )

+        # Mark everything that is left as sent
         sql = """
             UPDATE device_lists_outbound_pokes SET sent = ?
             WHERE destination = ? AND stream_id <= ?

@@ -545,18 +557,22 @@ class DeviceStore(SQLBaseStore):
         (destination, user_id) tuple to ensure that the prev_ids remain correct
         if the server does come back.
         """
-        now = self._clock.time_msec()
+        yesterday = self._clock.time_msec() - 24 * 60 * 60 * 1000

         def _prune_txn(txn):
             select_sql = """
                 SELECT destination, user_id, max(stream_id) as stream_id
                 FROM device_lists_outbound_pokes
                 GROUP BY destination, user_id
+                HAVING min(ts) < ? AND count(*) > 1
             """

-            txn.execute(select_sql)
+            txn.execute(select_sql, (yesterday,))
             rows = txn.fetchall()

+            if not rows:
+                return
+
             delete_sql = """
                 DELETE FROM device_lists_outbound_pokes
                 WHERE ts < ? AND destination = ? AND user_id = ? AND stream_id < ?

@@ -565,11 +581,13 @@ class DeviceStore(SQLBaseStore):
             txn.executemany(
                 delete_sql,
                 (
-                    (now, row["destination"], row["user_id"], row["stream_id"])
+                    (yesterday, row[0], row[1], row[2])
                     for row in rows
                 )
             )

+            logger.info("Pruned %d device list outbound pokes", txn.rowcount)
+
         return self.runInteraction(
             "_prune_old_outbound_device_pokes", _prune_txn
         )
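The SELECT-then-executemany dance in _mark_as_sent_devices_by_remote_txn is easy to exercise against a throwaway in-memory SQLite database (same table and column names as the diff; the rows are invented):

import sqlite3

db = sqlite3.connect(":memory:")
db.execute(
    "CREATE TABLE device_lists_outbound_pokes"
    " (destination TEXT, user_id TEXT, stream_id INTEGER, sent INTEGER DEFAULT 0)"
)
db.executemany(
    "INSERT INTO device_lists_outbound_pokes (destination, user_id, stream_id)"
    " VALUES (?, ?, ?)",
    [("remote", "@alice:hs", 1), ("remote", "@alice:hs", 3), ("remote", "@bob:hs", 2)],
)

# Per user, find the latest poke at or below the given stream_id (only for
# users that actually have duplicates)...
rows = db.execute(
    "SELECT user_id, coalesce(max(stream_id), 0) FROM device_lists_outbound_pokes"
    " WHERE destination = ? AND stream_id <= ? GROUP BY user_id HAVING count(*) > 1",
    ("remote", 3),
).fetchall()

# ...then delete every older row for that (destination, user_id) pair.
db.executemany(
    "DELETE FROM device_lists_outbound_pokes"
    " WHERE destination = ? AND user_id = ? AND stream_id < ?",
    (("remote", user_id, stream_id) for user_id, stream_id in rows),
)

assert db.execute(
    "SELECT user_id, stream_id FROM device_lists_outbound_pokes ORDER BY user_id"
).fetchall() == [("@alice:hs", 3), ("@bob:hs", 2)]

The subquery form it replaces deleted everything below the destination-wide maximum stream_id, which could drop a user's only outstanding poke whenever another user had a later one; the per-user variant keeps each user's latest poke.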

synapse/storage/event_federation.py

@@ -129,7 +129,7 @@ class EventFederationStore(SQLBaseStore):
             room_id,
         )

-    @cached()
+    @cached(max_entries=5000, iterable=True)
     def get_latest_event_ids_in_room(self, room_id):
         return self._simple_select_onecol(
             table="event_forward_extremities",

synapse/storage/events.py

@@ -28,6 +28,7 @@ from synapse.util.metrics import Measure
 from synapse.api.constants import EventTypes
 from synapse.api.errors import SynapseError
 from synapse.state import resolve_events
+from synapse.util.caches.descriptors import cached

 from canonicaljson import encode_canonical_json
 from collections import deque, namedtuple, OrderedDict

@@ -564,16 +565,12 @@ class EventsStore(SQLBaseStore):
             )

             for member in members_changed:
-                txn.call_after(self.get_rooms_for_user.invalidate, (member,))
-
-            txn.call_after(self.get_users_in_room.invalidate, (room_id,))
-
-            # Add an entry to the current_state_resets table to record the point
-            # where we clobbered the current state
-            self._simple_insert_txn(
-                txn,
-                table="current_state_resets",
-                values={"event_stream_ordering": max_stream_order}
+                self._invalidate_cache_and_stream(
+                    txn, self.get_rooms_for_user, (member,)
+                )
+
+            self._invalidate_cache_and_stream(
+                txn, self.get_users_in_room, (room_id,)
             )

             for room_id, new_extrem in new_forward_extremeties.items():

@@ -1581,6 +1578,7 @@ class EventsStore(SQLBaseStore):
         """The current minimum token that backfilled events have reached"""
         return -self._backfill_id_gen.get_current_token()

+    @cached(num_args=5, max_entries=10)
     def get_all_new_events(self, last_backfill_id, last_forward_id,
                            current_backfill_id, current_forward_id, limit):
         """Get all the new events that have arrived at the server either as

@@ -1612,15 +1610,6 @@ class EventsStore(SQLBaseStore):
             else:
                 upper_bound = current_forward_id

-            sql = (
-                "SELECT event_stream_ordering FROM current_state_resets"
-                " WHERE ? < event_stream_ordering"
-                " AND event_stream_ordering <= ?"
-                " ORDER BY event_stream_ordering ASC"
-            )
-            txn.execute(sql, (last_forward_id, upper_bound))
-            state_resets = txn.fetchall()
-
             sql = (
                 "SELECT event_stream_ordering, event_id, state_group"
                 " FROM ex_outlier_stream"

@@ -1632,7 +1621,6 @@ class EventsStore(SQLBaseStore):
                 forward_ex_outliers = txn.fetchall()
             else:
                 new_forward_events = []
-                state_resets = []
                 forward_ex_outliers = []

             sql = (

@@ -1672,7 +1660,6 @@ class EventsStore(SQLBaseStore):
             return AllNewEventsResult(
                 new_forward_events, new_backfill_events,
                 forward_ex_outliers, backward_ex_outliers,
-                state_resets,
             )
         return self.runInteraction("get_all_new_events", get_all_new_events_txn)

@@ -1898,5 +1885,4 @@ class EventsStore(SQLBaseStore):
 AllNewEventsResult = namedtuple("AllNewEventsResult", [
     "new_forward_events", "new_backfill_events",
     "forward_ex_outliers", "backward_ex_outliers",
-    "state_resets"
 ])
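txn.call_after(cache.invalidate, ...) only pops the cache in the local process; the new _invalidate_cache_and_stream presumably also records the invalidation somewhere the replication slaves can see, which is what lets SlavedEventStore above drop its blunt invalidate_all() handling. A rough sketch of that shape, not the actual implementation (the table and value layout here are hypothetical):

def _invalidate_cache_and_stream(self, txn, cache_func, keys):
    # Invalidate the in-memory cache once the transaction commits...
    txn.call_after(cache_func.invalidate, keys)
    # ...and persist a record of the invalidation for replication
    # consumers to replay.
    self._simple_insert_txn(
        txn,
        table="cache_invalidation_stream",   # hypothetical table name
        values={"cache_func": cache_func.__name__, "keys": list(keys)},
    )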

synapse/storage/roommember.py

@@ -66,8 +66,6 @@ class RoomMemberStore(SQLBaseStore):
         )

         for event in events:
-            txn.call_after(self.get_rooms_for_user.invalidate, (event.state_key,))
-            txn.call_after(self.get_users_in_room.invalidate, (event.room_id,))
             txn.call_after(
                 self._membership_stream_cache.entity_has_changed,
                 event.state_key, event.internal_metadata.stream_ordering

@@ -220,7 +218,7 @@ class RoomMemberStore(SQLBaseStore):
             " ON e.event_id = c.event_id"
             " AND m.room_id = c.room_id"
             " AND m.user_id = c.state_key"
-            " WHERE %s"
+            " WHERE c.type = 'm.room.member' AND %s"
         ) % (where_clause,)

         txn.execute(sql, args)

@@ -266,7 +264,7 @@ class RoomMemberStore(SQLBaseStore):
             " ON m.event_id = c.event_id "
             " AND m.room_id = c.room_id "
             " AND m.user_id = c.state_key"
-            " WHERE %(where)s"
+            " WHERE c.type = 'm.room.member' AND %(where)s"
         ) % {
             "where": where_clause,
         }

synapse/storage/schema/delta/40/current_state_idx.sql (new file, 17 lines)

@@ -0,0 +1,17 @@
+/* Copyright 2017 OpenMarket Ltd
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+INSERT INTO background_updates (update_name, progress_json) VALUES
+  ('current_state_members_idx', '{}');

synapse/storage/state.py

@@ -49,6 +49,7 @@ class StateStore(SQLBaseStore):

     STATE_GROUP_DEDUPLICATION_UPDATE_NAME = "state_group_state_deduplication"
     STATE_GROUP_INDEX_UPDATE_NAME = "state_group_state_type_index"
+    CURRENT_STATE_INDEX_UPDATE_NAME = "current_state_members_idx"

     def __init__(self, hs):
         super(StateStore, self).__init__(hs)

@@ -60,6 +61,13 @@ class StateStore(SQLBaseStore):
             self.STATE_GROUP_INDEX_UPDATE_NAME,
             self._background_index_state,
         )
+        self.register_background_index_update(
+            self.CURRENT_STATE_INDEX_UPDATE_NAME,
+            index_name="current_state_events_member_index",
+            table="current_state_events",
+            columns=["state_key"],
+            where_clause="type='m.room.member'",
+        )

     @defer.inlineCallbacks
     def get_state_groups_ids(self, room_id, event_ids):
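The delta file queues the 'current_state_members_idx' background update, and the register_background_index_update call describes what it builds: a partial index over the membership rows of current_state_events. The equivalent DDL, tried here against a throwaway SQLite database:

import sqlite3

db = sqlite3.connect(":memory:")
db.execute(
    "CREATE TABLE current_state_events"
    " (room_id TEXT, type TEXT, state_key TEXT, event_id TEXT)"
)
# Partial index matching the register_background_index_update() arguments.
db.execute(
    "CREATE INDEX current_state_events_member_index"
    " ON current_state_events (state_key)"
    " WHERE type = 'm.room.member'"
)

This is also why the two RoomMemberStore queries above gained "c.type = 'm.room.member'" in their WHERE clauses: matching the index's predicate is what allows the planner to use a partial index.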

synapse/util/retryutils.py

@@ -88,7 +88,7 @@ class RetryDestinationLimiter(object):
     def __init__(self, destination, clock, store, retry_interval,
                  min_retry_interval=10 * 60 * 1000,
                  max_retry_interval=24 * 60 * 60 * 1000,
-                 multiplier_retry_interval=5,):
+                 multiplier_retry_interval=5, backoff_on_404=False):
         """Marks the destination as "down" if an exception is thrown in the
         context, except for CodeMessageException with code < 500.

@@ -107,6 +107,7 @@ class RetryDestinationLimiter(object):
                 a failed request, in milliseconds.
             multiplier_retry_interval (int): The multiplier to use to increase
                 the retry interval after a failed request.
+            backoff_on_404 (bool): Back off if we get a 404.
         """
         self.clock = clock
         self.store = store

@@ -116,6 +117,7 @@ class RetryDestinationLimiter(object):
         self.min_retry_interval = min_retry_interval
         self.max_retry_interval = max_retry_interval
         self.multiplier_retry_interval = multiplier_retry_interval
+        self.backoff_on_404 = backoff_on_404

     def __enter__(self):
         pass

@@ -123,7 +125,20 @@ class RetryDestinationLimiter(object):
     def __exit__(self, exc_type, exc_val, exc_tb):
         valid_err_code = False
         if exc_type is not None and issubclass(exc_type, CodeMessageException):
-            valid_err_code = exc_val.code != 429 and 0 <= exc_val.code < 500
+            # Some error codes are perfectly fine for some APIs, whereas other
+            # APIs may expect to never receive e.g. a 404. It's important to
+            # handle 404 as some remote servers will return a 404 when the HS
+            # has been decommissioned.
+            if exc_val.code == 404 and self.backoff_on_404:
+                valid_err_code = False
+            elif exc_val.code == 429:
+                # 429 is us being aggressively rate limited, so let's rate limit
+                # ourselves.
+                valid_err_code = False
+            elif exc_val.code < 500:
+                valid_err_code = True
+            else:
+                valid_err_code = False

         if exc_type is None or valid_err_code:
             # We connected successfully.
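The rewritten __exit__ classification reads more clearly as a pure function. The sketch below mirrors the new branches (note that the old 0 <= code lower bound is gone, and a 404 only triggers backoff when backoff_on_404 is set):

def is_valid_err_code(code, backoff_on_404):
    # "Valid" means "do not mark the destination as down".
    if code == 404 and backoff_on_404:
        return False   # remote homeserver has likely gone away
    if code == 429:
        return False   # we are being rate limited; back off
    return code < 500  # other non-5xx codes are request-level errors

assert is_valid_err_code(403, backoff_on_404=True)
assert not is_valid_err_code(404, backoff_on_404=True)
assert is_valid_err_code(404, backoff_on_404=False)
assert not is_valid_err_code(429, backoff_on_404=False)
assert not is_valid_err_code(500, backoff_on_404=True)

This is what the TransactionQueue change at the top opts into with backoff_on_404=True: a remote server answering federation traffic with a 404 is treated as decommissioned and backed off from.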

tests/replication/slave/storage/test_events.py

@@ -58,49 +58,6 @@ class SlavedEventStoreTestCase(BaseSlavedStoreTestCase):
     def tearDown(self):
         [unpatch() for unpatch in self.unpatches]

-    @defer.inlineCallbacks
-    def test_room_members(self):
-        yield self.persist(type="m.room.create", key="", creator=USER_ID)
-        yield self.replicate()
-        yield self.check("get_rooms_for_user", (USER_ID,), [])
-        yield self.check("get_users_in_room", (ROOM_ID,), [])
-
-        # Join the room.
-        join = yield self.persist(type="m.room.member", key=USER_ID, membership="join")
-        yield self.replicate()
-        yield self.check("get_rooms_for_user", (USER_ID,), [RoomsForUser(
-            room_id=ROOM_ID,
-            sender=USER_ID,
-            membership="join",
-            event_id=join.event_id,
-            stream_ordering=join.internal_metadata.stream_ordering,
-        )])
-        yield self.check("get_users_in_room", (ROOM_ID,), [USER_ID])
-
-        # Leave the room.
-        yield self.persist(type="m.room.member", key=USER_ID, membership="leave")
-        yield self.replicate()
-        yield self.check("get_rooms_for_user", (USER_ID,), [])
-        yield self.check("get_users_in_room", (ROOM_ID,), [])
-
-        # Add some other user to the room.
-        join = yield self.persist(type="m.room.member", key=USER_ID_2, membership="join")
-        yield self.replicate()
-        yield self.check("get_rooms_for_user", (USER_ID_2,), [RoomsForUser(
-            room_id=ROOM_ID,
-            sender=USER_ID,
-            membership="join",
-            event_id=join.event_id,
-            stream_ordering=join.internal_metadata.stream_ordering,
-        )])
-        yield self.check("get_users_in_room", (ROOM_ID,), [USER_ID_2])
-
-        yield self.persist(
-            type="m.room.member", key=USER_ID, membership="join",
-        )
-        yield self.replicate()
-        yield self.check("get_users_in_room", (ROOM_ID,), [USER_ID_2, USER_ID])
-
     @defer.inlineCallbacks
     def test_get_latest_event_ids_in_room(self):
         create = yield self.persist(type="m.room.create", key="", creator=USER_ID)